Java可阻塞队列的两种实现方式 (传统wait/notify和jdk1.5以后的lock)

原创
2015/03/10 23:29
阅读数 1.3K

在Java中,对于Lock和Condition可以理解为对传统的synchronized和wait/notify机制的替代。

wait/notify有个限制,调用wait/notify的线程必须持有对象的锁。

This method should only be called by a thread that is the owner of this object's monitor. See the notify method for a description of the ways in which a thread can become the owner of a monitor.

Throws:

IllegalMonitorStateException - if the current thread is not the owner of this object's monitor.

通常使用wait/notify的代码是这个样子的:

synchronized (obj) {
         while (<condition does not hold>)
             obj.wait();
         ... // Perform action appropriate to condition
     }

在Condition接口的javadoc中,有一个经典的Condition例子,用Condition实现了一个可阻塞队列。这里仿照javadoc简单实现了一个可阻塞队列。为了简单,没有进行try/catch,同时加入了一些注释。

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[2]; // 阻塞队列
int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
    System.out.println("进入put");
    lock.lock();
    System.out.println("put lock 锁住");
    try {
        while (count == items.length) { // 如果队列满了,notFull就一直等待
            System.out.println("put notFull 等待");
            notFull.await(); // 调用await的意思取反,及not notFull -> Full
        }
        items[putptr] = x; // 终于可以插入队列
        if (++putptr == items.length)
            putptr = 0; // 如果下标到达数组边界,循环下标置为0
        ++count;
        System.out.println("put notEmpty 唤醒");
        notEmpty.signal(); // 唤醒notEmpty
    } finally {
        System.out.println("put lock 解锁");
        lock.unlock();
    }
}

public Object take() throws InterruptedException {
    lock.lock();
    System.out.println("take lock 锁住");
    try {
        while (count == 0) {
            System.out.println("take notEmpty 等待");
            notEmpty.await();
        }
        Object x = items[takeptr];
        if (++takeptr == items.length)
            takeptr = 0;
        --count;
        System.out.println("take notFull 唤醒");
        notFull.signal();
        return x;
    } finally {
        lock.unlock();
        System.out.println("take lock 解锁");
    }
}

public static void main(String[] args) throws InterruptedException {
    final BoundedBuffer bb = new BoundedBuffer();
    System.out.println(Thread.currentThread()+","+bb);

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread()+","+bb);
                bb.put("xx");
                bb.put("yy");
                bb.put("zz");
                                    bb.put("zz");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
    bb.take();
}
}

如果不使用JUC,大概是这样的:

public class BoundedBuffer_Synchronized {
private Object[] items = new Object[2];
private Object notEmpty = new Object();
private Object notFull = new Object();
int count,putidx,takeidx;

public  void put(Object obj) throws InterruptedException{
    synchronized(notFull){
        while(count == items.length){
            notFull.wait();
        }
    }
    items[putidx] = obj;
    if(++putidx == items.length){
        putidx = 0;
    }
    count ++;
    synchronized (notEmpty) {
        notEmpty.notify();
    }
}
public Object take() throws InterruptedException{
    synchronized(notEmpty){
        while(count == 0){ // 啥也没有呢 取啥
            notEmpty.wait();
        }
    }
    Object x = items[takeidx];
    System.out.println("取第"+takeidx+"个元素"+x);
    if(++takeidx == items.length){
        takeidx = 0; 
    }
    count --;
    synchronized (notFull) {
        notFull.notify();
    }
    return x;
}
public static void main(String[] args) throws InterruptedException {
    final BoundedBuffer_Synchronized bb = new BoundedBuffer_Synchronized();
    System.out.println(Thread.currentThread()+","+bb);

    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread()+","+bb);
                bb.put("xx");
                bb.put("yy");
                bb.put("zz");
                bb.put("zz");
                bb.put("zz");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).start();
    bb.take();
    bb.take();
}
}

从功能上来讲,两者实现了可阻塞队列的基本业务需求。Condition是配合Lock使用的,而wait/notify是配合synchronized使用的。比较两种实现方式,其实就是比较Lock和synchronized两种同步机制的区别。关于这方面,可以参考Java 理论与实践: JDK 5.0 中更灵活、更具可伸缩性的锁定机制


展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部