Java并发编程初级篇(十六):Lock+Condition实现生产者消费者问题
Java并发编程初级篇(十六):Lock+Condition实现生产者消费者问题
阿拉德大陆的魔法师 发表于11个月前
Java并发编程初级篇(十六):Lock+Condition实现生产者消费者问题
  • 发表于 11个月前
  • 阅读 18
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

摘要: 使用锁实现阻塞式生产者消费者问题

之前我们在“Java并发编程初级篇(十二):使用wait和notify生产者消费者问题”,已经使用Java提供的synchronized关键字和wait(),notify(),notifyAll()方法实现过来生产者消费者问题。Java API还为我们提供了锁的解决方案。

使用锁解决阻塞要用到Condition,它是通过Lock.newCondition()来获得的。就像wait()和notify()必须在synchronized块内一样,Condition.await()和Condition.singialAll()方法也必须在Lock.lock()和Lock.unlock()内执行。

代码示例:

首先我们实现一个数据缓冲区,缓冲区中定义了maxSize变量来代表缓冲区大小,LinkedList来模拟缓冲区。然后添加一把锁,并用这把锁来新建两个Condition:producer(控制生产者挂起和唤醒)和consumer(控制消费者挂起和唤醒)。当生产者发现缓冲区满的情况下调用producer.await()挂起,等待消费者消费数据后调用producer.singialAll()方法来唤醒,并重新判断缓冲区状态。当消费者发现缓冲区空的情况下调用consumer.await()挂起,等待生产者向缓冲区中放入数据并调用consumer.singialAll()方法唤醒并重新判断缓冲区状态。

public class DataBuffer {
    private int maxSize;
    private LinkedList<Date> buffer;

    private Lock lock;
    private Condition producer;
    private Condition consumer;

    public DataBuffer(int maxSize) {
        this.maxSize = maxSize;
        buffer = new LinkedList<Date>();

        lock = new ReentrantLock();

        producer = lock.newCondition();
        consumer = lock.newCondition();
    }

    public void put() {
        try {
            lock.lock();

            while (buffer.size() == this.maxSize) {
                producer.await();
            }

            buffer.add(new Date());

            System.out.printf("%s: Add one %s. Buffer size is %d.\n",
                    Thread.currentThread().getName(),
                    buffer.peekLast(),
                    buffer.size());

            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        try {
            lock.lock();

            while (buffer.size() == 0) {
                consumer.await();
            }

            System.out.printf("%s: Get one %s. Buffer size is %d.\n",
                    Thread.currentThread().getName(),
                    buffer.pollFirst(),
                    buffer.size());

            producer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

创建两个线程用于模拟生产者和消费者

public class Producer implements Runnable {
    private DataBuffer buffer;

    public Producer(DataBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.put();
    }
}

public class Consumer implements Runnable {
    private DataBuffer buffer;

    public Consumer(DataBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        buffer.get();
    }
}

创建两个任务类线程启动10个生产者和消费者,生产者生产速度>消费者消费速度。

public class ProducerTask implements Runnable {
    private DataBuffer buffer;

    public ProducerTask(DataBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            new Thread(new Producer(buffer), "Producer-" + i).start();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class ConsumerTask implements Runnable {
    private DataBuffer buffer;

    public ConsumerTask(DataBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            new Thread(new Consumer(buffer), "Consumer-" + i).start();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

主方法类中启动两个任务类线程,模拟生产者和消费者,因为生产者速度快,最终会导致生产者处于阻塞状态。

public class Main {
    public static void main(String[] args) {
        DataBuffer buffer = new DataBuffer(5);

        new Thread(new ProducerTask(buffer), "ProducerTask").start();
        new Thread(new ConsumerTask(buffer), "ConsumerTask").start();
    }
}

日志,因为缓冲区大小为5,所以当缓冲区内容达到5个之后,生产者进入阻塞状态,并等待消费者消费数据后被唤醒并向缓冲区中插入数据。

Producer-0: Add one Fri Nov 25 15:36:21 CST 2016. Buffer size is 1.
Consumer-0: Get one Fri Nov 25 15:36:21 CST 2016. Buffer size is 0.
Producer-1: Add one Fri Nov 25 15:36:22 CST 2016. Buffer size is 1.
Producer-2: Add one Fri Nov 25 15:36:23 CST 2016. Buffer size is 2.
Producer-3: Add one Fri Nov 25 15:36:24 CST 2016. Buffer size is 3.
Producer-4: Add one Fri Nov 25 15:36:25 CST 2016. Buffer size is 4.
Consumer-1: Get one Fri Nov 25 15:36:22 CST 2016. Buffer size is 3.
Producer-5: Add one Fri Nov 25 15:36:26 CST 2016. Buffer size is 4.
Producer-6: Add one Fri Nov 25 15:36:27 CST 2016. Buffer size is 5.
Consumer-2: Get one Fri Nov 25 15:36:23 CST 2016. Buffer size is 4.
Producer-7: Add one Fri Nov 25 15:36:31 CST 2016. Buffer size is 5.
Consumer-3: Get one Fri Nov 25 15:36:24 CST 2016. Buffer size is 4.
Producer-8: Add one Fri Nov 25 15:36:36 CST 2016. Buffer size is 5.
Consumer-4: Get one Fri Nov 25 15:36:25 CST 2016. Buffer size is 4.
Producer-9: Add one Fri Nov 25 15:36:41 CST 2016. Buffer size is 5.
Consumer-5: Get one Fri Nov 25 15:36:26 CST 2016. Buffer size is 4.
Consumer-6: Get one Fri Nov 25 15:36:27 CST 2016. Buffer size is 3.
Consumer-7: Get one Fri Nov 25 15:36:31 CST 2016. Buffer size is 2.
Consumer-8: Get one Fri Nov 25 15:36:36 CST 2016. Buffer size is 1.
Consumer-9: Get one Fri Nov 25 15:36:41 CST 2016. Buffer size is 0.

 

共有 人打赏支持
粉丝 21
博文 91
码字总数 83019
×
阿拉德大陆的魔法师
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: