文档章节

使用ReentrantLock实现阻塞式的ThreadPoolExecutor

pillsilly
 pillsilly
发布于 2015/04/20 10:37
字数 355
阅读 75
收藏 0
java 自带的ThreadPoolExecutor相关类里貌似没有阻塞式的提交(submit)

有需要的话得自己实现

以下是测试代码

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

public class BlockedExecuterPoolTest {

	static class BlockedThreadPoolExecutor extends ThreadPoolExecutor {
		private int submitCount = 0;
		public synchronized int getSubmitCount() {
			return submitCount;
		}
		public synchronized void setSubmitCount(int submitCount) {
			this.submitCount = submitCount;
		}
		public BlockedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
			super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
		}
		protected void afterExecute(Runnable r, Throwable t) {
			super.afterExecute(r, t);
			try {
				logger.info("unblock the canSubmit condition ");
				lock.lock();
				canSubmit.signal();
			} finally {
				if(submitCount>1){
					setSubmitCount(submitCount-1);
				}
				lock.unlock();
			}
		}

		final ReentrantLock lock = new ReentrantLock();
		Condition canSubmit = lock.newCondition();
		
		@Override
		public void execute(Runnable command) {
			try {
				lock.lockInterruptibly();
				while (getSubmitCount()>getMaximumPoolSize()) {
					logger.info("limit reached (getSubmitCount()["+getSubmitCount()+"]>getMaximumPoolSize()["+getMaximumPoolSize()+"]),thread["+Thread.currentThread().getName()+"] is being blocking");
					canSubmit.await();
				}
				super.execute(command);
				logger.info("blocking over");
			} catch (Exception e) {
				logger.warn(e.getLocalizedMessage());
			} finally {
				lock.unlock();
			}
		}
		
		@Override
		public Future<?> submit(Runnable task) {
			setSubmitCount(submitCount+1);
			return super.submit(task);
		}
	}

	static final Logger logger = Logger.getLogger(BlockedExecuterPoolTest.class);
	static final BlockedThreadPoolExecutor threadPoolExecutor = new BlockedThreadPoolExecutor(25, 25, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), new RejectedExecutionHandler() {
		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			logger.info("rejectedExecution!!!");
			System.exit(0);
		}
	});

	public static void main(String[] args) throws InterruptedException {
		int jobsAmount = 16166;
		logger.info("total ["+jobsAmount+"]"+"jobs awaits to be submitted");
		for (int i = 0; i < jobsAmount; i++) {
			String tag = "no." + i;
			logger.info("no.[" + tag + "] job is going to be submitted");
			threadPoolExecutor.submit(new TaskTest(tag));
			logger.info("no.[" + tag + "] job has been submitted:");
		}
		logger.info("total ["+jobsAmount+"]"+"jobs have been submitted");
	}

	public static class TaskTest implements Runnable {
		int jobTimeCost = 5000;
		public TaskTest(String string) {
			this.threadName = string;
		}
		String threadName;

		void sayBegin() {
			String s = "thread[" + this.threadName + "] is doing its job and it'll last for " + jobTimeCost + " milsecs";
			logger.info(s);
		}

		void sayEnd() {
			String s = "thread[" + this.threadName + "] has done its job and it last for " + jobTimeCost + " milsecs";
			logger.info(s);
		}

		@Override
		public void run() {
			sayBegin();
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} finally {
				sayEnd();
			}
		}
	}
}



© 著作权归作者所有

pillsilly
粉丝 2
博文 23
码字总数 45873
作品 0
杭州
私信 提问
Java里阻塞线程的三种实现方法

在日常开发中,我们有时会遇到遇到多线程处理任务的情况,JDK里提供了便利的ThreadPoolExecutor以及其包装的工具类Executors。但是我们知道ExecutorService.excute(Runnable r)是异步的,超过...

黄亿华
2013/03/18
15.4K
3
java多线程:jdk并发包的总结(转载)

转载地址:http://blog.csdn.net/yangbutao/article/details/8479520 1、java 高并发包所采用的几个机制(CAS,volatile,抽象队列同步) CAS(乐观操作),jdk5以前采用synchronized,对共享区...

无信不立
2016/08/25
0
0
这里有一份面经,请查收(七)

本文作者:伯乐在线 -朱小厮 。未经作者许可,禁止转载! 欢迎加入伯乐在线专栏作者。 本篇所要介绍的是一家互联网企业,简称MD好了。一面是电面,二三面是face2face的技术面,4面是HR面。 ...

伯乐在线
2016/09/04
0
0
Java集合--阻塞队列(ArrayBlockingQueue)

1 ArrayBlockingQueue ArrayBlockingQueue是一个阻塞队列,底层使用数组结构实现,按照先进先出(FIFO)的原则对元素进行排序。 ArrayBlockingQueue是一个线程安全的集合,通过ReentrantLock...

贾博岩
2017/12/07
0
0
Java中,关于多线程操作,你只要看这一篇就够了

引 如果对什么是线程、什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内。 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来...

ToEnd
2017/12/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

zk中选举Leader时的网络IO QuorumCnxManager解析

每台服务启动过程中,会启动一个QuorumCnxManager,负责各台服务器之间底层Leader选举过程中的网络通信 当集群中有服务器服务中断时,zk会重新选举leader 内部类 Message定义消息结构 包含了...

writeademo
11分钟前
2
0
使用mdBook 替代 gitbook。

###** 为什么要替代gitbook** gitbook 有个模板问题:如果md文件中有连续的大括号(比如:&{{父亲 40}}),gitbook会把{{ 父亲 40 }}中的父亲 40当做一个模板变量。如果这个变量不存在,会报...

王坤charlie
14分钟前
2
0
TL-A7HSAD采集卡硬件的处理器、NOR FLASH、DDR3

TL-A7HSAD是一款由广州创龙基于Xilinx Artix-7系列FPGA自主研发的高速数据采集卡,可配套广州创龙TMS320C6655、TMS320C6657、TMS320C6678开发板使用。该采集卡包含1个双通道250MSPS*12Bit的高...

Tronlong创龙
26分钟前
2
0
项目启动报fastjson版本可能过低

进行项目启动的过程中,之前都正常,这次启动突然就失败了: 查看日志说的是版本过低,后来查看官方网站版本,替换了最新版本: 选择了最新版本的1.2.60,1.2.62尝试后都不行,后来查看网上搜...

aiChuang
26分钟前
2
0
McDonald’s is using Alexa and Google to accepting job applications

McDonald’s today announced a new initiative the fast food chain is calling the “Apply Thru,” in which owners of Amazon Alexa or Google Assistant devices can begin job applic......

wowloop
30分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部