Java并发(四)BlockingQueue的使用
Java并发(四)BlockingQueue的使用
摆渡者 发表于2年前
Java并发(四)BlockingQueue的使用
  • 发表于 2年前
  • 阅读 384
  • 收藏 5
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

摘要: 本文使用BlockingQueue来演示如何不使用wait()和notifyAll()来使多线程协同工作(消除使用wait()和notifyAll()时对类的耦合性)。本文摘自《Java编程思想(第4版)》中的21.5.4节:生产者-消费者与队列。

    wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作的问题,即每次交互时都需要握手。在许多情况下,你可以瞄准更高的抽象级别,使用同步队列来解决任务协作的问题。同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无届队列,你还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

    如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时回复消费者任务。阻塞队列可以解决非常大的问题,而其方式与wait()和notifyAll()相比,则要简单并可靠许多。

    考虑下面这个BlockingQueue的示例,有一台机器具有三个任务:一个制作吐司,一个给吐司抹黄油,还有一个给吐司涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序:

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class Toast {
    /**
     * 吐司的状态:
     * DRY: 烘干的
     * BUTTERED: 涂了黄油的
     * JAMMED: 涂了果酱的
     * <p>吐司的状态只能由DRY->BUTTERED->JAMMED转变
     */
    public enum Status {DRY, BUTTERED, JAMMED}
    private Status status = Status.DRY;//默认状态为DRY
    private final int id;
    public Toast(int id) { this.id =  id;}
    public void butter() {status = Status.BUTTERED;}
    public void jam() {status = Status.JAMMED;}
    public Status getStatus() {return status;}
    public int getId() {return id;}
    public String toString() {
        return "Toast id: " + id + ", status: " + status;
    }
}

@SuppressWarnings("serial")
class ToastQueue extends LinkedBlockingQueue<Toast> {}

/**
 * 生产吐司的任务。
 */
class Toaster implements Runnable {
    private ToastQueue toastQueue;
    private int count = 0;
    private Random random = new Random(47);
    public Toaster(ToastQueue queue) {
        this.toastQueue = queue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                TimeUnit.MILLISECONDS.sleep(300 + random.nextInt(500));
                //生产一片吐司,这些吐司是有序的
                Toast toast = new Toast(count++);
                System.out.println(toast);
                //放到toastQueue中
                toastQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted.");
        }
        System.out.println("Toaster off.");
    }
}

/**
 * 涂黄油的任务。
 */
class Butterer implements Runnable {
    private ToastQueue dryQueue;
    private ToastQueue butteredQueue;
    public Butterer(ToastQueue dryQueue, ToastQueue butteredQueue) {
        this.dryQueue = dryQueue;
        this.butteredQueue = butteredQueue;
    }
    
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = dryQueue.take();
                toast.butter();
                System.out.println(toast);
                butteredQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Butterer interrupted.");
        }
        System.out.println("Butterer off.");
        
    }
}

/**
 * 涂果酱的任务。
 */
class Jammer implements Runnable {
    private ToastQueue butteredQueue;
    private ToastQueue finishedQueue;
    public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
        this.butteredQueue = butteredQueue;
    }
    
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = butteredQueue.take();
                toast.jam();
                System.out.println(toast);
                finishedQueue.put(toast);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted.");
        }
        System.out.println("Jammer off.");
    }
}

/**
 * 吃吐司的人,消费者。
 */
class Eater implements Runnable {
    private ToastQueue finishedQueue;
    private int count = 0;
    public Eater (ToastQueue finishedQueue) {
        this.finishedQueue = finishedQueue;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                //在取得下一个吐司之前会一直阻塞
                Toast toast = finishedQueue.take();
                //验证取得的吐司是有序的,而且状态是JAMMED的
                if (toast.getId() != count++ || 
                        toast.getStatus() != Toast.Status.JAMMED) {
                    System.out.println("Error -> " + toast);
                    System.exit(-1);
                } else {
                    //吃掉吐司
                    System.out.println(toast + "->Eaten");
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Eater interrupted.");
        }
        System.out.println("Eater off.");
    }
}

public class ToastOMatic {
    public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butteredQueue = new ToastQueue();
        ToastQueue finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}

执行结果(可能的结果):

Toast id: 0, status: DRY
Toast id: 0, status: BUTTERED
Toast id: 0, status: JAMMED
Toast id: 0, status: JAMMED->Eaten
Toast id: 1, status: DRY
Toast id: 1, status: BUTTERED
Toast id: 1, status: JAMMED
Toast id: 1, status: JAMMED->Eaten
Toast id: 2, status: DRY
Toast id: 2, status: BUTTERED
Toast id: 2, status: JAMMED
Toast id: 2, status: JAMMED->Eaten
Toast id: 3, status: DRY
Toast id: 3, status: BUTTERED
Toast id: 3, status: JAMMED
Toast id: 3, status: JAMMED->Eaten
Toast id: 4, status: DRY
Toast id: 4, status: BUTTERED
Toast id: 4, status: JAMMED
Toast id: 4, status: JAMMED->Eaten
Toast id: 5, status: DRY
Toast id: 5, status: BUTTERED
Toast id: 5, status: JAMMED
Toast id: 5, status: JAMMED->Eaten
Toast id: 6, status: DRY
Toast id: 6, status: BUTTERED
Toast id: 6, status: JAMMED
Toast id: 6, status: JAMMED->Eaten
Toast id: 7, status: DRY
Toast id: 7, status: BUTTERED
Toast id: 7, status: JAMMED
Toast id: 7, status: JAMMED->Eaten
Eater interrupted.
Eater off.
Butterer interrupted.
Toaster interrupted.
Toaster off.
Jammer interrupted.
Jammer off.
Butterer off.

    Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock对象或者synchronized关键字的同步),因为同步已经由队列和系统的设计隐式的管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动的挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。

标签: 并发 线程 协作
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 296
博文 169
码字总数 205794
评论 (0)
×
摆渡者
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: