生产者消费者可以使用对象的wait、notify、notifyAll方法来实现。 这三个方法使用是必须在获取到对象锁之后才可以使用。
简单介绍方法:
- wait方法 当一个线程调用一个共享变量的wait方法时,改调用线程会被阻塞挂起直到以下两个条件才返回: 1)其他线程调用了该共享对象的notify或者notifyAll方法 2)其他线程调用了该线程的interrupt方法,该线程抛出InterruptedException异常 wait使用如下:
sychronized(obj){
obj.wait();
}
- notify方法 一个线程调用共享对象的notify方法后,会唤醒一个在该共享变量上调用wait系列方法后被挂起的线程,如果阻塞队列上有多个线程,至于唤醒哪个是随机的。
- notifyAll方法 会唤醒所有在该共享变量上由于调用wait系列方法而被挂起的线程。
生产者消费者案例
生产者:
package com.pimee.thread.waitandnotify;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
/**
* 生产者
*/
public class ProducerTask extends Thread {
public final static int MAX_SIZE = 100;
private BlockingDeque<Integer> queue;
public ProducerTask(BlockingDeque<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
synchronized (queue) {
// 当消费队满了,则等待队列空闲
while (queue.size() == MAX_SIZE) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Producer "+Thread.currentThread().getName() + "," + queue.size());
// 空闲则生成元素,并通知消费者线程
queue.add(new Random().nextInt(100));
queue.notifyAll();
}
}
}
消费者:
package com.pimee.thread.waitandnotify;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 消费者
*/
public class ConsumerTask extends Thread {
private BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
public ConsumerTask(BlockingDeque<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
synchronized (queue){
// 当消费队列为空,则等待队列生成
while (queue.size() == 0){
try {
// 挂起当前线程,并释放通过同步块获取的queue上的锁,让生产者线程可以获取该锁,将生产元素放入队列
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Integer result = queue.take();
System.out.println(Thread.currentThread().getName() + " result:" + result + ", size:" + queue.size());
queue.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
测试类:
package com.pimee.thread.waitandnotify;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
* 生产者-消费者 有序生产、消费
*/
public class WaitAndNotifyTest {
public static void main(String[] args) throws InterruptedException {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
for (int i = 0; i < 100; i++) {
ProducerTask producerTask = new ProducerTask(queue);
producerTask.start();
producerTask.join();
}
System.out.println("queue size is " + queue.size());
for (int i = 0; i < 100; i++) {
ConsumerTask consumerTask = new ConsumerTask(queue);
consumerTask.start();
consumerTask.join();
}
}
}