生产者-消费者问题

原创
2017/09/18 20:40
阅读数 55

话不多说,直接上代码。

其中有几个注意点:

 1. 始终应该使用wait循环模式调用wait方法,即在while循环中调用wait方法(参见《Effective Java 2nd Edition》第69条);

2. wait方法和notify方法都只应由作为此对象监视器所有者的线程来调用,线程成为对象监视器的所有者的途径有三:

  • 通过执行此对象的同步实例方法。
  • 通过执行在此对象上进行同步的 synchronized 语句的正文。
  • 对于 Class 类型的对象,可以通过执行该类的同步静态方法。

参见Java API文档关于Object中notify和wait的方法说明.  简单地说,就是形如

synchronized(xx) { xx.wait(); } 这样的形式.

3. 使用notifyAll 而不是 notify. 这是由于notify方法的唤醒线程具有不确定性,并且只能唤醒单一等待线程,因此有种说法是,你总是应该使用notifyAll. (同样地,可以参见《Effective Java 2nd Edition》第69条)

import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        Vector vector = new Vector();
        int size = 5;
        Thread producer = new Thread(new Producer(vector, size), "producer");
        Thread consumer = new Thread(new Consumer(vector, size), "consumer");

        producer.start();
        consumer.start();
    }
}


class Producer implements Runnable {

    private final Vector shareQueue;
    private final int size;

    public Producer(Vector shareQueue, int size) {
        this.shareQueue = shareQueue;
        this.size = size;
    }

    @Override
    public void run() {
        for (int i = 0; i < size + 5; i++) {
            System.out.println("produced "+ i);
            try {
                producer(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void producer(Object obj) throws InterruptedException {

            while (shareQueue.size() == size) {
                synchronized (shareQueue) {
                    System.out.println("the queue is full "+
                            Thread.currentThread().getName() + " is waiting, size:" + shareQueue.size());
                    shareQueue.wait();
                }
            }

            synchronized (shareQueue) {
                shareQueue.add(obj);
                //add the element and notify the consumer
                shareQueue.notifyAll();
            }
    }
}


class Consumer implements Runnable {

    private final Vector shareQueue;
    private final int size;

    public Consumer(Vector shareQueue, int size) {
        this.shareQueue = shareQueue;
        this.size = size;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("consuming..." +  consumer());
                TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private int consumer() throws InterruptedException {

        while (shareQueue.isEmpty()) {

            synchronized (shareQueue) {
                System.out.println("the queue is empty..." + Thread.currentThread().getName()
                        + " is waiting, size:" + shareQueue.size());
                shareQueue.wait();
            }
        }


        synchronized (shareQueue) {
            //to notify the waiting producer
            shareQueue.notifyAll();
            return (Integer) shareQueue.remove(0);
        }
    }
}

控制台的输出结果如下:

produced 0
consuming...0
produced 1
produced 2
produced 3
produced 4
produced 5
produced 6
the queue is full producer is waiting, size:5
consuming...1
produced 7
the queue is full producer is waiting, size:5
produced 8
the queue is full producer is waiting, size:5
consuming...2
produced 9
the queue is full producer is waiting, size:5
consuming...3
consuming...4
consuming...5
consuming...6
consuming...7
consuming...8
consuming...9
the queue is empty...consumer is waiting, size:0

当然,我们可以用BlockingQueue来简化上述代码。

public class Main {

    public static void main (String[] args) {
        int size = 5;
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(size);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }


}


class Producer implements  Runnable {
    private final BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("produced:" + i);

            try {
                producer(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void producer(int i) throws InterruptedException {

        queue.put(i);
    }
}

class Consumer implements Runnable {

    private final BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while ( true ) {
            try {
                System.out.println("consume:" + consumer());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Integer consumer() throws InterruptedException {
        return (Integer) queue.take();
    }
}

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部