文档章节

聊聊elasticsearch的LagDetector

go4it
 go4it
发布于 05/13 23:32
字数 973
阅读 12
收藏 2

本文主要研究一下elasticsearch的LagDetector

LagDetector

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java

/**
 * A publication can succeed and complete before all nodes have applied the published state and acknowledged it; however we need every node
 * eventually either to apply the published state (or a later state) or be removed from the cluster. This component achieves this by
 * removing any lagging nodes from the cluster after a timeout.
 */
public class LagDetector {

    private static final Logger logger = LogManager.getLogger(LagDetector.class);

    // the timeout for each node to apply a cluster state update after the leader has applied it, before being removed from the cluster
    public static final Setting<TimeValue> CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING =
        Setting.timeSetting("cluster.follower_lag.timeout",
            TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

    private final TimeValue clusterStateApplicationTimeout;
    private final Consumer<DiscoveryNode> onLagDetected;
    private final Supplier<DiscoveryNode> localNodeSupplier;
    private final ThreadPool threadPool;
    private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();

    public LagDetector(final Settings settings, final ThreadPool threadPool, final Consumer<DiscoveryNode> onLagDetected,
                       final Supplier<DiscoveryNode> localNodeSupplier) {
        this.threadPool = threadPool;
        this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
        this.onLagDetected = onLagDetected;
        this.localNodeSupplier = localNodeSupplier;
    }

    public void setTrackedNodes(final Iterable<DiscoveryNode> discoveryNodes) {
        final Set<DiscoveryNode> discoveryNodeSet = new HashSet<>();
        discoveryNodes.forEach(discoveryNodeSet::add);
        discoveryNodeSet.remove(localNodeSupplier.get());
        appliedStateTrackersByNode.keySet().retainAll(discoveryNodeSet);
        discoveryNodeSet.forEach(node -> appliedStateTrackersByNode.putIfAbsent(node, new NodeAppliedStateTracker(node)));
    }

    public void clearTrackedNodes() {
        appliedStateTrackersByNode.clear();
    }

    public void setAppliedVersion(final DiscoveryNode discoveryNode, final long appliedVersion) {
        final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);
        if (nodeAppliedStateTracker == null) {
            // Received an ack from a node that a later publication has removed (or we are no longer master). No big deal.
            logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);
        } else {
            nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
        }
    }

    public void startLagDetector(final long version) {
        final List<NodeAppliedStateTracker> laggingTrackers
            = appliedStateTrackersByNode.values().stream().filter(t -> t.appliedVersionLessThan(version)).collect(Collectors.toList());

        if (laggingTrackers.isEmpty()) {
            logger.trace("lag detection for version {} is unnecessary: {}", version, appliedStateTrackersByNode.values());
        } else {
            logger.debug("starting lag detector for version {}: {}", version, laggingTrackers);

            threadPool.scheduleUnlessShuttingDown(clusterStateApplicationTimeout, Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    laggingTrackers.forEach(t -> t.checkForLag(version));
                }

                @Override
                public String toString() {
                    return "lag detector for version " + version + " on " + laggingTrackers;
                }
            });
        }
    }

    @Override
    public String toString() {
        return "LagDetector{" +
            "clusterStateApplicationTimeout=" + clusterStateApplicationTimeout +
            ", appliedStateTrackersByNode=" + appliedStateTrackersByNode.values() +
            '}';
    }

    // for assertions
    Set<DiscoveryNode> getTrackedNodes() {
        return Collections.unmodifiableSet(appliedStateTrackersByNode.keySet());
    }

    private class NodeAppliedStateTracker {
        private final DiscoveryNode discoveryNode;
        private final AtomicLong appliedVersion = new AtomicLong();

        NodeAppliedStateTracker(final DiscoveryNode discoveryNode) {
            this.discoveryNode = discoveryNode;
        }

        void increaseAppliedVersion(long appliedVersion) {
            long maxAppliedVersion = this.appliedVersion.updateAndGet(v -> Math.max(v, appliedVersion));
            logger.trace("{} applied version {}, max now {}", this, appliedVersion, maxAppliedVersion);
        }

        boolean appliedVersionLessThan(final long version) {
            return appliedVersion.get() < version;
        }

        @Override
        public String toString() {
            return "NodeAppliedStateTracker{" +
                "discoveryNode=" + discoveryNode +
                ", appliedVersion=" + appliedVersion +
                '}';
        }

        void checkForLag(final long version) {
            if (appliedStateTrackersByNode.get(discoveryNode) != this) {
                logger.trace("{} no longer active when checking version {}", this, version);
                return;
            }

            long appliedVersion = this.appliedVersion.get();
            if (version <= appliedVersion) {
                logger.trace("{} satisfied when checking version {}, node applied version {}", this, version, appliedVersion);
                return;
            }

            logger.debug("{}, detected lag at version {}, node has only applied version {}", this, version, appliedVersion);
            onLagDetected.accept(discoveryNode);
        }
    }
}
  • LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
  • startLagDetector首先获取从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers,之后延时clusterStateApplicationTimeout执行检测,其run方法会遍历laggingTrackers,挨个执行器NodeAppliedStateTracker.checkForLag方法
  • NodeAppliedStateTracker的checkForLag方法首先进行version判断,最后调用onLagDetected.accept(discoveryNode)

Coordinator

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

public class Coordinator extends AbstractLifecycleComponent implements Discovery {
	//......

    public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
                       NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
                       Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
                       ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
        this.settings = settings;
        this.transportService = transportService;
        this.masterService = masterService;
        this.allocationService = allocationService;
        this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
        this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
        this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
            this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
        this.persistedStateSupplier = persistedStateSupplier;
        this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
        this.lastKnownLeader = Optional.empty();
        this.lastJoin = Optional.empty();
        this.joinAccumulator = new InitialJoinAccumulator();
        this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
        this.random = random;
        this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
        this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen);
        configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
        this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
            new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
        this.publicationHandler = new PublicationTransportHandler(transportService, namedWriteableRegistry,
            this::handlePublishRequest, this::handleApplyCommit);
        this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());
        this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode);
        this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
        this.clusterApplier = clusterApplier;
        masterService.setClusterStateSupplier(this::getStateForMasterService);
        this.reconfigurator = new Reconfigurator(settings, clusterSettings);
        this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
            this::isInitialConfigurationSet, this::setInitialConfiguration);
        this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
            this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
        this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
            transportService::getLocalNode);
        this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
            transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
    }

    private void removeNode(DiscoveryNode discoveryNode, String reason) {
        synchronized (mutex) {
            if (mode == Mode.LEADER) {
                masterService.submitStateUpdateTask("node-left",
                    new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
                    ClusterStateTaskConfig.build(Priority.IMMEDIATE),
                    nodeRemovalExecutor,
                    nodeRemovalExecutor);
            }
        }
    }

	//......
}
  • Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode方法,该方法在当前mode为LEADER的时候会执行NodeRemovalClusterStateTaskExecutor.Task

小结

  • LagDetector用于检测并移除lagging nodes,其构造器读取cluster.follower_lag.timeout配置,默认为90000ms,最小值为1ms
  • startLagDetector首先获取从appliedStateTrackersByNode中过滤出appliedVersion小于指定version的laggingTrackers,之后延时clusterStateApplicationTimeout执行检测,其run方法会遍历laggingTrackers,挨个执行器NodeAppliedStateTracker.checkForLag方法;NodeAppliedStateTracker的checkForLag方法首先进行version判断,最后调用onLagDetected.accept(discoveryNode)
  • Coordinator的构造器创建了LagDetector,其Consumer<DiscoveryNode>执行的是removeNode方法,该方法在当前mode为LEADER的时候会执行NodeRemovalClusterStateTaskExecutor.Task

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1097
码字总数 1034972
作品 0
深圳
私信 提问
聊聊springboot elasticsearch autoconfigure

序 本文主要研究一下springboot elasticsearch autoconfigure ElasticsearchAutoConfiguration spring-boot-autoconfigure-2.1.4.RELEASE-sources.jar!/org/springframework/boot/autoconfi......

go4it
04/17
58
0
聊聊Elasticsearch的Releasables

序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java Releasable继承了java.io.Closeab......

go4it
06/14
23
0
聊聊Elasticsearch的ConcurrentMapLong

序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentMapLong.java Co......

go4it
06/03
8
0
CentOS7 部署 ElasticSearch 集群

环境 主机名 IP 操作系统 ES 版本 es227 192.168.1.227 CentOS7.5 6.5.4 es228 192.168.1.228 CentOS7.5 6.5.4 es229 192.168.1.229 CentOS7.5 6.5.4 下载 elasticsearch-6.5.4.tar.gz --- 各......

俊赛潘安-才比管乐
2018/12/27
464
0
聊聊Elasticsearch的SizeBlockingQueue

序 本文主要研究一下Elasticsearch的SizeBlockingQueue SizeBlockingQueue elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java Si......

go4it
06/01
17
0

没有更多内容

加载失败,请刷新页面

加载更多

lopatkin俄大神Windows精简版系统 安装教程 简单版

1.制作U盘启动盘 或 安装pe到电脑 下载微pe工具箱.(为什么用这个呢,因为这个无毒,无广告,无后门.其它pe在安装完系统会安装一堆木马,垃圾软件,后门什么的) pe制作工具下载http://www.wepe.com...

xiaogg
21分钟前
3
0
【0917】Linux shell基础知识2

【0917】Linux shell基础知识2 8.7/8.8 shell变量 8.9 环境变量配置文件 8.10 shell特殊符号cut命令 8.11 sort_wc_uniq命令 8.12 tee_tr_split命令 8.13 shell特殊符号 一、shell变量 1、使用...

飞翔的竹蜻蜓
23分钟前
3
0
管理角色认知-新晋管理常常犯的错

背景 管理是一门实践科学,从知道到做到,需要长时间的刻意练习,提前知道那些坑,可以提前规避。 坑1:被动执行 现象: 不主动找活干,等上级派活; 上级有了安排,指望上级替他决定实现方案...

春天spring
25分钟前
4
0
MongoDB4.0.2集群搭建

MongoDB4.0.2集群搭建 2019.02.01 01:02 619浏览 MongoDB4.0.2集群搭建 根据对象存储平台Django+MongoDB+Ceph的需求,现搭建部署一个十节点的MongoDB集群,主要以下关键点: 根据最新版本Mon...

linjin200
28分钟前
5
0
面试官问你B树和B+树,就把这篇文章丢给他

原文链接:面试官问你B树和B+树,就把这篇文章丢给他 1 B树 在介绍B+树之前, 先简单的介绍一下B树,这两种数据结构既有相似之处,也有他们的区别,最后,我们也会对比一下这两种数据结构的区...

欧阳思海
32分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部