文档章节

聊聊Elasticsearch的TaskScheduler

go4it
 go4it
发布于 05/30 21:26
字数 723
阅读 10
收藏 3

本文主要研究一下Elasticsearch的TaskScheduler

TaskScheduler

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java

public class TaskScheduler {

    private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

    /**
     * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
     * relative nanotime after the scheduled time, the task will be returned. This method returns a
     * {@link Runnable} that can be run to cancel the scheduled task.
     *
     * @param task to schedule
     * @param relativeNanos defining when to execute the task
     * @return runnable that will cancel the task
     */
    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
        tasks.offer(delayedTask);
        return delayedTask;
    }

    Runnable pollTask(long relativeNanos) {
        DelayedTask task;
        while ((task = tasks.peek()) != null) {
            if (relativeNanos - task.deadline >= 0) {
                tasks.remove();
                if (task.cancelled == false) {
                    return task.runnable;
                }
            } else {
                return null;
            }
        }
        return null;
    }

    long nanosUntilNextTask(long relativeNanos) {
        DelayedTask nextTask = tasks.peek();
        if (nextTask == null) {
            return Long.MAX_VALUE;
        } else {
            return Math.max(nextTask.deadline - relativeNanos, 0);
        }
    }

    private static class DelayedTask implements Runnable {

        private final long deadline;
        private final Runnable runnable;
        private boolean cancelled = false;

        private DelayedTask(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        private long getDeadline() {
            return deadline;
        }

        @Override
        public void run() {
            cancelled = true;
        }
    }
}
  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

SSLChannelContext

elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

public final class SSLChannelContext extends SocketChannelContext {
	//......

    @Override
    public void queueWriteOperation(WriteOperation writeOperation) {
        getSelector().assertOnSelectorThread();
        if (writeOperation instanceof CloseNotifyOperation) {
            sslDriver.initiateClose();
            long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
            closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
        } else {
            super.queueWriteOperation(writeOperation);
        }
    }

    private void channelCloseTimeout() {
        closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
        setCloseNow();
        getSelector().queueChannelClose(channel);
    }

	//......
}
  • SSLChannelContext的queueWriteOperation方法会使用taskScheduler的scheduleAtRelativeTime注册一个channelCloseTimeout的延时任务

NioSelector

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

public class NioSelector implements Closeable {

	//......

    public void runLoop() {
        if (runLock.tryLock()) {
            isRunningFuture.complete(null);
            try {
                setThread();
                while (isOpen()) {
                    singleLoop();
                }
            } finally {
                try {
                    cleanupAndCloseChannels();
                } finally {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        eventHandler.selectorException(e);
                    } finally {
                        runLock.unlock();
                        exitedLoop.countDown();
                    }
                }
            }
        } else {
            throw new IllegalStateException("selector is already running");
        }
    }

    void singleLoop() {
        try {
            closePendingChannels();
            preSelect();
            long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
            int ready;
            if (nanosUntilNextTask == 0) {
                ready = selector.selectNow();
            } else {
                long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
                // Only select until the next task needs to be run. Do not select with a value of 0 because
                // that blocks without a timeout.
                ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
            }
            if (ready > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey sk = keyIterator.next();
                    keyIterator.remove();
                    if (sk.isValid()) {
                        try {
                            processKey(sk);
                        } catch (CancelledKeyException cke) {
                            eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  cke);
                        }
                    } else {
                        eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  new CancelledKeyException());
                    }
                }
            }

            handleScheduledTasks(System.nanoTime());
        } catch (ClosedSelectorException e) {
            if (isOpen()) {
                throw e;
            }
        } catch (IOException e) {
            eventHandler.selectorException(e);
        } catch (Exception e) {
            eventHandler.uncaughtException(e);
        }
    }

    private void handleScheduledTasks(long nanoTime) {
        Runnable task;
        while ((task = taskScheduler.pollTask(nanoTime)) != null) {
            try {
                task.run();
            } catch (Exception e) {
                eventHandler.taskException(e);
            }
        }
    }

    //......
}
  • NioSelector的runLoop方法调用了singleLoop方法,后者调用了handleScheduledTasks方法,而handleScheduledTasks方法则是从taskScheduler.pollTask,然后执行task.run()

小结

  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

doc

© 著作权归作者所有

go4it
粉丝 87
博文 1070
码字总数 1015091
作品 0
深圳
私信 提问
聊聊springboot elasticsearch autoconfigure

序 本文主要研究一下springboot elasticsearch autoconfigure ElasticsearchAutoConfiguration spring-boot-autoconfigure-2.1.4.RELEASE-sources.jar!/org/springframework/boot/autoconfi......

go4it
04/17
47
0
聊聊Elasticsearch的Releasables

序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java Releasable继承了java.io.Closeab......

go4it
06/14
22
0
聊聊Elasticsearch的ConcurrentMapLong

序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentMapLong.java Co......

go4it
06/03
6
0
CentOS7 部署 ElasticSearch 集群

环境 主机名 IP 操作系统 ES 版本 es227 192.168.1.227 CentOS7.5 6.5.4 es228 192.168.1.228 CentOS7.5 6.5.4 es229 192.168.1.229 CentOS7.5 6.5.4 下载 elasticsearch-6.5.4.tar.gz --- 各......

俊赛潘安-才比管乐
2018/12/27
404
0
聊聊Elasticsearch的SizeBlockingQueue

序 本文主要研究一下Elasticsearch的SizeBlockingQueue SizeBlockingQueue elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java Si......

go4it
06/01
14
0

没有更多内容

加载失败,请刷新页面

加载更多

只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
昨天
59
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
昨天
28
0
全世界到底有多少软件开发人员?

埃文斯数据公司(Evans Data Corporation) 2019 最新的统计数据(原文)显示,2018 年全球共有 2300 万软件开发人员,预计到 2019 年底这个数字将达到 2640万,到 2023 年达到 2770万。 而来自...

红薯
昨天
65
0
Go 语言基础—— 通道(channel)

通过通信来共享内存(Java是通过共享内存来通信的) 定义 func service() string {time.Sleep(time.Millisecond * 50)return "Done"}func AsyncService() chan string {retCh := mak......

刘一草
昨天
58
0
Apache Flink 零基础入门(一):基础概念解析

Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速...

Vincent-Duan
昨天
60
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部