java并发编程(七): 线程池的使用

原创
2014/04/01 12:50
阅读数 1.2K

线程池的使用:

  • 线程池分配调优,使用注意事项

在任务与执行策略之间的隐性耦合:

  • 有些任务需要明确指定执行策略

       1. 依赖性任务。提交的任务需要依赖其他任务,此时需要小心维护这些执行策略以避免产生活跃性问题

       2. 使用线程封闭机制的任务。任务要求其执行所在的Executor是线程安全的

       3. 对响应时间敏感的任务。

       4. 使用ThreadLocal的任务。ThreadLocal使每个线程都可以拥有某个变量的一个私有"版本"

  • 只有当任务都是同类型的且相互独立时,线程池的性能才能达到最佳

线程饥饿死锁:

  • 在线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁
/**
 * 在单线程Executor中任务发生死锁 
 */
public class ThreadDeadLock {
	ExecutorService exec = Executors.newSingleThreadExecutor();
	
	public class RenderPageTask implements Callable<String>{
		@Override
		public String call() throws Exception {
			Future<String> header, footer; //页眉, 页脚
			header = exec.submit(new LoadFileTask("header.html"));
			footer = exec.submit(new LoadFileTask("footer.html"));
			String body = renderBody();
			//有可能发生死锁---任务等待子任务完成
			return header.get() + body + footer.get();
		}
                ...
	}
}
  • 每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程"饥饿"死锁,因此需要在代码或配置Executor地配置文件中记录线程池地大小限制或配置限制。

运行时间较长的任务:

  • 避免等待运行时间较长的任务而阻塞过长时间,可以使用阻塞方法的超时版本,如Thread.join, BlockingQueue.put, CutDownLatch.await, Selector.select等。

设置线程池的大小:

  • 线程池的理想大小取决于被提交任务的类型及所部署系统的特性
  • 可根据计算任务类型进行线程池大小:如CPU密集型则可采用Runtime.avaliableProcesses()+1个线程;对于I/O密集型,由于阻塞操作多,可使用更多的线程,如2倍cpu核数。

配置ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, 
                          long keepAliveTime,
                          TimeUnit unit, 
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

线程的创建与销毁:

  • CorePoolSize: 线程池基本大小,即线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程
  • MaxmumPooSize: 线程池最大大小表示可同时活动的线程数量的上限。若某个线程的空闲时间超过了keepAliveTime, 则被标记为可回收的,并且当前线程池的大小大于基本大小,这个线程将被终止。
  • newFixedThreadPool: CorePoolSize = MaxmumPoolSize。
  • newCachedThreadPool: CorePoolSize=0, MaxmumPoolSize=Integer.MAX_VALUE,比较适合执行短时间任务
  • newSingleThreadPool: CorePoolSize=MaxmumPoolSize=1,其不可被重配置
  • newScheduledThreadPool: 只能设置CorePoolSize。内部实现不同于其他的ThreadPoolExecutor, 而是SchedduleThreadPoolExecutor。可执行定时任务或者隔时任务

管理队列任务:

  • 对于Executor, newCachedThreadPool工厂方法时一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像接受网络客户请求的服务应用程序中,如果不进行限制,那么很容易发生过载问题。
  • 只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列可能导致线程"饥饿"死锁问题。此时应该使用无界的线程池,如newCachedThreadPool

饱和策略:

  • 当有界队列被填满后,饱和策略开始发挥作用。
  • jdk提供的几种饱和策略

      1. AbortPolicy(中止策略),默认的饱和策略。会抛出RejectedExecutionException异常。

      2. DiscardPolicy(抛弃策略): 会抛弃该任务。

      3. DiscardOldestPolicy:会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。最好不和优先级队列一起使用,因为它会抛弃优先级最高的任务。

      4. CallerRunsPolicy(调用者运行策略):将任务回退给调用者。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用execute的线程中执行该任务。

/**
 * 创建一个固定大小的线程池,
 * 并采用有界队列与"调用者运行"饱和策略
 */
public void intThreadPool() {
	ThreadPoolExecutor executor = 
			new ThreadPoolExecutor(N_THREADS, N_THREADS,
					0L, TimeUnit.MILLISECONDS, 
					new LinkedBlockingQueue<Runnable>(CAPACITY));
	executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
 * 使用Semaphore来控制任务的提交速率
 */
public class BoundedExecutor {
	private final Executor exec;
	private final Semaphore semaphore;
	
	public BoundedExecutor(Executor exec, int bound) {
		this.exec = exec;
		this.semaphore = new Semaphore(bound);
	}
	
	public void submitTask(final Runnable command){
		try {
			semaphore.acquire(); //提交任务前请求信号量
			exec.execute(new Runnable() {
				@Override
				public void run() {
					try{
						command.run();
					} finally{
						semaphore.release(); //执行完释放信号
					}
				}
			});
		} catch (InterruptedException e) {
			// handle exception
		}
	}
}

线程工厂:

  • 我们可以通过定制线程工厂,从而定制线程池中创建的线程,这样可以实现些扩展功能,如调试信息,设置UncaughtExceptionHandler等。
/**
 * 自定义的线程工厂
 */
public class MyThreadFactory implements ThreadFactory {
	private final String poolName;
	
	public MyThreadFactory(String poolName) {
		super();
		this.poolName = poolName;
	}

	@Override
	public Thread newThread(Runnable r) {
		return new MyAppThread(r);
	}
}

public class MyAppThread extends Thread {
	public static final String DEFAULT_NAME="MyAppThread";
	private static volatile boolean debugLifecycle = false;
	private static final AtomicInteger created = new AtomicInteger();
	private static final AtomicInteger alive = new AtomicInteger();
	private static final Logger log = Logger.getAnonymousLogger();
	
	public MyAppThread(Runnable r) {
		this(r, DEFAULT_NAME);
	}

	public MyAppThread(Runnable r, String name) {
		super(r, name+ "-" + created.incrementAndGet());
		setUncaughtExceptionHandler( //设置未捕获的异常发生时的处理器
				new Thread.UncaughtExceptionHandler() {
					@Override
					public void uncaughtException(Thread t, Throwable e) {
						log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
					}
				});
	}

	@Override
	public void run() {
		boolean debug = debugLifecycle;
		if (debug) 
			log.log(Level.FINE, "running thread " + getName());
		try {
			alive.incrementAndGet();
			super.run();
		} finally {
			alive.decrementAndGet();
			if (debug) 
				log.log(Level.FINE, "existing thread " + getName());
		}
	}
}
  • 当应用需要利用安全策略来控制某些特殊代码库的访问权,可以利用PrivilegedThreadFactory来定制自己的线程工厂,以免出现安全性异常。

在调用构造函数后再定制ThreadPoolExecutor:

  • 可以在创建线程池后,再通过Setter方法设置其基本属性。

扩展ThreadPoolExecutor:

/**
 * 增加日志和记时等功能的线程池
 */
public class TimingThreadPoolExecutor extends ThreadPoolExecutor {
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任务执行开始时间
	private final Logger log = Logger.getAnonymousLogger();
	private final AtomicLong numTasks = new AtomicLong(); //统计任务数
	private final AtomicLong totalTime = new AtomicLong(); //线程池运行总时间

	public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected void beforeExecute(Thread t, Runnable r) {
		super.beforeExecute(t, r);
		log.fine(String.format("Thread %s: start %s", t, r));
		startTime.set(System.nanoTime());
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		try{
			long endTime = System.nanoTime();
			long taskTime = endTime - startTime.get();
			numTasks.incrementAndGet();
			totalTime.addAndGet(taskTime);
			log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
		} finally{
			super.afterExecute(r, t);
		}
	}

	@Override
	protected void terminated() {
		try{
			//任务执行平均时间
			log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
		}finally{
			super.terminated();
		}
	}
}

递归算法的并行性:

书中的一个谜题实例:

/**
 * 串行的谜题解答题
 */
public class SequentialPuzzleSolver<P, M> implements Puzzle<P, M>{
	private final Puzzle<P, M> puzzle;
	private final Set<P> seen = new HashSet<>();
	
	public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
		this.puzzle = puzzle;
	}

	public List<M> solve(){
		P pos = puzzle.initialPosition();
		return search(new Node<P, M>(pos, null, null));
	}
	
	private List<M> search(Node<P, M> node) {
		if (!seen.contains(node.pos)){
			seen.contains(node.pos);
			if (puzzle.isGoal(node.pos)){//找到了目标位置
					return node.asMoveList();
			}
			for (M move: puzzle.legalMoves(node.pos)){
				P pos = puzzle.move(node.pos, move);
				Node<P, M> child = new Node<P, M>(pos, move, node);
				List<M> result = search(child); //递归搜索
				if (result != null) 
					return result;
			}
		}
		return null;
	}
       ..,
}
上面解决器的并发实现:
/**
 * 并发的谜题解答器
 */
public class ConcurrentPuzzleSolver<P, M> {
	private final Puzzle<P, M> puzzle;
	private final ExecutorService exec;
	private final ConcurrentHashMap<P, Boolean> seen;
	final ValueLatch<Node<P, M>> solution
	                = new ValueLatch<Node<P, M>>(); //存放答案
       ...
	public List<M> solve() throws InterruptedException{
		P p = puzzle.initialPosition();
		exec.execute(newTask(p, null, null));
		//阻塞直到找到答案
		Node<P, M> solvNode = solution.getValue();
		return solvNode == null ? null : solvNode.asMoveList();
	}

	private Runnable newTask(P p, M m, Node<P, M> n) {
		return new SolverTask(p, m, n);
	}
	
	class SolverTask extends Node<P, M> implements Runnable {

		public SolverTask(P pos, M move, Node<P, M> prev) {
			super(pos, move, prev);
		}

		@Override
		public void run() {
			if (solution.isSet() //若已经找到了答案,阻止其他线程继续再找
					|| seen.putIfAbsent(pos, true) != null){
				return; 
			}
			if (puzzle.isGoal(pos)){  //找到了
				solution.setValue(this);
			} else{
				for (M m : puzzle.legalMoves(pos)){
					//继续找
					exec.execute(newTask(puzzle.move(pos, m), m, this));
				}
			}
		}
	}
}

/**
 * 携带结果的闭锁
 */
public class ValueLatch<T> {
	private T value = null;
	private final CountDownLatch done = new CountDownLatch(1);
	
	public T getValue() throws InterruptedException {
		done.await(); //阻塞直到设置了值
		synchronized (this) {
			return value;
		}
	}

	public boolean isSet() {
		return done.getCount() == 0;
	}

	public synchronized void setValue(T newValue) {
		if (!isSet()){
			value = newValue;
			done.countDown();
		}
	}
}

但是并发的解决器,对于未找到答案不是很好处理,可以通过计数来实现:

/**
 * 在解决器中找不到解答时
 */
public class PuzzleSolver<P, M> extends ConcurrentPuzzleSolver<P, M> {
	private final AtomicInteger taskCount = new AtomicInteger(); //通过计数来标志是否找到解答
       ...
	@Override
	protected Runnable newTask(P p, M m, Node<P, M> n) {
		return new CountingSolverTask(p, m, n);
	}

	class CountingSolverTask extends SolverTask{
		
		public CountingSolverTask(P pos, M move, Node<P, M> prev) {
			super(pos, move, prev);
			taskCount.incrementAndGet();
		}

		@Override
		public void run() {
			try{
				super.run();
			} finally{
				if (taskCount.decrementAndGet() == 0){
					solution.setValue(null);
				}
			}
		}
	}
}

不吝指正。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
10 收藏
1
分享
返回顶部
顶部