文档章节

【多线程】ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究

远方__
 远方__
发布于 2016/12/23 12:34
字数 1159
阅读 2
收藏 0

ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,还可以延迟执行任务或者周期性的执 行某个任务。scheduleWithFixedDelay和scheduleAtFixedRate就是用来完成这个功能的。平常使用 scheduleAtFixedRate这个方法时并没有多想,但是这几天在实现一个功能的时候,需要考虑scheduleAtFixedRate所执行 的task是否会影响任务的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么 这个command的执行会不会影响这个10秒的周期性。因此特意仔细看了下ScheduledThreadPoolExecutor的源代码,这里记录 一下,以便以后查看。

    scheduleAtFixedRate有两个时间参数,initialDelay和period,对应该方法的两个主要功能,即延迟运行任务和周期性执行任务。

 

  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
  2.                                               long initialDelay,  
  3.                                               long period,  
  4.                                               TimeUnit unit) {  
  5.     if (command == null || unit == null)  
  6.         throw new NullPointerException();  
  7.     if (period <= 0)  
  8.         throw new IllegalArgumentException();  
  9.     RunnableScheduledFuture<?> t = decorateTask(command,  
  10.         new ScheduledFutureTask<Object>(command,  
  11.                                         null,  
  12.                                         triggerTime(initialDelay, unit),  
  13.                                         unit.toNanos(period)));  
  14.     delayedExecute(t);  
  15.     return t;  
  16. }  
  17.   
  18. /** 
  19.  * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. 
  20.  */  
  21. private void delayedExecute(Runnable command) {  
  22.     if (isShutdown()) {  
  23.         reject(command);  
  24.         return;  
  25.     }  
  26.     // Prestart a thread if necessary. We cannot prestart it  
  27.     // running the task because the task (probably) shouldn't be  
  28.     // run yet, so thread will just idle until delay elapses.  
  29.     if (getPoolSize() < getCorePoolSize())  
  30.         prestartCoreThread();  
  31.   
  32.     super.getQueue().add(command);  
  33. }  

    首先创建一个ScheduledFutureTask,然后通过delayedExecute执行这个task。在delayedExecute中,首先 预先启动一个线程,这里要注意的是这个这里用来启动一个新线程的firstTask参数是null,所以新启动的线程是idle状态的,然后把这个 task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是 DelayedWorkQueue,这个DelayedWorkQueue就是实现delay的关键。DelayedWorkQueue内部使用的是 DelayQueue,DelayQueue实现task delay的关键就在于其Offer(E e)和Take.下面,通过分析这两个方法和结合ThreadPoolExecutor的运行原理来说明delay操作是如何实现的

 

  1. public boolean offer(E e) {  
  2.     final ReentrantLock lock = this.lock;  
  3.     lock.lock();  
  4.     try {  
  5.         E first = q.peek();  
  6.         q.offer(e);  
  7.         if (first == null || e.compareTo(first) < 0)  
  8.             available.signalAll();  
  9.         return true;  
  10.     } finally {  
  11.         lock.unlock();  
  12.     }  
  13. }  
  14.   
  15. public E take() throws InterruptedException {  
  16.     final ReentrantLock lock = this.lock;  
  17.     lock.lockInterruptibly();  
  18.     try {  
  19.         for (;;) {  
  20.             E first = q.peek();  
  21.             if (first == null) {  
  22.                 available.await();  
  23.             } else {  
  24.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
  25.                 if (delay > 0) {  
  26.                     long tl = available.awaitNanos(delay);  
  27.                 } else {  
  28.                     E x = q.poll();  
  29.                     assert x != null;  
  30.                     if (q.size() != 0)  
  31.                         available.signalAll(); // wake up other takers  
  32.                     return x;  
  33.   
  34.                 }  
  35.             }  
  36.         }  
  37.     } finally {  
  38.         lock.unlock();  
  39.     }  
  40. }  

      ScheduledThreadPoolExecutor执行task是通过工作线程Work来承担的,Work的Run方法如下:

 

  1. public void run() {  
  2.     try {  
  3.         Runnable task = firstTask;  
  4.         firstTask = null;  
  5.         while (task != null || (task = getTask()) != null) {  
  6.             runTask(task);  
  7.             task = null;  
  8.         }  
  9.     } finally {  
  10.         workerDone(this);  
  11.     }  
  12. }  

     因为前面在delayedExecute方法里面创建work线程的firstTask参数为null,所以就通过getTask去从workQueue 里面获取task,getTask在正常情况下(即线程池没有关闭,线程数量没有超过corePoolSize等)是通过 workQueue.take()从workQueue里获取任务。根据上面的贴出来的take方法的代码,如果queue是空的,则take方法会阻塞 住,直到有新task被add进来。而在上面的delayedExecute方法的最后,会把创建的scheduledFutureTask加入到 workQueue,这样take方法中的available.await()就被唤醒;在take方法里面,如果workQueue不为空,则执行 task.getDelay()方法获取task的delay

  1. public long getDelay(TimeUnit unit) {  
  2.     return unit.convert(time - now(), TimeUnit.NANOSECONDS);  
  3. }  

   这里的time是通过两个方法把initialDelay变成一个triggerTime

  1. /** 
  2.  * Returns the trigger time of a delayed action. 
  3.  */  
  4. private long triggerTime(long delay, TimeUnit unit) {  
  5.      return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  
  6. }  
  7.   
  8. /** 
  9.  * Returns the trigger time of a delayed action. 
  10.  */  
  11. long triggerTime(long delay) {  
  12.      return now() +  
  13.          ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  
  14. }  

注意看这个方法,这里返回的delay不是固定不变的,从task被放入workQueue起,不同的时间调用getDelay方法会得出不同的 delay。如果放入workQueue的task的initialDelay是5秒,那么根据take方法的代码,如果在放入workQueue5秒 后,就可以从delayQueue中拿到5秒前put进去的task,这样就实现了delay的功能。

 

   在本文的最前面提到scheduleAtFixedRate能够周期性地执行一项任务,那么这个是如何实现的呢?在 scheduleAtFixedRate方法里创建了一个ScheduledFutureTask,这个ScheduledFutureTask包装了 command,最后周期性执行的是ScheduledFutureTask的run方法。

  1. private void runPeriodic() {  
  2.     boolean ok = ScheduledFutureTask.super.runAndReset();  
  3.     boolean down = isShutdown();  
  4.     // Reschedule if not cancelled and not shutdown or policy allows  
  5.     if (ok && (!down ||  
  6.                (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&  
  7.                 !isStopped()))) {  
  8.         long p = period;  
  9.         if (p > 0)  
  10.             time += p;  
  11.         else  
  12.             time = triggerTime(-p);  
  13.         ScheduledThreadPoolExecutor.super.getQueue().add(this);  
  14.     }  
  15.     // This might have been the final executed delayed  
  16.     // task.  Wake up threads to check.  
  17.     else if (down)  
  18.         interruptIdleWorkers();  
  19. }  
  20.   
  21. /** 
  22.  * Overrides FutureTask version so as to reset/requeue if periodic. 
  23.  */  
  24. public void run() {  
  25.     if (isPeriodic())  
  26.         runPeriodic();  
  27.     else  
  28.         ScheduledFutureTask.super.run();  
  29. }  

     由上面的代码可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)这个方法的周期性会受 command的影响,如果command方法的执行时间是10秒,那么执行command的周期其实是20秒,即 scheduleAtFixedRate这个方法要等一个完整的command方法执行完成后才继续周期性地执行command方法,其实这样的设计也是 符合常理的。

 

     以上就是对ScheduledThreadPoolExecutor的一点小理解。

本文转载自:http://blog.csdn.net/sinat_27615265/article/details/49387185

远方__
粉丝 0
博文 82
码字总数 0
作品 0
丰台
程序员
私信 提问
java定时工具的辟谣

网络上关于java定时器的文章真的是错误百出,给我的学习造成了很大的困扰,Timer根本就没有线程安全问题,Timer的所有调度方法都和上次任务的结束时间没有关系,TImer和ScheduledThreadPoolExec...

肥肥小浣熊
2017/11/20
0
0
Timer和ScheduledThreadPoolExecutor的区别

Timer的主要方法有: // 安排在指定的时间执行 void schedule(TimerTask task, Date time) // 安排在指定的时间开始以重复的延时执行 void schedule(TimerTask task, Date firstTime, long p...

zlfwmm
2016/06/01
0
0
(java并发)ScheduledThreadPoolExcutor

1.用timer缺点非常大 Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之...

vshcxl
2016/11/28
106
0
Java多线程学习(八)线程池与Executor 框架

Java面试通关手册(Java学习指南,欢迎Star,会一直完善下去,欢迎建议和指导):https://github.com/Snailclimb/JavaGuide 历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理...

snailclimb
2018/05/31
0
0
Java中定时任务的实现:Timer与ScheduledExecutorService的不同

前言 在做后台任务的时候经常需要实现各种各种的定时的,周期性的任务。比如每隔一段时间更新一下缓存之类的。通常周期性的任务都可以使用如下方式实现: class MyTimerThread extends Thre...

wf78728381
2017/10/18
22
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部