文档章节

JDK容器学习之Queue:LinkedBlockingQueue

小灰灰Blog
 小灰灰Blog
发布于 2017/11/03 01:49
字数 1948
阅读 36
收藏 1

基于链表阻塞队列LinkedBlockingQueue

基于链表的无边界阻塞队列,常用与线程池创建中作为任务缓冲队列使用

I. 底层数据结构

先看一下内部定义,与 ArrayBlockingQueue做一下对比,顺带看下这两者的区别及不同的应用场景

/** 队列的容量, or Integer.MAX_VALUE if none */
private final int capacity;

/** 队列中实际的个数 */
private final AtomicInteger count = new AtomicInteger();

/**
 * 队列头,但其中没有有效数据,它的下一个才保存实际的数据
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * 队列尾,其内包含有效的数据
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** 出队的锁, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** 进队的锁, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();


static class Node<E> {
  // 存放在队列中的数据; 队列头的item为null
  E item;
  
  // 队列中,该节点的下一个节点,队列尾的next为null
  Node<E> next;

  Node(E x) { item = x; }
}

说明

  1. 底层结构为单向链表,其中队列头不包含有效数据;
  2. 队列长度有界,为初始化时指定的容量大小;没指定时,默认为int最大值
  3. count实时表示队列中元素的个数,采用原子进行+/-1
  4. 进队和出队是两个锁,也就是说出队和进队可以并发进行

对比下ArrayBlockingQueue,主要区别为两个地方

  1. LinkedBlockingQueue底层为链表;ArrayBlockingQueue底层为数组(貌似有点多余,命名上就可以看出)
  2. LinkedBlockingQueue出队和入队是两个锁,而ArrayBlockingQueue是一个锁进行控制;即前者出队和入队可以并发执行;而后者会出现锁的竞争

II. 阻塞实现原理

0. Prefer

分析阻塞原理之前,先通过注释解释下LinkedBlockingQueue的使用场景

  • 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
  • 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
  • 队列不支持空元素

1. 进队

public void put(E e) throws InterruptedException {
    // 队列中不能存在null
    if (e == null) throw new NullPointerException();
    
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 若队列已满,则等待`notFull(出队后,队列未满时).signal()`唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // 进队
        c = count.getAndIncrement(); // 计数+1,并获取队列的实际元素个数
        if (c + 1 < capacity) // 若进队后,队列依然没有满,则释放一个信号 (why?)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0) // 表示队列从空到有一个数据,唤醒因为队列为空被阻塞的线程
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}
// 唤醒阻塞的出队线程,注意使用姿势,Condition的使用必须放在对应的锁中间,否则会报错
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

进队逻辑:

  1. null不允许入队
  2. 加入队锁
  3. 判断队列是否已满,若是,则阻塞线程
  4. 待其他线程出队时被唤醒,将元素挂在队列尾
  5. 如果队列之前为空,此时入队成功之后,需要执行 notEmpty.singal(),唤醒因为队列空被阻塞的出队线程

2. 出队

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 如果队列为空,则阻塞,等待入队之后,被唤醒
        while (count.get() == 0) {     notEmpty.await(); }
        x = dequeue(); // 进队
        c = count.getAndDecrement();
        if (c > 1) // 如果队列依然非空,则唤醒其他因为队列为空被阻塞的线程
            notEmpty.signal();
    } finally { takeLock.unlock();}
    if (c == capacity) 
    // 原来队列为满的,此时出队一个后,正好非满,唤醒因为队列满被阻塞的线程
        signalNotFull();
    return x;
}
// 出队逻辑,实现逻辑是把出队Node节点设置为新的head,释放老的head节点 
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

// 唤醒阻塞的出队线程,注意使用姿势,Condition的使用必须放在对应的锁中间,否则会报错
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

出队逻辑

  1. 出队锁
  2. 判断队列是否非空,为空时阻塞出队线程
  3. 其他线程入队成功,唤醒因队列为空被阻塞的线程
  4. 若出队之前,队列为满的,则唤醒因为队列满无法入队而阻塞的线程

查看上面的源码时,还发现一个非常有意思的地方,出队成功之后,会判断如果之前的队列中元素的个数大于1(即出队之后,还有元素),会执行notEmpty.signal();,唤醒被阻塞的出队线程,为什么要这么干?

假设一种场景,一个空队列,两个线程(A,B)都执行出队,被阻塞;

此时线程C执行入队,入队完成,因为队列由空到非空,会唤醒一个被阻塞的出队线程(假设为A);

因为出队和入队是可以并发的,现在在线程A执行`c = count.getAndDecrement();`之前,若线程D又入队成功一个,因为此时队列非空,所以不会调用`signalNotEmpty`

现在如果线程A执行出队之后,获取到的c应该为2,如果不执行`notEmpty.signal();`,就会导致线程B一直被阻塞,显然不符合我们的预期

3. 其他方法

除了出队和入队的方法之外,还有几个有意思的方法,如队列中元素以数组形式输出,判断队列是否有元素,这两个操作,都会竞争出队和入队锁,确保在执行这个方法时,队列不会被其他线程修改

public boolean contains(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> p = head.next; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        fullyUnlock();
    }
}

public Object[] toArray() {
    fullyLock();
    try {
        int size = count.get();
        Object[] a = new Object[size];
        int k = 0;
        for (Node<E> p = head.next; p != null; p = p.next)
            a[k++] = p.item;
        return a;
    } finally {
        fullyUnlock();
    }
}

III. 经典case

链表阻塞队列的经典使用case,基本上用过线程池就会用到这个了,如jdk中自带的

// java.util.concurrent.Executors
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

IV. 对比小结

1. 底层结构

ArrayBlockingQueue : 底层存储结构为数组,直接将数据存入数组中

LinkedBlockingQueue : 底层存储结构为单向链表,会将数据封装到Node对象作为链表的节点,且链表头中不包含实际的元素信息

2. 锁的分离

ArrayBlockingQueue : 出队入队公用一把锁,即两者无法并发

LinkedBlockingQueue: 出队和入队各一把锁,因此出队和入队可并发执行

因此在线程池的创建中,一般是使用LinkedBlockingQueue,至少线程在进入等待队列中时,出队和进队不会相互阻塞,但是两者之间有关联

  • 出队时,若队列之前为满队列时,会唤醒因为队列满被阻塞的入队线程
  • 进队时,若队列之前为空队列时,会唤醒因为队列空被阻塞的出队线程

3. 出队和进队的操作不同

ArrayBlockingQueue : 是直接将对象插入或移除

LinkedBlockingQueue: 需要把枚举对象转换为Node<E>进行插入或移除,其中会将出队的Node节点作为新的队列头,返回并置空Node的item元素

4. 队列大小初始化

ArrayBlockingQueue : 必须指定队列的容量

LinkedBlockingQueue: 可以指定队列的容量,不指定时,容量为 Integer.MAX_VALUE

扫描关注,java分享

https://static.oschina.net/uploads/img/201710/13203703_6IVg.jpg

© 著作权归作者所有

小灰灰Blog

小灰灰Blog

粉丝 208
博文 237
码字总数 423468
作品 0
武汉
程序员
私信 提问
Java并发编程之JUC容器概述

今天开始学习JUC容器。JUC提供了用于多线程上下文中的Collection实现与高效的、可伸缩的、线程安全的非阻塞FIFO队列。参考JDK1.8,画出下图。 List JUC容器中List的实现只有CopyOnWriteArra...

潘威威的博客
2017/12/21
0
0
通俗易懂,JDK 并发容器总结

该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k)。地址:https://github.com/Snailclimb/JavaGuide. 一 JDK 提供的并发容器总...

snailclimb
2018/12/10
0
0
多线程学习(6)ThreadPoolExecutor 线程池学习-1

threadpool模型: 调用方通过调用api将任务,装进queue里,然后会有一个机制监事queue里有没有task,如果有task,就分配给某个worker去执行。workers代表线程池的话.worker就是某条线程了。 ...

太猪-YJ
04/14
11
0
阻塞队列_BlockingQueue

阻塞队列_BlockingQueue 1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,...

秋风醉了
2016/06/08
149
0
ThreadPoolExecutor使用和思考-线程池大小设置与BlockingQueue的三种

工作中多处接触到了ThreadPoolExecutor。趁着现在还算空,学习总结一下。 前记: jdk官方文档(javadoc)是学习的最好,最权威的参考。 文章分上中下。上篇中主要介绍ThreadPoolExecutor接受...

浮躁的码农
2015/07/28
635
0

没有更多内容

加载失败,请刷新页面

加载更多

一次看懂 Https 证书认证

TLS > 传输层安全性协定 TLS(Transport Layer Security),及其前身安全套接层 SSL(Secure Sockets Layer)是一种安全协议,目的是为网际网路通信,提供安全及数据完整性保障。 如图,TLS...

极客收藏夹
39分钟前
4
0
https证书买哪家好?有哪些供应商

在选购https证书前除了要了解类型外,还需要了解https证书供应商,毕竟不同的供应商,提供的产品质量与服务也是有差异的。今天小编就为大家讲讲https证书供应商方面的内容,希望各位会喜欢。...

安信证书
41分钟前
5
0
Zuul 配置

概述:zuul底层是基于servlet,是由一系列的filter链构成。 1、路由配置 a、单例serverId映射 zuul: routes: client-a: path: /client/** serviceId: client-a 意思是...

java框架开发者
59分钟前
3
0
zk中FinalRequestProcessor解析

是处理器最后一个环节 FinalRequestProcessor implements RequestProcessor 处理器链最后一个环节处理事务和非事务请求最后一个环节 构造器 public FinalRequestProcessor(ZooKeeperServer z...

writeademo
59分钟前
4
0
Axios 详解

首先祝广大程序猿们节日快乐! 一、axios简介 基于promise,用于浏览器和node.js的http客户端 二、特点 支持浏览器和 node.js 支持 promise 能拦截请求和响应 能转换请求和响应数据 能取消请求...

张兴华ZHero
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部