A Look at Elasticsearch's ElectMasterService

go4it
Published 05/12 21:48

This article takes a look at Elasticsearch's ElectMasterService.

ElectMasterService

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ElectMasterService.java

public class ElectMasterService {

    private static final Logger logger = LogManager.getLogger(ElectMasterService.class);

    public static final Setting<Integer> DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING =
        Setting.intSetting("discovery.zen.minimum_master_nodes", -1, Property.Dynamic, Property.NodeScope, Property.Deprecated);

    private volatile int minimumMasterNodes;

    /**
     * a class to encapsulate all the information about a candidate in a master election
     * that is needed to decide which of the candidates should win
     */
    public static class MasterCandidate {

        public static final long UNRECOVERED_CLUSTER_VERSION = -1;

        final DiscoveryNode node;

        final long clusterStateVersion;

        public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
            Objects.requireNonNull(node);
            assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
            assert node.isMasterNode();
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }

        public DiscoveryNode getNode() {
            return node;
        }

        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "Candidate{" +
                "node=" + node +
                ", clusterStateVersion=" + clusterStateVersion +
                '}';
        }

        /**
         * compares two candidates to indicate which is the better master.
         * A higher cluster state version is better
         *
         * @return a negative value if c1 is the better candidate, a positive value if c2 is.
         */
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
            // list, so if c2 has a higher cluster state version, it needs to come first.
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
    }

    public ElectMasterService(Settings settings) {
        this.minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
        logger.debug("using minimum_master_nodes [{}]", minimumMasterNodes);
    }

    public void minimumMasterNodes(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    public int minimumMasterNodes() {
        return minimumMasterNodes;
    }

    public int countMasterNodes(Iterable<DiscoveryNode> nodes) {
        int count = 0;
        for (DiscoveryNode node : nodes) {
            if (node.isMasterNode()) {
                count++;
            }
        }
        return count;
    }

    public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        if (candidates.isEmpty()) {
            return false;
        }
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        return candidates.size() >= minimumMasterNodes;
    }

    /**
     * Elects a new master out of the possible nodes, returning it. Returns {@code null}
     * if no master has been elected.
     */
    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }

    /** selects the best active master to join, where multiple are discovered */
    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
        return activeMasters.stream().min(ElectMasterService::compareNodes).get();
    }

    public boolean hasEnoughMasterNodes(Iterable<DiscoveryNode> nodes) {
        final int count = countMasterNodes(nodes);
        return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);
    }

    public boolean hasTooManyMasterNodes(Iterable<DiscoveryNode> nodes) {
        final int count = countMasterNodes(nodes);
        return count > 1 && minimumMasterNodes <= count / 2;
    }

    public void logMinimumMasterNodesWarningIfNecessary(ClusterState oldState, ClusterState newState) {
        // check if min_master_nodes setting is too low and log warning
        if (hasTooManyMasterNodes(oldState.nodes()) == false && hasTooManyMasterNodes(newState.nodes())) {
            logger.warn("value for setting \"{}\" is too low. This can result in data loss! Please set it to at least a quorum of master-" +
                    "eligible nodes (current value: [{}], total number of master-eligible nodes used for publishing in this round: [{}])",
                ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), minimumMasterNodes(),
                newState.getNodes().getMasterNodes().size());
        }
    }

    /**
     * Returns the given nodes sorted by likelihood of being elected as master, most likely first.
     * Non-master nodes are not removed but are rather put in the end
     */
    static List<DiscoveryNode> sortByMasterLikelihood(Iterable<DiscoveryNode> nodes) {
        ArrayList<DiscoveryNode> sortedNodes = CollectionUtils.iterableAsArrayList(nodes);
        CollectionUtil.introSort(sortedNodes, ElectMasterService::compareNodes);
        return sortedNodes;
    }

    /**
     * Returns a list of the next possible masters.
     */
    public DiscoveryNode[] nextPossibleMasters(ObjectContainer<DiscoveryNode> nodes, int numberOfPossibleMasters) {
        List<DiscoveryNode> sortedNodes = sortedMasterNodes(Arrays.asList(nodes.toArray(DiscoveryNode.class)));
        if (sortedNodes == null) {
            return new DiscoveryNode[0];
        }
        List<DiscoveryNode> nextPossibleMasters = new ArrayList<>(numberOfPossibleMasters);
        int counter = 0;
        for (DiscoveryNode nextPossibleMaster : sortedNodes) {
            if (++counter >= numberOfPossibleMasters) {
                break;
            }
            nextPossibleMasters.add(nextPossibleMaster);
        }
        return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);
    }

    private List<DiscoveryNode> sortedMasterNodes(Iterable<DiscoveryNode> nodes) {
        List<DiscoveryNode> possibleNodes = CollectionUtils.iterableAsArrayList(nodes);
        if (possibleNodes.isEmpty()) {
            return null;
        }
        // clean non master nodes
        possibleNodes.removeIf(node -> !node.isMasterNode());
        CollectionUtil.introSort(possibleNodes, ElectMasterService::compareNodes);
        return possibleNodes;
    }

    /** master nodes go before other nodes, with a secondary sort by id **/
    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        return o1.getId().compareTo(o2.getId());
    }
}
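  • The ElectMasterService constructor reads the discovery.zen.minimum_master_nodes setting into the field minimumMasterNodes (the default is -1, and the setting is marked Deprecated in 7.0.1)
  • ElectMasterService defines the static nested class MasterCandidate, whose static compare method orders two candidates: it compares clusterStateVersion first (the higher version sorts first) and, if the versions are equal, falls back to compareNodes. compareNodes puts master-eligible nodes ahead of other nodes; when both nodes have the same eligibility, it compares their node ids
  • electMaster asserts hasEnoughCandidates (the caller is expected to have checked it), sorts a copy of the candidates with MasterCandidate::compare, and returns the first element as the master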
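
The ordering described above can be tried out in isolation. The following is a minimal sketch, not the Elasticsearch code itself: Node and Candidate are hypothetical stand-ins for DiscoveryNode and MasterCandidate, reduced to the fields the comparison needs, and ElectionSketch is an illustrative class name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ElectionSketch {

    /** Hypothetical stand-in for DiscoveryNode: an id plus a master-eligible flag. */
    static final class Node {
        final String id;
        final boolean masterEligible;

        Node(String id, boolean masterEligible) {
            this.id = id;
            this.masterEligible = masterEligible;
        }
    }

    /** Hypothetical stand-in for MasterCandidate. */
    static final class Candidate {
        final Node node;
        final long clusterStateVersion;

        Candidate(Node node, long clusterStateVersion) {
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    /** Master-eligible nodes go first; nodes with equal eligibility are ordered by id. */
    static int compareNodes(Node a, Node b) {
        if (a.masterEligible && !b.masterEligible) {
            return -1;
        }
        if (!a.masterEligible && b.masterEligible) {
            return 1;
        }
        return a.id.compareTo(b.id);
    }

    /** The higher cluster state version sorts first; ties fall back to compareNodes. */
    static int compare(Candidate c1, Candidate c2) {
        // Arguments are swapped on purpose so that "better" means "earlier in the list".
        int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
        return ret != 0 ? ret : compareNodes(c1.node, c2.node);
    }

    /** Sorts a copy of the candidates and returns the best one. */
    static Candidate elect(Collection<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(ElectionSketch::compare);
        return sorted.get(0);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
            new Candidate(new Node("node-a", true), 7),
            new Candidate(new Node("node-c", true), 9),
            new Candidate(new Node("node-b", true), 9));
        // node-b and node-c share the highest version; node-b has the smaller id.
        System.out.println(elect(candidates).node.id); // prints "node-b"
    }
}
```

The cluster state version dominates the decision, so a node with a stale state loses to any node with a newer one regardless of its id; the id only acts as a deterministic tie-breaker.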

ZenDiscovery.findMaster

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
	//......

    private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }

        final DiscoveryNode localNode = transportService.getLocalNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

        // filter responses
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

        List<DiscoveryNode> activeMasters = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        if (activeMasters.isEmpty()) {
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            assert !activeMasters.contains(localNode) :
                "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

	//......
}
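  • ZenDiscovery's findMaster pings the other nodes, adds the local node to the responses, and collects activeMasters and masterCandidates. When activeMasters.isEmpty() and hasEnoughCandidates holds, it calls electMaster.electMaster(masterCandidates) and returns the winner's node as the master; without enough candidates it logs a warning and returns null. When active masters were discovered, it returns electMaster.tieBreakActiveMasters(activeMasters) instead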
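
The branching in findMaster can be sketched without any Elasticsearch types. This is a simplified illustration under stated assumptions: Ping is a hypothetical stand-in for ZenPing.PingResponse, every responding node is assumed to be master-eligible, nodes are identified by plain string ids, and the caller has already added the local node's own response to the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class FindMasterSketch {

    /** Hypothetical ping response: who answered, which master it follows (or null), and its state version. */
    static final class Ping {
        final String nodeId;
        final String masterId;
        final long clusterStateVersion;

        Ping(String nodeId, String masterId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.masterId = masterId;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    /** Highest cluster state version first, then smallest node id. */
    static final Comparator<Ping> BEST_FIRST =
        Comparator.comparingLong((Ping p) -> p.clusterStateVersion).reversed()
            .thenComparing((Ping p) -> p.nodeId);

    /** Returns the id of the master to join, or null if no decision is possible yet. */
    static String findMaster(String localId, List<Ping> pings, int minimumMasterNodes) {
        // Collect masters reported by other nodes, never counting the local node itself.
        List<String> activeMasters = new ArrayList<>();
        for (Ping p : pings) {
            if (p.masterId != null && !p.masterId.equals(localId)) {
                activeMasters.add(p.masterId);
            }
        }
        if (!activeMasters.isEmpty()) {
            // Tie-break between already active masters: the smallest id wins.
            return Collections.min(activeMasters);
        }
        // Same rule as hasEnoughCandidates: non-empty, and at least minimumMasterNodes when that is set.
        boolean enough = !pings.isEmpty() && (minimumMasterNodes < 1 || pings.size() >= minimumMasterNodes);
        if (!enough) {
            return null; // the caller is expected to ping again
        }
        return Collections.min(pings, BEST_FIRST).nodeId;
    }

    public static void main(String[] args) {
        List<Ping> pings = Arrays.asList(
            new Ping("n1", null, 5),
            new Ping("n2", null, 5),
            new Ping("n3", null, 4));
        System.out.println(findMaster("n2", pings, 2)); // prints "n1"
    }
}
```

Excluding the local node from the active masters mirrors the comment in the original method: a node that only sees itself as master must still go through an election instead of trusting its own view.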

Summary

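  • The ElectMasterService constructor reads the discovery.zen.minimum_master_nodes setting into the field minimumMasterNodes. ElectMasterService defines the static nested class MasterCandidate, whose static compare method compares clusterStateVersion first (higher sorts first) and, if the versions are equal, falls back to compareNodes. compareNodes puts master-eligible nodes first and compares node ids when both nodes have the same eligibility
  • electMaster asserts hasEnoughCandidates, sorts the candidates, and returns the first one as the master
  • ZenDiscovery's findMaster calls electMaster.electMaster(masterCandidates) when activeMasters.isEmpty() and returns the winner as the master; otherwise it tie-breaks between the discovered active masters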
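
How minimumMasterNodes feeds the checks hasEnoughMasterNodes and hasTooManyMasterNodes from the listing above can be seen with plain integers. The sketch below copies the two boolean expressions and works on counts instead of DiscoveryNode collections; the quorum helper is an assumption added for illustration (a majority, n / 2 + 1) and is not a method of ElectMasterService.

```java
public class QuorumSketch {

    /** Same expression as hasEnoughMasterNodes, applied to a count of master-eligible nodes. */
    static boolean hasEnoughMasterNodes(int count, int minimumMasterNodes) {
        return count > 0 && (minimumMasterNodes < 0 || count >= minimumMasterNodes);
    }

    /** Same expression as hasTooManyMasterNodes: true means the setting is too low for this cluster. */
    static boolean hasTooManyMasterNodes(int count, int minimumMasterNodes) {
        return count > 1 && minimumMasterNodes <= count / 2;
    }

    /** Illustrative helper (not part of ElectMasterService): a majority of the master-eligible nodes. */
    static int quorum(int count) {
        return count / 2 + 1;
    }

    public static void main(String[] args) {
        int masterEligible = 3;
        for (int min = 1; min <= 3; min++) {
            System.out.println("minimum_master_nodes=" + min
                + " enough=" + hasEnoughMasterNodes(masterEligible, min)
                + " tooLow=" + hasTooManyMasterNodes(masterEligible, min));
        }
        System.out.println("quorum for 3 nodes = " + quorum(masterEligible)); // prints "quorum for 3 nodes = 2"
    }
}
```

With three master-eligible nodes, a value of 1 triggers the "too low" condition because two partitions could each elect a master, while 2 (the majority) does not.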

© Copyright belongs to the author
