文档章节

java并发备忘

Funcy1122
 Funcy1122
发布于 08/15 00:45
字数 6489
阅读 3
收藏 0

不安全的“先检查后执行”,代码形式如下:

	if(条件满足){   //这里容易出现线程安全问题
		//doSomething
	}else{
		//doOther
	}

读取-修改-写入

原子操作:使用CAS技术,即首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子的方式将V中的值由A变为B(只要这期间内没有任何线程将V的值修改成其他值)。

voliate修饰符:

java.util.concurrent.atomic包中的原子类,如

  • AtomicInteger:int的原子操作
  • AtomicLong:long的原子操作
  • AtomicBoolean:boolean原子操作
  • AtomicReference<V>:引用类型的原子操作

利用CAS实现无阻塞栈:

  	public class ConcurrentStack<E> {
  		AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
  		public void push(E item) {
  			Node<E> newHead = new Node<E>(item);
  			Node<E> oldHead;
  			do {
  				oldHead = top.get();
  				newHead.next = oldHead;
  			}while(!top.compareAndSet(oldHead, newHead));
  		}
  		public E pop() {
  			Node<E> oldHead;
  			Node<E> newHead;
  			do {
  				oldHead = top.get();
  				if(oldHead == null){
  					return null;
  				}
  				newHead = oldHead.next;
  			}while(!top.compareAndSet(oldHead, newHead));
  			return oldHead.item;
  		}
  		private static class Node<E> {
  			public final E item;
  			public Node<E> next;
  			public Node(E item) {
  				this.item = item;
  			}
  		}
  	}

synchronized细粒度控制代码块:

	public class CachedFactorizer implements Servlet {
		@GuardedBy("this") private BigInteger lastNumber;
		@GuardedBy("this") private BigInteger[] lastFactors;
		@GuardedBy("this") private long hits;
		@GuardedBy("this") private long cacheHits;

		public synchronized long getHits() {
			return hits;
		}

		public synchroized double getCacheHitRatio() {
			return (double) cacheHits / (double) hits;
		}

		public void service(ServletRequest req, SevletResponse resp) {
			BigInteger i = extractFromRequest(req);
			BigInteger[] factors = null;
			synchroized (this) {
				++ hits;
				if(i.equals(lastNumber)) {
					++ cacheHits;
					factors = lastFactors.clone();
				}
			}

			if(factors == null) {
				factors = factor(i);
				synchronized (this) {
					lastNumber = i;
					lastFactors = factors.clone();
				}
			}
			encodeIntoResponse(resp, factors);
		}

	}

jsp内置对象的线程安全

jsp中提供的8个类变量中,

  • OUT,REQUEST,RESPONSE,SESSION,CONFIG,PAGE,PAGECONXT是线程安全的,
  • 因为每个线程对应的request,response对象都是不一样的,
  • 不存在共享问题,APPLICATION在整个系统内被使用,所以不是线程安全的。

其他对象线程安全

  • 成员变量:成员变量在堆栈中分配,并被属于该实例的所有线程共享,所以不是线程安全的
  • 局部变量:局部变量在堆栈中分配,因为每个线程都有自己的堆栈,所以是线程安全的
  • 静 态 类:静态类不用被实例化就可以直接使用,也不是线程安全的
  • 外部资源:在程序中会有多个线程同时操作同一资源(如数据库访问),此时也要注意同步问题。

java内存模型:JMM

jmm主要是为了规定线程和内存之间的一些关系。 根据jmm的设计,

  • 系统存在一个主内存(main memory),java中所有实例变量都存储在主存中,对于所有线程都是共享的。
  • 每条线程都有自己的工作内存(Working Memory),工作内存由缓存和堆栈两部分组成,
  • 缓存中保存的是主存中变量的拷贝,缓存可能并不总和主存同步,也就是缓存中变量的修改可能没有立刻写到主存中;
  • 堆栈中保存的是线程的局部变量,线程之间无法相互直接访问堆栈中的变量。

synchronized关键字:

synchronized是Java语言的关键字,当修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。

  1. 当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行。另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块。
  2. 然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(this)同步代码块。
  3. 尤其关键的是,当一个线程访问object的一个synchronized(this)同步代码块时,其他线程对object中所有其它synchronized(this)同步代码块的访问将被阻塞。
  4. 第3点同样适用其它同步代码块。也就是说,当一个线程访问object的一个synchronized(this)同步代码块时,它就获得了这个object的对象锁。结果,其它线程对该object对象所有同步代码部分的访问都被暂时阻塞。
  5. 以上规则对其它对象锁同样适用.
需要明确几点
  1. 无论synchronized关键字加在方法上还是对象上,它取得的锁都是对象,而不是把一段代码或函数当作锁。而且同步方法很可能还会被其他线程的对象访问。
  2. 每个对象只有一个锁(lock)与之相关联。
  3. 实现同步是要很大的系统开销作为代价的,甚至可能造成死锁,所以尽量避免无谓的同步控制。
  4. 针对每个类,也有一个锁,所以synchronized static方法可以在类的范围内防止对static数据的并发访问。

线程池

Executors的静态方法,请求时即响应,减少了线程的创建时间,提高了响应速度,当程序中有“new Thread(runnable).start()”时,可以考虑使用Execute来代替Thread。

为每个请求启动一个新线程的Executor:

public class ThreadPerTaskExecutor implemets Executor {
	public void execute(Runnable r){
		new Thread(r).start();
	}
}

jdk内置的几大线程池:

  1. newCachedThreadPool():执行时,先查找线程池中是否有相同的线程,有则用线程池中的线程,否则创建一个新线程。线程执行完成后,一定时间内再无其他操作,线程将会被终结。
  2. newFixedThreadPool(int nThreads):创建指定数量的线程,执行时若线程池数量达到上线,程序将会等待其他线程执行完成再执行。
class TaskExecutionWebServer {
	private static final int NTHREADS = 100;
	private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);
	public static void main(String[] args){
		ServerSocket socket = new ServerSocket(80);
		while(true) {
			final Socket connection = socket.accpet();
			Runnable task = new Runnable() {
				public void run() {
					handleRequest(connection);
				}						
			};
			exec.execute(task);
		}
	}
}
  1. newSingleThreadExecutor():相当于线程数量为1的FixedThreadPool,单线程执行。
  2. newScheduledThreadPool():创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer.

DelayQueue:延时队列

CompletionService:Executor与BlockingQueue

CompletionService将Executor与BlockingQueue功能融合在一起,可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService并将计算部分委托给一个Executor.

public abstract class Renderer {
    private final ExecutorService executor;
    public Renderer(ExecutorService executor) {
        this.executor = executor;
    }
    void renderPage(CharSequence source) {
        final List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService =
                new ExecutorCompletionService<ImageData>(executor);
        for (final ImageInfo imageInfo : info)
            completionService.submit(new Callable<ImageData>() {
                public ImageData call() {
                    return imageInfo.downloadImage();
                }
            });
        renderText(source);
        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();   	//获得Future对象
                ImageData imageData = f.get();						//线程会在此阻塞,等待下载完成再运行
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }			
}

线程池创建示例:

	ExecutorService exec = Executors.newCachedThreadPool();
	exec.xxxXxxx();       //调用相关方法
  • ExecutorService接口:任务执行的生命周期管理方法:
  • shutdown方法:将执行平缓的关闭过程:不再接受新任务,同时等待已经提交的任务
  • shutdownNow方法:执行粗暴的关闭过程:尝试取消所有运行中的任务,并且不再启动队列中尚未开始的任务。
示例:支持关闭操作的web服务器:
class ListcycleWebserver {
	private final ExecutorSevice exec = ... ;
	public void start() throws IOException {
		ServerSocket socket = new ServerSocket(80);
		while (! exec.isShutdown()){
			try{
				final Socket conn = socket.accep();
				exec.execute(new Runnable(){
					public void run(){
						handleRequest(conn);
					}
				});
			}catch(RejectedExecutionException e){
				if(!exec.isShutdown()){
					log("task submission rejected", e);
				}
			}
		}
	}
	public void stop() {
		exec.showdown();
	}
	void handleRequest(Socket connection) {
		Request req = readRequest(connection);
		if(isShutdownReqeust(req)){
			stop();
		}else{
			dispatchRequest(req);
		}
	}
}

从任务中产生返回值

Runnable是执行工作的独立任务,不返回任何值。如果希任务完成时返回一个值,则可以实现Callable接口,覆盖call()方法,使用ExecutorService.submit()方法调用,submit()方法返回Future对象,示例如下:

	class TaskWithResult implements Callable<String> {
		private int id;
		public TaskWithResult(int id){
			this.id = id;
		}
		public String call() {
			return "result of TaskWithResult:" + id;
		}
	}
	public class CallableDemo {
		public static void main(String[] args) {
			ExecutorService exec = Executors.newCachedThreadPool();
			ArrayList<Future<String>> results = new ArrayList<Future<String>> ();
			for(int i = 0; i < 10; i++){
				results.add(exec.submit(new TaskWithResult(i)));
			}
			for(Future<String> fs : results) {
				try{
					System.out.println(fs.get());
				}catch(Exception e){
					e.printStackTrace();
				}finally{
					exec.shutdown();
				}
			}
		}
	}

解决共享资源竞争——给代码加锁

使用Lock类加锁时,必须在finally代码块中释放锁,防止异常时锁不释放。

Lock lock = new ReentrantLock();  
lock.lock();  
try {   
  // update object state  
}finally {  
  lock.unlock();  //一定要释放锁,一般使用try-finally释放 
}  

ReentrantLock也支持其他锁

1.定时锁
使用tryLock(long timeout, TimeUnit unit)方法可以避免死锁:指定时间内若未获得所需要的全部锁,则释放所获得的所有锁:
public boolean trySendOnSharedLine(String message, long timeout, TimeUnit unit) throw Exception {
	long nanosToLock =  unit.toNanos(timeout) - estimatedNanosToSend(message);
	if(!lock.tryLock(nanosToLock, NANOSECONDES)) {
		return false;
	}
	try{
		return sendOSharedLine(message);
	}finally{
		lock.unlock();
	}
}
2.可中断的锁获取操作
可中断的锁获取操作同样能在可取消的操作中使用加锁,lockInterruptibly方法能够在获取锁的同时保持对中断的响应:
public boolean sendOnsharedLine(String message) throws Exception {
	lock.lockInterruptibly();
	try{
		return cancellableSendOnSharedLine(message);
	}finally{
		lock.unlock();
	}
}
private boolean cancellableSendOnSharedLine(String message) throws Exception {
	...
}
3.读写锁--ReentrantReadWriteLock
与ReentrantLock类似,ReentrantReadWriteLock在创建时也可选择是一个非公平锁还是公平锁。在公平的锁中,等待时间最长的线程将优先获得锁。如果这个锁由读线程持有,而另一个线程请求写入锁,那么其他读线程都不能获得读取锁,直到写线程使用完并且释放了写入锁。在非公平的锁中,线程获得许可的顺序是不确定的,写线程降级为读线程是可以的,但从读线程升级为写线程是不可以的。

示例:用读写锁来包装Map

public class ReadWriteMap<K, V>{
	private final Map<K, V> map;
	private final ReadWriteLock lock = new ReentrantReadWriteLock();
	private final Lock r = lock.readLock();
	private final Lock w = lock.writeLock();
	public ReadWriteMap(Map<K, V> map) {
		this.map = map;
	}
	public V put(K key, Value value) {
		w.lock();
		try{
			return map.put(key, value);
		}finally{
			w.unclock();
		}
	}
	public V get(Object key) {
		r.lock();
		try{
			return map.get(key);
		}finally{
			r.unlock();
		}
	}
}
4.容器遍历时,可以克隆一个副本,并在副本上进行遍历来解决线程不安全。
  • 如果直接在容器上加锁,当容器规模很大时,效率会很低。
  • 容器隐式遍历发生位置:hashCode和equals等方法会间接地执行迭代操作,当容器作为另一个元素或键值时,就会出现这种情况。同样,containsAll/removeAll和retainAll等方法,以及把容器作为参数的构造方法,都会对空串进行迭代。
5.高并发情况下,集合类的代替方案
  • java.util.concurrent包中的ConcurrentHashMap来代替Map
  • java.util.concurrent包中的CopyOnWriteArrayList代替以遍历为主的的List
  • java.util.concurrent包中的CopyOnWriteArraySet代替以遍历为主的的Set
  • java.util.concurrent包中的ConcurrentSkipListMap代替SortedMap
  • java.util.concurrent包中的ConcurrentSkipListSet代替SortedSet

BlockingQueue的多种实现

  • LinkedBlockingQueue、ArrayBlockingQueue是FIFO队列,二者分别与LinkedList、ArrayList类似。
  • PriorityBlockingQueue是一个按优先级排序的队列,可以按某种顺序而不是FIFO来处理元素。
  • SynchronousQueue:不是一个真正的队列,不维护元素的存储空间,而是维护一组线程,由线程来操作元素。

ConcurrentHasMap

ConcurrentHasMap使用分段锁来进行线程访问控制,单线程环境下只损失非常小的性能。不会抛出ConcurrentModificationException非“及时失败”,并发环境下得到的size和isEmpty可能是过期的。

闭锁

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,这扇门永远保持打开。

1) CountDownLatch

一种灵活的闭锁实现,可以使一个或多个线程等待一组事件发生,闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而awit方法会一直阻塞直到计数器达到零,或者等待的线程中断,或者等待超时。

public long timeTasks(int nThreads, final Runnable task) throws Exception {
	final CountDownLatch startGate = new CountDownLatch(1);
	final CountDownLatch endGate = new CountDownLatch(nThreads);
	for(int i = 0; i < nThreads; i++) {
		Thread t = new Thread() {
			public void run() {
				try{
					startGate.await(); //startGate阻塞
					try{
						task.run();
					}finally{
						endGate.countDown();
					}
				}catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};
		t.start();
	}
	long start = System.nanoTime();
	startGate.countDown();  //startGate的count降为0,创建的线程开始执行
	endGate.await();  		//endGate阻塞,直到创建的线程运行完成为止
	long end = System.nanoTime();
	return end - start;
}
2) FutureTask

表示一种抽象的可生成结果的计算,FutureTask表示的计算结果是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行、正在运行和运行完成。当FutureTask进入完成状态后,它会永远停止在这个状态上。

  • Future.get的行为取决于任务的状态,如果任务已经完成,那么get会立即返回结果,否则get会阻塞直到任务进入完成状态,然后返回结果或者抛出异常。
class Preloader {
	//定义一个future,从数据库中获取产品信息,并用call返回获得结果
	private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo> (new Callable<ProductInfo>(){
		public ProductInfo call() throws Exception {    //此方法可以返回获取结果
			return loadProdectInfo();
		}

	});
	private final Thread thread = new Thread (future);   //放进线程中执行
	public void start() {
		thread.start();
	}
	public ProductInfo get() throws Exception {  //获取产品信息,如果没有加载完成,则会阻塞
		return future.get();
	}
}
3) Semaphore-信号量

计数信号量用来控制同时访问某个特定资源的操作数量,或同时执行某个指定操作的数量,还可以实现某种资源池,或对容器施加边界。

  • acquire():获得许可,只有获得许可的线程才可以执行,否则会阻塞直到获得一个许可。
  • release():释放许可。
public class SemaphoreTest {
	private final Set<T> set;
	private final Semaphore sem;		
	public SemaphoreTest(int bound) {
		this.set = Collections.synchronizedSet(new HashSet<T>());
		sem = new Semaphore(bound);
	}			
	public boolean add(T o) throws Exception{
		sem.acquire();                 //获得一个许可,若许可不足,则阻塞直到得到许可
		boolean wasAdded = false;
		try {
			wasAdded = set.add(o);
			return wasAdded;
		} finally {
			if(!wasAdded) {
				sem.release();			//释放许可
			}
		}
	}			
	public boolean remvoe(Object o) {
		boolean wasRemove = set.remove(o);
		if(wasRemove){
			sem.release();
		}
		return wasRemove;
	}			
}
4) 栅栏--CyclicBarrier

CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。

CyclicBarrier初始时还可带一个Runnable的参数 此Runnable任务在CyclicBarrier的数目达到后,在所有其它线程被唤醒前被执行。

public class CyclicBarrierTest {
	public static class ComponentThread implements Runnable {
		CyclicBarrier barrier;// 计数器
		int ID; // 组件标识
		int[] array; // 数据数组
		// 构造方法
		public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
			this.barrier = barrier;
			this.ID = ID;
			this.array = array;
		}
		public void run() {
			try {
				array[ID] = new Random().nextInt(100);
				System.out.println("Component " + ID + " generates: " + array[ID]);
				// 在这里等待Barrier处
				System.out.println("Component " + ID + " sleep");
				barrier.await();
				System.out.println("Component " + ID + " awaked");
				// 计算数据数组中的当前值和后续值
				int result = array[ID] + array[ID + 1];
				System.out.println("Component " + ID + " result: " + result);
			} catch (Exception ex) {
			}
		}
	}
	// 测试CyclicBarrier的用法
	public static void testCyclicBarrier() {
		final int[] array = new int[3];
		CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
			// 在所有线程都到达Barrier时执行
			public void run() {
				System.out.println("testCyclicBarrier run");
				array[2] = array[0] + array[1];
			}
		});
		// 启动线程
		new Thread(new ComponentThread(barrier, array, 0)).start();
		new Thread(new ComponentThread(barrier, array, 1)).start();
	}
	public static void main(String[] args) {
		CyclicBarrierTest.testCyclicBarrier();
	}
}
```	

7.Future表示一个任务的生命周期,并提供了相应的方法判断是否已经完成或取消,以及
  获取任务的结果和取消任务等。
  	指定时间内等待:

Page renderPageWithAd() throws InterruptedException { long endNanos = System.nanoTime() + TIME_BUDGET; Future<Ad> f = exec.submit(new FetchAdTask()); Page page = renderPageBody(); Ad ad; try{ long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS); }catch(ExecutionException e){ ad = DEFAULT_AD; }catch(TimeoutException e){ ad = DEFAULT_AD; f.cancel(ture); } page.setAd(ad); return page; }

  	创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,可以使用invokeAll方法:

private class QuoteTask implements Callable<TravelQuote> { //实现了Callable接口 private final TravelCompany company; private final TravelInfo traveInfo; ...... public TravelQuote call() throws Exception { //重写call()方法 return company.solicitQuote(travelInfo); } } publc List<TravelQuote> getRankedTravelQuotes (TravelInfo travelInfo, Set<TravelCompany> companies, Comparator<TravelQuote> ranking, long time, TimeUtil unit) throws InterruptedException { List<QuoteTask> task = new ArrayList<QuoteTask>(); for(TravelCompany company : companies) { tasks.add(new QuoteTask(company, travelInfo)); //将任务添加到一个List中 } List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit); //调用全部任务 List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size()); Iterator<QuoteTask> taskIter = tasks.iterator(); for(Future<TravelQuote> f : futures) { QuoteTask task = taskIter.next(); try{ quotes.add(f.get()); //查询任务执行结果,若未执行完成或未超时,则线程会阻塞 }catch(ExecutionException e){ quotes.add(task.getFailureQuote(e.getCause())); }catch(CancellationException e){ quotes.add(task.getTimeoutQuote(e)); } } Collections.sort(quotes, ranking); reutrn quotes; }


8.锁
	1)自旋锁:为了让线程等待,可以让线程执行一个忙循环(自旋),这项技术就是所谓的自旋锁。
	
9.线程的取消
	(1)通过中断来取消
	```
class PrimeProducer extends Thread {
	private final BlockingQueue<BigInteger> queue;
	PrimeProducer(BolckingQueue<BigInteger> queue) {
		this.queue = queue;
	}
	public void run() {
		try{
			BigInteger p = new BigInteger.ONE;
			while(!Thread.currentThread().isInterrupted()) {
				queue.put(p = p.nextProbablePrime());
			}
		}catch(InterruptedException e) {
			/* 允许线程退出 */
		}
	}
	public void cancel() {
		interrupt();
	}
}
(2)通过Future来实现取消
public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
	Future<?> task = taskExec.submit(r);
	try{
		task.get(timeout, unit);
	}catch(TimeoutException e) {
		//接下来的任务被取消
	}catch(ExecutionException e) {
		//如果在任务中抛出了异常,那么重新抛出该异常
		throw launderThrowable(e.getCause);
	}finally{
		//如果任务已经结束,那么执行取消操作也不会带来任何影响
		task.cancel(true);	//如果任务正在运行,那么将被中断
	}
}
(3)日志服务的关闭示例:
public class LogService {
	private final BlockingQueue<String> queue;
	private final LoggerThread loggerThread;
	private final PrintWriter writer;
	
	private boolean isShutdown;
	private int reservations;
	
	public LogService(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>();
        this.loggerThread = new LoggerThread();
        this.writer = new PrintWriter(writer);
    }
	
	public void start() {
		loggerThread.start();
	}
	
	public void stop() {
		synchronized(this) {
			isShutdown = true;
		}
		loggerThread.interrupt();
	}
	
	public void log(String msg) throws InterruptedException {
		synchronized (this) {
			if(isShutdown) {
				throw new IllegalStateException("...");
			}
			++reservations;
		}
		queue.put(msg);
	}
	
	private class LoggerThread extends Thread {
		public void run() {
			try{
				while(true) {
					try{
						synchronized(LogService.this) {
							if(isShutdown && reservations == 0) {
								break;
							}
						}
						String msg = queue.take();
						synchronized(LogService.this){
							--reservations;
						}
						writer.println(msg);
					}catch(InterruptedException e) {
						//doSomething
					}
				}
			}finally{
				writer.close();
			}
		}
	}
}

10.设置线程池的大小 要想正确地设置线程池大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个cpu,多大的内存,任务是计算密集型、IO密集型还是二者皆可,它们是否需要像jdbc连接这样稀缺的资源,如果需要执行不同类型的任务,并且它们之间的行为差别相差很大的话,那么应该考虑使用多个线程池,从而合每个线程池可以根据各自的工作负载来调整。 对于计算密集型任务:在拥有n个cpu的系统上,当线程池的大小为n+1时,通常能实现最优的利用率。 对于包含IO操作或者其他阻塞操作的任务:由于线程不会一址执行,因此线程池的规模应该会更大,要正确地设置线程池的大小,必须估 算出任务的等待时间与计算时间的比值。这种估算不需要很精确,且可以通过一些分析或监控工具来获得。要达到期望的使用率,可 以用以下方式计算:nThread = nCPU * u * (1 + w/c)其中,nThread表示线程最优数量,nCpu表示cpu数量,u表示cpu的使用率, w表示等待时间,c表示计算时间(cpu使用时间)

11.扩展ThreadPoolExecutor ThreadPoolExecutor是可扩展的,可以重写beforeExecutor/afterExecute/terminated扩展ThreadPoolExecutor的行为。

public class TimingThreadPool extends ThreadPoolExecutor {
	private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
	private final Logger log = Logger.getLogger(TimingThreadPool.class);
	private final AtomicLong numTasks = new AtomicLong();
	private final AtomicLong totalTime = new AtomicLong();
	
	public TimingThreadPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, null);
    }
	
	protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.info(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.info(String.format("Thread %s: end %s, time=%dns",
                    t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }			
	protected void terminated() {
		try {
			log.info(String.format("Terminated: avg time = %dns", totalTime.get() / numTasks.get()));
		}finally{
			super.terminated();
		}
	}
	
}

12.死锁 利用公开调用来避免死锁

  --死锁示例:
class Taxi {
	private Point location, destination;
	private final Dispatcher dispatcher;
	...
	public synchronized Point getLocation() {
		return location;
	}
	public synchronized void setLocation(Point location) {
		this.location = location;		
		if(location.equals(destionation)) {
			dispatcher.notifyAvailable(this);
		}
	}
}	
class Dispatcher {
	private final Set<Taxi> taxis;
	private final Set<Taxi> avaiableTaxis;
	...
	public synchronized void notifyAvailable(Taxi taxi) {
		availableTaxis.add(taxi);
	}
	public synchronized Image getImage(){
		Image image = new Image();
		for(Taxi t : taxis) {
			image.drawMarket(t.getLocation);
		}
		return image;
	}
}
	因为setLocation和notifyAvailable都是同步方法,因此调用setLocation的线程将首先获取Taxi的锁,然后获取Dispatcher。同样,调用getImage的线程将首先获取Dispatcher锁,然后再获取每一个Taxi锁。两个线程按照不同的顺序来获取两个锁,由此可能产生死锁。如果在持有锁的情况下调用 某个外部方法,那么就需要警惕死锁。
  • 利用公开调用来避免死锁示例
class Taxi {
	private Point location, destination;
	private final Dispatcher dispatcher;
	...
	public synchronized Point getLocation() {
		return location;
	}
	public void setLocation(Point location) {
		boolean readchDestination;
		synchronized(this) {  //加锁的不再是方法而是代码块
			this.location = location;
			readchDestination = location.equals(destionation);
		}
		if(readchDestination) {
			dispatcher.notifyAvailable(this);
		}
	}
}	
class Dispatcher {
	private final Set<Taxi> taxis;
	private final Set<Taxi> avaiableTaxis;
	...
	public synchronized void notifyAvailable(Taxi taxi) {
		availableTaxis.add(taxi);
	}
	public Image getImage(){
		Set<Taxi> copy;
		synchronized(this) {
			copy = new HashSet<Taxi>(taxis);  //复制一份
		}
		Image image = new Image();
		for(Taxi t : copy) {
			image.drawMarket(t.getLocation);
		}
		return image;
	}
}
	如果在调用某个方法时不需要持有锁,那么这种调用被称为开放调用。这种方式的核心思想:不在一个同步块中调用另一个同步块

13.优化锁性能: 1)锁分解技术: 锁定this对象:

public class ServerStatus {
	public final Set<String> users;
	public final Set<String> queries;
	public synchronized void addUser(String u) {
		users.add(u);
	}
	public synchronized void addQuery(String q) {
		queries.add(q);
	}
	public synchronized void removeUser(String u) {
		users.remove(u);
	}
	public synchronized void removeQuery(String q) {
		queries.remove(q);
	}
}
  对锁进行分解:在ServerStatus中使用了两个成员变量:users和queries,在使用时,可以分别将这两个对象进行锁定。
public class ServerStatus {
	public final Set<String> users;
	public final Set<String> queries;
	public void addUser(String u) {
		synchronized(users){
			users.add(u);
		}
	}
	public void addQuery(String q) {
		synchronized(queries){
			queries.add(q);
		}				
	}
	public void removeUser(String u) {
		synchronized(users){
			users.remove(u);
		}
	}
	public void removeQuery(String q) {
		synchronized(queries){
			queries.remove(q);
		}
	}
}
2)锁分段技术
	在某些情况下,可以将锁分解技术进一步扩展为对一组独立对象上的锁进行分解,这种情况被称为锁分段,例如在CurrentHashMap的实现中使用了一个包含16个锁的数组,每个锁保护所有散列桶的1/16,其中第N个散列桶由第N/mod 16个锁来保护:
public class StripedMap {
	//同步策略:buckets[n]由locks[n % N_LOCKS]来保护
	private static final int N_LOCKS = 16;
	private final Node[] buckets;
	private final Object[] locks;
	private static class Node{...}
	public StripedMap(int numBuckets) {
		buckets = new Node[numBuckets];
		locks = new Object[N_LOCKS];
		for(int i = 0; i < N_LOCKS; i++) {
			locks[i] = new Object();
		}
	}
	private final int hash(Object key) {
		return Math.abs(key.hashCode % buckets.length);
	}
	public Object get(Object key) {
		int hash = hash(key);
		synchronized(locks[hash % N_LOCKS]) {  //加锁
			for(Node m = buckets[hash]; m != null; m = m.next()) {
				if(m.key.equals(key)) {
					return m.value();
				}
			}
		}
		return null;
	}
	public void clear() {
		for(int i = 0; i < buckets.length; i++){
			synchronized(locks[i % N_LOCKS]) {  //加锁
				buckets[i] = null;
			}
		}
	}
}

13.有界缓存实现的基类

public abstract class BaseBoundedBuffer<V> {
	private final V[] buf;
	private int tail;
	private int head;
	private int count;
	protect BaseBoundedBuffer(int capacity) {
		this.buf = (V[]) new Object[capacity];
	}
	protected synchronized final void doPut(V v) {
		buf[tail] = v;
		if(++tail == buf.length) {
			tail = 0;
		}
		++count;
	}
	protected synchronized final V doTake() {
		V v = buf[head];
		buf[head] = null;
		if(++head == buf.length) {
			head = 0;
		}
		--count;
		return v;
	}
	public synchronized final boolean isFull() {
		return count == buf.length;
	}
	public synchronized final boolean isEmpty() {
		return count == 0;
	}
}

阻塞实现缓存:当多个线程竞争时,运行依然慢

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public SleepyBoundedBuffer(int size) {super(size);}
	public void put(V v) throws InterruptedException {
		while(true) {
			synchronized(this) {
				if(!isFull()){
					doPut(v);
					return;
				}
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}
	public V take() throws InterruptedException {
		while(true) {
			synchronized(this) {
				if(!isEmpty()){
					doTake();
					return;
				}
			}
			Thread.sleep(SLEEP_GRANULARITY);
		}
	}
}

条件队列(优于上一个实现): 阻塞实现缓存:当多个线程竞争时,运行依然慢

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public BoundedBuffer(int size) {super(size);}
	public synchronized void put(V v) throws InterruptedException {
		while(isFull()) {
			wait();
		}
		doput(v);
		notifyAll();
	}
	public synchronized V take() throws InterruptedException {
		while(isEmpty()) {
			wait();
		}
		V v = doTake();
		notifyAll();
		return v;
	}
}

使用条件通知来实现阻塞队列:条件队列中的notifyAll()可能会很低效,特别是当所有线程唤起后,如里只有一条线程满足条件,那么其他线程就会再次等待,这样唤醒-等待需要大量的线程切换开销,因此可以使用条件队列来进行控制:当队列为空或者满时才通知其他线程。

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
	public BoundedBuffer(int size) {super(size);}
	public synchronized void put(V v) throws InterruptedException {
		while(isFull()) {
			wait();
		}
		boolean wasEmpty = isEmpty();  	//这两条语句的顺序很重要要
		doput(v);						//颠倒顺序可能会发现死锁
		if(wasEmpty){     //线程为空时,才通知其他线程,其他线程唤醒后,执行put方法的线程执行,执行take方法的线程再次等待
			notifyAll();
		}		
	}
	public synchronized V take() throws InterruptedException {
		while(isEmpty()) {
			wait();
		}
		boolean wasFull = isFull();   	//这两条语句的顺序很重要要
		V v = doTake();					//颠倒顺序可能会发现死锁
		if(wasFull){	  //线程为满时,才通知其他线程,其他线程唤醒后,执行take方法的线程执行,执行put方法的线程再次等待
			notifyAll();			
		}
		return v;
	}
}

使用显示条件变量的有界缓存 一个Condiction和一个Lock关联在一起,就像一个队列和一个内置锁关联一样。在Condition对象中,与wait、notify和notifyAll方法对应的分别是await、signal和signalAll.但是,Condition对Object进行了扩展,因而它也包含awit和notify方法。一定要确保使用正确的版本。

public class ConditionBoundedBuffer<T> {
	protected final Lock lock = new ReentrantLock();
	private final Condition notFull = lock.newCondition();
	private final Condition notEmpty = lock.newCondition();
	private final T[] items = (T[]) new Object[BUFFER_SIZE];
	private int tail, head, count;
	public void put(T x) throws InterruptedExcetpion {    //阻塞直到notFull
		lock.lock();
		try{
			while(count == items.length){
				notFull.await();
			}
			item[tail] = x;
			if(++tail == items.length) {
				tail = 0;
			}
			++count;
			notEmpty.signal();
		}finally{
			lock.unlock();
		}
	}
	public T take() throws InterruptedExcetpion {		//阻塞直到notEmpty
		lock.lock();
		try{
			while(count == 0) {
				notEmpty.await();
			}
			T x = items[head];
			items[head] = null;
			if(++ head == items.length) {
				head =0l
			}
			--count;
			notFull.signal();
			return x;
		}finally{
			lock.unlock();
		}
	}
}

14.Synchronizer剖析 使用Lock来实现信号量

public class SemaporeOnLock {
	private final Lock lock = new ReentrantLock();
	private final Condition permitsAvailable = lock.newCondition();
	private int permits;
	SemaporeOnLock(int initialPermits) {
		lock.lock();
		try{
			permits = initialPermits;
		}finally{
			lock.unlock();
		}
	}
	public void acquire() throws InterruptedException {
		lock.lock();
		try{
			while(permits <= 0){
				permitsAvailable.await();
			}
			--permits;
		}finally{
			lock.unlock();
		}
	}
	public void release(){
		lock.lock();
		try{
			++permits;
			permitsAvailable.signal();
		}finally{
			lock.unlock();
		}
	}
}

© 著作权归作者所有

共有 人打赏支持
Funcy1122
粉丝 7
博文 77
码字总数 96914
作品 0
广州
后端工程师
私信 提问
jdbctemplate调用存储过程

项目需要使用原生态的jdbc调用存储过程,写法如下,备忘一下 首先声明一个存储过程 CREATE DEFINER = @ PROCEDURE (in par1 varchar(1000),in par2 varchar(100),in par3 varchar(200),in p...

王小明123
2013/06/17
0
0
Java 下一代: Groovy、Scala 和 Clojure

在与 Martin Fowler 共同参加的一次主题演讲中,他提供了一个敏锐的观察报告: Java 的遗产是 平台,不是 语言。 最初的 Java 技术工程师曾做过一个了不起的决定,将语言从运行时中分离出来,...

一只死笨死笨的猪
2014/10/23
0
0
备忘小算法:Java将一维数组数据绘制成N行M列矩阵(如九宫格)

 备忘小算法:Java将一维数组数据绘制成N行N列矩阵(如九宫格) 一个小算法的备忘:用Java将一维数组数据绘制成N行M列的矩阵。特别的,如果刚好9个数据,则列数即为3,行数即为3,那么就是典...

开开心心过
2015/09/11
0
0
Android之联系人PinnedHeaderListView使用

Android联系人中联系人列表页的ListView做得用户体验非常好的,于是想把它从源码中提取出来,以便日后使用。写了一个简单的例子,一方面算是给自己备忘,另一方面跟大家分享一下。 好了,先来...

JayPark不作死
2014/10/27
0
0
Java面试:投行的15个多线程和并发面试题

本文由ImportNew -一杯哈希不加盐 翻译自dzone。欢迎加入翻译小组。转载请见文末要求。 多线程和并发问题已成为各种 Java 面试中必不可少的一部分。如果你准备参加投行的 Java 开发岗位面试,...

ImportNew
08/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

iframe里弹出的层显示在整个网页上

通过在iframe页面添加js脚本,动态给父窗体创建一个div,然后设置让其显示在最顶层这样就可以了 在文件夹中创建两个文件,一个iframe页面,一个父页面index。

少年已不再年少
28分钟前
1
0
聊聊storm trident spout的_maxTransactionActive

序 本文主要研究一下storm trident spout的_maxTransactionActive MasterBatchCoordinator storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java ......

go4it
37分钟前
1
0
js时间函数getTime() 在苹果手机上返回NaN的问题

一、出现问题 var newStartDate = new Date('2017-08-30');var newStartTime = newStartDate.getTime(); 获取到的时间戳,在Android手机正常,在IPhone中返回NaN。 问题说明: 在苹果手机...

tianma3798
39分钟前
1
0
访问日志不记录静态文件、切割和静态元素过期时间

11月16日任务 11.22 访问日志不记录静态文件 11.23 访问日志切割 11.24 静态元素过期时间 11.22、 访问日志不记录静态文件 网站大多元素为静态文件,如图片、css、js等,这些元素可以不用记录...

zgxlinux
45分钟前
1
0
爬虫教程」Python做一个简单爬虫,小白也能看懂的教程

俗话说“巧妇难为无米之炊”,除了传统的数据源,如历史年鉴,实验数据等,很难有更为简便快捷的方式获得数据,在目前互联网的飞速发展写,大量的数据可以通过网页直接采集,“网络爬虫”应运...

糖宝lsh
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部