多线程之生产者消费者(Sychronized、wait、notify)

原创
2021/11/19 16:32
阅读数 60

生产者消费者可以使用对象的wait、notify、notifyAll方法来实现。 这三个方法使用是必须在获取到对象锁之后才可以使用。

简单介绍方法:

  1. wait方法 当一个线程调用一个共享变量的wait方法时,改调用线程会被阻塞挂起直到以下两个条件才返回: 1)其他线程调用了该共享对象的notify或者notifyAll方法 2)其他线程调用了该线程的interrupt方法,该线程抛出InterruptedException异常 wait使用如下:
sychronized(obj){
	obj.wait();
}
  1. notify方法 一个线程调用共享对象的notify方法后,会唤醒一个在该共享变量上调用wait系列方法后被挂起的线程,如果阻塞队列上有多个线程,至于唤醒哪个是随机的。
  2. notifyAll方法 会唤醒所有在该共享变量上由于调用wait系列方法而被挂起的线程。

生产者消费者案例

生产者:

package com.pimee.thread.waitandnotify;
import java.util.Random;
import java.util.concurrent.BlockingDeque;
/**
 * 生产者
 */
public class ProducerTask extends Thread {
    public final static int MAX_SIZE = 100;
    private BlockingDeque<Integer> queue;

    public ProducerTask(BlockingDeque<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        synchronized (queue) {
            // 当消费队满了,则等待队列空闲
            while (queue.size() == MAX_SIZE) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Producer "+Thread.currentThread().getName() + "," + queue.size());
            // 空闲则生成元素,并通知消费者线程
            queue.add(new Random().nextInt(100));
            queue.notifyAll();
        }
    }
}

消费者:

package com.pimee.thread.waitandnotify;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * 消费者
 */
public class ConsumerTask extends Thread {
    private BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
    public ConsumerTask(BlockingDeque<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        synchronized (queue){
            // 当消费队列为空,则等待队列生成
            while (queue.size() == 0){
                try {
                    // 挂起当前线程,并释放通过同步块获取的queue上的锁,让生产者线程可以获取该锁,将生产元素放入队列
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Integer result = queue.take();
                System.out.println(Thread.currentThread().getName() + " result:" + result + ", size:" + queue.size());
                queue.notifyAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试类:

package com.pimee.thread.waitandnotify;

import java.util.Random;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * 生产者-消费者 有序生产、消费
 */
public class WaitAndNotifyTest {
    public static void main(String[] args) throws InterruptedException {
        BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
        for (int i = 0; i < 100; i++) {
            ProducerTask producerTask = new ProducerTask(queue);
            producerTask.start();
            producerTask.join();
        }

        System.out.println("queue size is " + queue.size());
        for (int i = 0; i < 100; i++) {
            ConsumerTask consumerTask = new ConsumerTask(queue);
            consumerTask.start();
            consumerTask.join();
        }
    }
}
展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部