A Look at Elasticsearch's TimedRunnable

go4it
Published 06/11 21:56

This post takes a look at Elasticsearch's TimedRunnable.

TimedRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
    private final Runnable original;
    private final long creationTimeNanos;
    private long startTimeNanos;
    private long finishTimeNanos = -1;

    TimedRunnable(final Runnable original) {
        this.original = original;
        this.creationTimeNanos = System.nanoTime();
    }

    @Override
    public void doRun() {
        try {
            startTimeNanos = System.nanoTime();
            original.run();
        } finally {
            finishTimeNanos = System.nanoTime();
        }
    }

    @Override
    public void onRejection(final Exception e) {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onRejection(e);
        }
    }

    @Override
    public void onAfter() {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onAfter();
        }
    }

    @Override
    public void onFailure(final Exception e) {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onFailure(e);
        }
    }

    @Override
    public boolean isForceExecution() {
        return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
    }

    /**
     * Return the time since this task was created until it finished running.
     * If the task is still running or has not yet been run, returns -1.
     */
    long getTotalNanos() {
        if (finishTimeNanos == -1) {
            // There must have been an exception thrown, the total time is unknown (-1)
            return -1;
        }
        return Math.max(finishTimeNanos - creationTimeNanos, 1);
    }

    /**
     * Return the time this task spent being run.
     * If the task is still running or has not yet been run, returns -1.
     */
    long getTotalExecutionNanos() {
        if (startTimeNanos == -1 || finishTimeNanos == -1) {
            // There must have been an exception thrown, the total time is unknown (-1)
            return -1;
        }
        return Math.max(finishTimeNanos - startTimeNanos, 1);
    }

    @Override
    public Runnable unwrap() {
        return original;
    }

}
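  • TimedRunnable extends AbstractRunnable and implements the WrappedRunnable interface. Its doRun method records startTimeNanos just before running the original Runnable and sets finishTimeNanos in a finally block, so the finish time is captured whether the task returns or throws. getTotalExecutionNanos returns the task's execution time (finish minus start), while getTotalNanos returns the time from creation to finish, which also covers the time the task spent waiting in the queue.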
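
The timing pattern described above can be sketched outside Elasticsearch as a plain Runnable wrapper. This is a minimal sketch, not the Elasticsearch class: the name TimingRunnable is hypothetical, it implements Runnable directly instead of extending AbstractRunnable, and it assumes the timing getters are read on the same thread that ran the task (as happens in ThreadPoolExecutor.afterExecute), so the fields are not volatile.

```java
/** Hypothetical stand-in for the timing wrapper; not the Elasticsearch class. */
final class TimingRunnable implements Runnable {
    private final Runnable original;
    private final long creationTimeNanos;
    private long startTimeNanos = -1;   // -1 means "not started yet" (assumption of this sketch)
    private long finishTimeNanos = -1;  // -1 means "not finished yet"

    TimingRunnable(Runnable original) {
        this.original = original;
        this.creationTimeNanos = System.nanoTime();
    }

    @Override
    public void run() {
        try {
            startTimeNanos = System.nanoTime();
            original.run();
        } finally {
            // Recorded even if original.run() throws.
            finishTimeNanos = System.nanoTime();
        }
    }

    /** Creation to finish (queue wait plus execution), or -1 if not finished. */
    long getTotalNanos() {
        if (finishTimeNanos == -1) {
            return -1;
        }
        return Math.max(finishTimeNanos - creationTimeNanos, 1);
    }

    /** Start to finish (execution only), or -1 if not started or not finished. */
    long getTotalExecutionNanos() {
        if (startTimeNanos == -1 || finishTimeNanos == -1) {
            return -1;
        }
        return Math.max(finishTimeNanos - startTimeNanos, 1);
    }

    /** Gives callers access to the wrapped task. */
    Runnable unwrap() {
        return original;
    }

    public static void main(String[] args) {
        TimingRunnable task = new TimingRunnable(() -> System.out.println("working"));
        System.out.println(task.getTotalExecutionNanos()); // -1, the task has not run yet
        task.run();
        System.out.println(task.getTotalExecutionNanos() >= 1); // true
    }
}
```

Clamping both results to at least 1 mirrors the Math.max(..., 1) calls in the excerpt, which keeps a finished task from ever reporting a non-positive duration.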

Example

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
    //......

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // A task has been completed, it has left the building. We should now be able to get the
        // total time as a combination of the time in the queue and time spent running the task. We
        // only want runnables that did not throw errors though, because they could be fast-failures
        // that throw off our timings, so only check when t is null.
        assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
        final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
        final long taskNanos = timedRunnable.getTotalNanos();
        final long totalNanos = totalTaskNanos.addAndGet(taskNanos);

        final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
        assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
        executionEWMA.addValue(taskExecutionNanos);

        if (taskCount.incrementAndGet() == this.tasksPerFrame) {
            final long endTimeNs = System.nanoTime();
            final long totalRuntime = endTimeNs - this.startNs;
            // Reset the start time for all tasks. At first glance this appears to need to be
            // volatile, since we are reading from a different thread when it is set, but it
            // is protected by the taskCount memory barrier.
            // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
            startNs = endTimeNs;

            // Calculate the new desired queue size
            try {
                final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
                final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
                final int oldCapacity = workQueue.capacity();

                if (logger.isDebugEnabled()) {
                    final long avgTaskTime = totalNanos / tasksPerFrame;
                    logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
                                    "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
                            getName(),
                            tasksPerFrame,
                            TimeValue.timeValueNanos(totalRuntime),
                            TimeValue.timeValueNanos(avgTaskTime),
                            TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
                            String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
                            desiredQueueSize,
                            oldCapacity);
                }

                // Adjust the queue size towards the desired capacity using an adjust of
                // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
                // values the queue size can have.
                final int newCapacity =
                        workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
                if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
                    logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
                            newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
                            oldCapacity, newCapacity);
                }
            } catch (ArithmeticException e) {
                // There was an integer overflow, so just log about it, rather than adjust the queue size
                logger.warn(() -> new ParameterizedMessage(
                                "failed to calculate optimal queue size for [{}] thread pool, " +
                                "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
                                getName(), totalRuntime, tasksPerFrame, totalNanos),
                        e);
            } finally {
                // Finally, decrement the task count and time back to their starting values. We
                // do this at the end so there is no concurrent adjustments happening. We also
                // decrement them instead of resetting them back to zero, as resetting them back
                // to zero causes operations that came in during the adjustment to be uncounted
                int tasks = taskCount.addAndGet(-this.tasksPerFrame);
                assert tasks >= 0 : "tasks should never be negative, got: " + tasks;

                if (tasks >= this.tasksPerFrame) {
                    // Start over, because we can potentially reach a "never adjusting" state,
                    //
                    // consider the following:
                    // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
                    // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
                    // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
                    // - Since taskCount will now be incremented forever, it will never be 10 again,
                    //   so there will be no further adjustments
                    logger.debug(
                            "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
                    totalTaskNanos.getAndSet(1);
                    taskCount.getAndSet(0);
                    startNs = System.nanoTime();
                } else {
                    // Do a regular adjustment
                    totalTaskNanos.addAndGet(-totalNanos);
                }
            }
        }
    }

    //......
}
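  • QueueResizingEsThreadPoolExecutor's afterExecute feeds timedRunnable.getTotalExecutionNanos() into executionEWMA, an exponentially weighted moving average of task execution time. It also adds timedRunnable.getTotalNanos() to totalTaskNanos; once tasksPerFrame tasks have completed, that total is used to compute lambda and the desired queue size.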
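
To make the two calculations in afterExecute more concrete, here is a rough sketch of an exponentially weighted moving average and of a queue-size estimate in the style of Little's law (L = lambda * W). Everything in it is an assumption for illustration: the class names SimpleEwma and QueueSizing are hypothetical, the bodies of calculateLambda and calculateL are guessed from how the excerpt calls them (including the ArithmeticException it catches), and the alpha value is arbitrary. It is single-threaded and is not Elasticsearch's implementation.

```java
/** Hypothetical, single-threaded EWMA; not Elasticsearch's implementation. */
final class SimpleEwma {
    private final double alpha;
    private double average;

    SimpleEwma(double alpha, double initialAverage) {
        if (alpha < 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in [0, 1]");
        }
        this.alpha = alpha;
        this.average = initialAverage;
    }

    /** Blend the new sample into the average; a larger alpha favors recent samples. */
    void addValue(double value) {
        average = alpha * value + (1.0 - alpha) * average;
    }

    double getAverage() {
        return average;
    }
}

/** Hypothetical helpers; the bodies are assumed from the calls in the excerpt. */
final class QueueSizing {
    /** lambda: tasks per nanosecond, from the task count and the accumulated task time. */
    static double calculateLambda(int totalTasks, long totalTaskNanos) {
        return (double) totalTasks / totalTaskNanos;
    }

    /** L = lambda * W; Math.toIntExact throws ArithmeticException on int overflow. */
    static int calculateL(double lambda, long targetedResponseTimeNanos) {
        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
    }

    public static void main(String[] args) {
        SimpleEwma ewma = new SimpleEwma(0.5, 0.0);
        ewma.addValue(100);
        System.out.println(ewma.getAverage()); // 50.0
        ewma.addValue(100);
        System.out.println(ewma.getAverage()); // 75.0

        double lambda = calculateLambda(8, 1024);     // 8 tasks over 1024 ns of task time
        System.out.println(calculateL(lambda, 2048)); // 16
    }
}
```

In the excerpt, the desired size is not applied directly: workQueue.adjustCapacity moves the capacity toward it by QUEUE_ADJUSTMENT_AMOUNT, bounded by minQueueSize and maxQueueSize.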

Summary

TimedRunnable extends AbstractRunnable and implements the WrappedRunnable interface. Its doRun method records the startTimeNanos and finishTimeNanos of the original Runnable, and getTotalExecutionNanos returns the task's execution time.
