文档章节

聊聊Elasticsearch RestClient的DeadHostState

go4it
 go4it
发布于 05/17 22:03
字数 1360
阅读 10
收藏 0

本文主要研究一下Elasticsearch RestClient的DeadHostState

DeadHostState

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/DeadHostState.java

/**
 * Holds the state of a dead connection to a host. Keeps track of how many failed attempts were performed and
 * when the host should be retried (based on number of previous failed attempts).
 * Class is immutable, a new copy of it should be created each time the state has to be changed.
 */
final class DeadHostState implements Comparable<DeadHostState> {

    private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);

    private final int failedAttempts;
    private final long deadUntilNanos;
    private final TimeSupplier timeSupplier;

    /**
     * Build the initial dead state of a host. Useful when a working host stops functioning
     * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
     *
     * @param timeSupplier a way to supply the current time and allow for unit testing
     */
    DeadHostState(TimeSupplier timeSupplier) {
        this.failedAttempts = 1;
        this.deadUntilNanos = timeSupplier.nanoTime() + MIN_CONNECTION_TIMEOUT_NANOS;
        this.timeSupplier = timeSupplier;
    }

    /**
     * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
     * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
     * to retry that same host again. Minimum is 1 minute (for a node the only failed once created
     * through {@link #DeadHostState(TimeSupplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
     *
     * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
     */
    DeadHostState(DeadHostState previousDeadHostState) {
        long timeoutNanos = (long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
                MAX_CONNECTION_TIMEOUT_NANOS);
        this.deadUntilNanos = previousDeadHostState.timeSupplier.nanoTime() + timeoutNanos;
        this.failedAttempts = previousDeadHostState.failedAttempts + 1;
        this.timeSupplier = previousDeadHostState.timeSupplier;
    }

    /**
     * Indicates whether it's time to retry to failed host or not.
     *
     * @return true if the host should be retried, false otherwise
     */
    boolean shallBeRetried() {
        return timeSupplier.nanoTime() - deadUntilNanos > 0;
    }

    /**
     * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
     * After that the host should be retried.
     */
    long getDeadUntilNanos() {
        return deadUntilNanos;
    }

    int getFailedAttempts() {
        return failedAttempts;
    }

    @Override
    public int compareTo(DeadHostState other) {
        if (timeSupplier != other.timeSupplier) {
            throw new IllegalArgumentException("can't compare DeadHostStates with different clocks ["
                    + timeSupplier + " != " + other.timeSupplier + "]");
        }
        return Long.compare(deadUntilNanos, other.deadUntilNanos);
    }

    @Override
    public String toString() {
        return "DeadHostState{" +
                "failedAttempts=" + failedAttempts +
                ", deadUntilNanos=" + deadUntilNanos +
                ", timeSupplier=" + timeSupplier +
                '}';
    }

    /**
     * Time supplier that makes timing aspects pluggable to ease testing
     */
    interface TimeSupplier {
        TimeSupplier DEFAULT = new TimeSupplier() {
            @Override
            public long nanoTime() {
                return System.nanoTime();
            }

            @Override
            public String toString() {
                return "nanoTime";
            }
        };

        long nanoTime();
    }
}
  • DeadHostState有failedAttempts、deadUntilNanos、timeSupplier三个属性,它提供了两类构造器,一类是根据timeSupplier来构造,一类是根据previousDeadHostState来构造
  • 根据previousDeadHostState来构造时会重新计算deadUntilNanos,递增failedAttempts;其中deadUntilNanos为timeSupplier.nanoTime() + timeoutNanos,而timeoutNanos的计算公式为(long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1)
  • DeadHostState提供了shallBeRetried方法,其判断逻辑为timeSupplier.nanoTime() - deadUntilNanos > 0;DeadHostState实现了Comparable的compareTo方法,它是根据deadUntilNanos来进行比较

RestClient

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

public class RestClient implements Closeable {
	//......

    /**
     * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
     * if the previous attempt failed and so on. Package private for testing.
     */
    static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                      AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
        /*
         * Sort the nodes into living and dead lists.
         */
        List<Node> livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - blacklist.size()));
        List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
        for (Node node : nodeTuple.nodes) {
            DeadHostState deadness = blacklist.get(node.getHost());
            if (deadness == null) {
                livingNodes.add(node);
                continue;
            }
            if (deadness.shallBeRetried()) {
                livingNodes.add(node);
                continue;
            }
            deadNodes.add(new DeadNode(node, deadness));
        }

        if (false == livingNodes.isEmpty()) {
            /*
             * Normal state: there is at least one living node. If the
             * selector is ok with any over the living nodes then use them
             * for the request.
             */
            List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
            nodeSelector.select(selectedLivingNodes);
            if (false == selectedLivingNodes.isEmpty()) {
                /*
                 * Rotate the list using a global counter as the distance so subsequent
                 * requests will try the nodes in a different order.
                 */
                Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
                return selectedLivingNodes;
            }
        }

        /*
         * Last resort: there are no good nodes to use, either because
         * the selector rejected all the living nodes or because there aren't
         * any living ones. Either way, we want to revive a single dead node
         * that the NodeSelectors are OK with. We do this by passing the dead
         * nodes through the NodeSelector so it can have its say in which nodes
         * are ok. If the selector is ok with any of the nodes then we will take
         * the one in the list that has the lowest revival time and try it.
         */
        if (false == deadNodes.isEmpty()) {
            final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
            /*
             * We'd like NodeSelectors to remove items directly from deadNodes
             * so we can find the minimum after it is filtered without having
             * to compare many things. This saves us a sort on the unfiltered
             * list.
             */
            nodeSelector.select(new Iterable<Node>() {
                @Override
                public Iterator<Node> iterator() {
                    return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
                }
            });
            if (false == selectedDeadNodes.isEmpty()) {
                return singletonList(Collections.min(selectedDeadNodes).node);
            }
        }
        throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
                + "living " + livingNodes + " and dead " + deadNodes);
    }

    /**
     * Called after each successful request call.
     * Receives as an argument the host that was used for the successful request.
     */
    private void onResponse(Node node) {
        DeadHostState removedHost = this.blacklist.remove(node.getHost());
        if (logger.isDebugEnabled() && removedHost != null) {
            logger.debug("removed [" + node + "] from blacklist");
        }
    }

    /**
     * Called after each failed attempt.
     * Receives as an argument the host that was used for the failed attempt.
     */
    private void onFailure(Node node) {
        while(true) {
            DeadHostState previousDeadHostState =
                blacklist.putIfAbsent(node.getHost(), new DeadHostState(TimeSupplier.DEFAULT));
            if (previousDeadHostState == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("added [" + node + "] to blacklist");
                }
                break;
            }
            if (blacklist.replace(node.getHost(), previousDeadHostState,
                    new DeadHostState(previousDeadHostState))) {
                if (logger.isDebugEnabled()) {
                    logger.debug("updated [" + node + "] already in blacklist");
                }
                break;
            }
        }
        failureListener.onFailure(node);
    }

    //......
}
  • RestClient的selectNodes方法首先将不在blacklist中的node,或者在blacklist中但是shallBeRetried返回为true的node归为livingNodes,之后通过nodeSelector来从这些livingNodes中选择一个node
  • RestClient的onResponse方法会将该node的host从blacklist中移除
  • RestClient的onFailure方法会往blacklist创建或更新host对应的DeadHostState,如果之前该host没有DeadHostState则使用TimeSupplier.DEFAULT创建一个新的并放入blacklist,如果该host已经有DeadHostState则使用该DeadHostState创建新的DeadHostState然后更新到blacklist中

小结

  • DeadHostState有failedAttempts、deadUntilNanos、timeSupplier三个属性,它提供了两类构造器,一类是根据timeSupplier来构造,一类是根据previousDeadHostState来构造;DeadHostState提供了shallBeRetried方法,其判断逻辑为timeSupplier.nanoTime() - deadUntilNanos > 0
  • 根据previousDeadHostState来构造DeadHostState时会重新计算deadUntilNanos,递增failedAttempts;其中deadUntilNanos为timeSupplier.nanoTime() + timeoutNanos,而timeoutNanos的计算公式为(long)Math.min(MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1)
  • RestClient的selectNodes方法首先将不在blacklist中的node,或者在blacklist中但是shallBeRetried返回为true的node归为livingNodes,之后通过nodeSelector来从这些livingNodes中选择一个node;RestClient的onResponse方法会将该node的host从blacklist中移除;RestClient的onFailure方法会往blacklist创建或更新host对应的DeadHostState,如果之前该host没有DeadHostState则使用TimeSupplier.DEFAULT创建一个新的并放入blacklist,如果该host已经有DeadHostState则使用该DeadHostState创建新的DeadHostState然后更新到blacklist中

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1096
码字总数 1034311
作品 0
深圳
私信 提问
聊聊Elasticsearch的NodesSniffer

序 本文主要研究一下Elasticsearch的NodesSniffer NodesSniffer elasticsearch-7.0.1/client/sniffer/src/main/java/org/elasticsearch/client/sniff/NodesSniffer.java NodesSniffer接口定义......

go4it
05/18
10
0
Java REST Client 访问阿里云5.5 Elasticsearch 实例实现

开发环境:InteliJ IDEA 操作系统 :macOS Mojave Elasticsearch 版本:阿里云 5.5.3withX-Pack 客户端版本:REST Client 5.5.3 1. 预先创建好阿里云 ES 实例,开启公网地址访问白名单。 2....

garygao305
01/13
0
0
聊聊Elasticsearch RestClient的RequestLogger

序 本文主要研究一下Elasticsearch RestClient的RequestLogger RequestLogger elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RequestLogger.java RequestLogger......

go4it
05/16
18
0
Elasticsearch Java Rest Client 上手指南(上)

开始看Elasticsearch Java API 的时候,被这段话浇了盆凉水 We plan on deprecating the in Elasticsearch 7.0 and removing it completely in 8.0. Instead, you should be using the Java......

MaxZing
2018/07/17
0
0
Elasitcsearch High Level Rest Client学习笔记(一)

文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.1/java-rest-high.html java doc地址: https://artifacts.elastic.co/javadoc/org/elasticsearch/client/el......

木子SMZ
2018/07/12
2.5K
0

没有更多内容

加载失败,请刷新页面

加载更多

Go 关闭 channel 的 close 方法

在 Go 中我们所以 close() 来关闭一个 channel 官方的注释如下 The close built-in function closes a channel, which must be either bidirectional or send-only. It should be executed o......

mickelfeng
22分钟前
3
0
语音转文字什么方法比较简单

在很多时候一些比较重要的对话需要录制下来,在录制完成后还需要整理出文字,可是长时间的录音内容想要整理出文字是非常的麻烦的。需要花费大量的时间将录制的声音转换成文字,那么想要简单快...

401恶户
26分钟前
5
0
IIS7配置thinkphp5项目到public目录下

有个项目,tp5写的,要配置到项目的public目录下,一开始报错了...后面删除了配置,重新配置成功了,记录一下过程 1.首先,将网站根目录变为你的public目录下 2.添加解析程序的CGI,这里选择你需要解...

老bia同学
30分钟前
10
0
Redis主从复制的配置和实现原理

Redis的持久化功能在一定程度上保证了数据的安全性,即便是服务器宕机的情况下,也可以保证数据的丢失非常少。通常,为了避免服务的单点故障,会把数据复制到多个副本放在不同的服务器上,且...

TurboSanil
32分钟前
8
0
counsul 集群

1 master节点 cat << EOF > /lib/systemd/system/consul.service[Unit]Description=consul-masterAfter=network-online.target [Service]ExecStart=/bin/sh -c 'consul agent ......

拜了个拜
32分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部