文档章节

java阻塞队列BlockingQueue

tsmyk0715
 tsmyk0715
发布于 07/19 21:36
字数 5393
阅读 12
收藏 0

相关文章

Lock锁源码分析

ConcurrentLinkedQueue源码分析

HashMap源码分析-jdk1.6和jdk1.8的区别

LinkedList源码分析

ArrayList源码分析

前言

java阻塞队列BlockingQueue就是在插入元素的时候和移除元素的时候,会阻塞操作。在插入的时候,如果此时队列已经满了,则会阻塞插入操作,直到队列变为可用;同样,在移除元素的时候,如果此时队列为空,是不能进行移除的,会一直阻塞到队列中有元素为止。java中,使用BlockingQueue 来定义阻塞队列的一系列操作,如插入元素的方法:add(),offer(),put(),移除元素的方法:take(),poll(),peek(),remove()等方法。下面看一下 BlockingQueue 中的方法:

public interface BlockingQueue<E> extends Queue<E> {
    // 添加元素,会用调用 offer() 方法添加
    boolean add(E e);
    // 添加元素
    boolean offer(E e);
    // 超时添加元素,如果在规定的时间内不能添加,之间返回
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    // 添加元素,可响应中断
    void put(E e) throws InterruptedException;
    // 移除元素
    E take() throws InterruptedException;
    // 移除元素
    E poll();
    // 超时移除元素
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    // 获取元素
    E peek();
    // 返回队列中还有多少容量
    int remainingCapacity();
    // 删除元素
    boolean remove(Object o);
    // 队列是否包含元素
    public boolean contains(Object o);
    // 把队列中元素转移到集合中
    int drainTo(Collection<? super E> c);
    // 把队列中元素转移到集合中,指定最大元素个数
    int drainTo(Collection<? super E> c, int maxElements);
}

在以上的接口定义中看到,添加元素有几个方法,获取元素也有几个方法,但是它们的效果是不同的;

add() 方法:会调用 offer() 方法给队列添加元素,如果队列满了,会抛出异常 IllegalStateException,添加失败

offer()方法:如果队列满了,在直接返回false,添加失败,还有个重载的超时方法,offer(E e, timeout, unit)

put()方法:如果队列满了,会阻塞,一直到队列可用为止,可响应中断

poll()方法:移除队头元素,并返回,如果队列为空,则返回 null,不会阻塞,还有个重载的超时方法poll(timeout, unit)

take()方法:移除队头元素,并返回,如果队列为空,则会阻塞,一直等待队列有值为止,可响应中断

peek()方法:返回队头元素,但是不删除,如果队列为空,则返回 null,不会阻塞

接下来看下 BlockingQueue 的类图,看下它有哪些实现类:

BlockingQueue 类图

它一共有7个子类,也就是说,在开发的过程中,根据业务需要的不同,可以采用不同的阻塞队列来实现我们需求。

下面以 LinkedBlockingQueue 来分析以下阻塞队列的实现过程:

LinkedBlockingQueue 源码分析

1.类的定义

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    // 链表节点
    static class Node<E> {
        // 节点中的元素
        E item;
        // 下一个节点
        Node<E> next;
        // 节点构造器
        Node(E x) { item = x; }
    }
    // 队列的容量,认为为 Integer.MAX_VALUE
    private final int capacity;
    // 队列中元素的个数
    private final AtomicInteger count = new AtomicInteger();
    // 头节点,头节点的元素为空,head.item == null
    transient Node<E> head;
    // 尾节点
    private transient Node<E> last;
    // 插入元素的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    // 队列不为空的条件,当插入一个元素的时候,此时队列肯定不为空,所以会唤醒消费者取获取元素
    private final Condition notEmpty = takeLock.newCondition();
    // 添加元素的锁
    private final ReentrantLock putLock = new ReentrantLock();
    // 队列未满的条件,如果消费者已经移除了一个元素,此时队列可用,会唤醒生产者添加元素
    private final Condition notFull = putLock.newCondition();
}

从上面类的定义可以看到,LinkedBlockingQueue 是以链表的形式来实现的,而链表节点的定义是它的一个内部类Node,而且该链表是一个单向链表。在初始化的时候,我们可以指定队列的容量,如果不指定的话,默认容量为 Integer.MAX_VALUE;而队列中元素的个数 count 被AtomicInteger 修饰,它是一个原子操作,可以在多线程环境下操作而不会出现问题;此外,LinkedBlockingQueue 是使用两把锁来实现并发访问的。

2.方法

接下来,看下该类的一些方法实现

(1).构造方法

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

在创建 LinkedBlockingQueue 实例的时候,会初始化队列的容量,还会构造一个初始链表,链表的头节点和尾节点都指向同一个节点,并且节点的元素为null,以后添加元素的时候,会更新尾节点,而头节点中的元素永远为null,构造成功后,初始链表如下所示:

(2).add()方法

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

可以看到,add()方法会调用 offer()方法来插入元素,如果插入元素失败,则抛出异常。

(3).offer()方法

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        // 此时队列中元素的个数
        final AtomicInteger count = this.count;
        // 如果此时队列满了,则直接返回false,
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 队列未满,插入
        Node<E> node = new Node<E>(e);
        // 获取插入锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) { // 队列未满,插入
                enqueue(node); // 插入
                // 队列元素个数加1
                c = count.getAndIncrement();
                // 如果队列还没有满,则会唤醒等待插入的线程,队列可用
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock(); // 解锁
        }
        // 如果添加元素成功,此时队列不为空,会唤醒等待take元素的线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    // 向队列的尾部添加元素
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

从以上代码可以看到,如果队列已满,则返回false,而不会抛出异常,且也不会阻塞。此外,还有一个重载的方法 offer(E e, long timeout, TimeUnit unit),表示如果在规定的时间内不能插入成功,则再返回。

过程如下:

(4).put()方法

    public void put(E e) throws InterruptedException {
        // 链表中不能存 null 元素,否则抛出空指针异常
        if (e == null) throw new NullPointerException();
        int c = -1;
        // 创建要插入的节点
        Node<E> node = new Node<E>(e);
        // 获取锁
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 如果队列满了,则会阻塞当前线程,直到队列可用,由 take() 唤醒
            while (count.get() == capacity) {
                notFull.await();
            }
            // 队列未满,入队
            enqueue(node);
            c = count.getAndIncrement();
            // 如果队列未满,则唤醒等待插入的线程,队列可用
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock(); // 解锁
        }
        if (c == 0) // 如果队列不为空,则唤醒等待获取元素的线程
            signalNotEmpty();
    }

put方法在插入元素的时候,如果队列满了的话,它会阻塞当前线程,直到队列可用,当其他线程 take() 移除元素的时候,会唤醒该线程队列可用。

(5).poll()方法

    public E poll() {
        // 队列此时元素的个数
        final AtomicInteger count = this.count;
        // 如果队列为空,则返回 Null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // 获取锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 如果队列有元素,则出队
            if (count.get() > 0) {
                x = dequeue(); // 出队,并删除第一个元素
                // 队列元素个数减1
                c = count.getAndDecrement();
                // 如果队列还有元素,则会唤醒等待获取元素的线程
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock(); // 解锁
        }
        if (c == capacity)
            signalNotFull();
        // 返回出队元素
        return x;
    }

poll()方法会返回队列的第一个元素,并删除;如果队列为空,则返回null,并不会阻塞当前线程;出队的逻辑调用的是 dequeue()方法,此外,它还有一个重载的方法,poll(long timeout, TimeUnit unit),如果队列为空,则会等待一段时间。

(6).dequeue()方法

    private E dequeue() {
        // 头节点,元素为空
        Node<E> h = head;
        // 第一个节点
        Node<E> first = h.next;
        // 把头节点的下一个节点指向它自己,把它与链表断开
        h.next = h; // help GC
        // 第一个节点设置为它节点
        head = first;
        // 返回第一个节点的元素
        E x = first.item;
        // 把第一个节点的元素设为null,即头节点元素为null
        first.item = null;
        return x;
    }

dequeue() 方法是用来删除队列的第一个节点,并把第一个节点中的元素返回,它的过程如下:

初始队列:

poll()第一个节点item1:

(7).take()方法


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 获取队列元素个数
        final AtomicInteger count = this.count;
        // 获取锁
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 如果队列为空,此时不能获取元素,会阻塞到队列可用
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 队列可用,出队并删除
            x = dequeue();
            c = count.getAndDecrement();
            // 如果队列中还有元素,则唤醒等待获取的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock(); // 解锁
        }
        if (c == capacity)
            signalNotFull(); // 队列不为空,唤醒等待插入的线程
        return x;
    }

可以看到,take() 方法相比于 poll() 方法来说,如果队列为空,则会一直阻塞到队列可用为止。

(8).peek()方法

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

peek() 方法相比于 take()和poll()方法来说,它只是返回队列的第一个元素,但是并不删除,调用 peek() 方法可以多次获取第一个元素;此外,如果队列为空,它直接返回null, 而不是阻塞。

以上就是LinkedBlockingQueue类常用方法的实现原理,原始大部分阻塞队列的实现原来,底层基本上都是使用 Lock 来实现并发控制的。下面写个例子测试以下上面的方法:

例子

测试 add() 方法,如果队列满了,则抛出异常:

    public void testAdd(){
        LinkedBlockingQueue queue = new LinkedBlockingQueue(3);
        boolean flag = queue.add(1);
        if(flag){
            System.out.println("1添加成功");
        }
        flag = queue.add(2);
        if(flag){
            System.out.println("2添加成功");
        }
        flag = queue.add(3);
        if(flag){
            System.out.println("3添加成功");
        }
        flag = queue.add(4);
        if(flag){
            System.out.println("4添加成功");
        }
    }

///////////////////////////////

Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at blockQueue.TestArrayBlockingQueue.testAdd(TestArrayBlockingQueue.java:34)
	at blockQueue.TestArrayBlockingQueue.main(TestArrayBlockingQueue.java:17)
1添加成功
2添加成功
3添加成功

测试 offer() 方法,如队列满了,则直接返回 false,不会抛出异常

    public void testOffer(){
        LinkedBlockingQueue queue1 = new LinkedBlockingQueue(10);
        for (int i = 0; i < 12; i++) {
            new Thread(() -> {
                boolean isFull = queue1.offer(++num);
                if (isFull){
                    System.out.println("入队成功...");
                }else{
                    System.out.println("队列已满...");
                }
            }).start();
        }
    }

///////////////////////////////////

入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
入队成功...
队列已满...
队列已满...

测试 put() 方法,如队列满了,则会阻塞直到队列可用:

    public void testPut(){
        LinkedBlockingQueue queue1 = new LinkedBlockingQueue(10);
        for (int i = 0; i < 12; i++) {
            new Thread(() -> {
                try {
                    int tmp = ++num;
                    queue1.put(tmp);
                    System.out.println(tmp + " " + "入队成功..." + "队列剩余空间:" + queue1.remainingCapacity());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

可以看到队列已经满了,程序还没停到,一直在阻塞。

测试 poll() 方法,如果队列为空,则之间返回 Null,不会阻塞

    public void testPoll(){
        LinkedBlockingQueue queue1 = new LinkedBlockingQueue(3);
        for (int i = 1; i <= 3; i++) {
            queue1.add(i);
        }
        System.out.println("从队列中取的值:" +  queue1.poll());
        System.out.println("从队列中取的值:" +  queue1.poll());
        System.out.println("从队列中取的值:" +  queue1.poll());
        // 队列为空,则再去取值,返回 null
        System.out.println("从队列中取的值:" +  queue1.poll());
    }

//////////////////////////////

从队列中取的值:1
从队列中取的值:2
从队列中取的值:3
从队列中取的值:null

测试 take() 方法,如果队列为空,则会阻塞直到队列可用:

    public void testTake() throws InterruptedException {
        LinkedBlockingQueue queue1 = new LinkedBlockingQueue(3);
        for (int i = 1; i <= 3; i++) {
            queue1.add(i);
        }
        System.out.println("从队列中取的值:" +  queue1.take());
        System.out.println("从队列中取的值:" +  queue1.take());
        System.out.println("从队列中取的值:" +  queue1.take());
        // 队列为空,阻塞
        System.out.println("从队列中取的值:" +  queue1.take());
    }

可以看到,队列为空,则程序一直阻塞。

以上就是 LinkedBlockingQueue 的一个实现原理,接下来看一下 阻塞队列 BlockingQueue 其他的一个实现类。

BlockingQueue 其他实现类

除了 LinkedBlockingQueue 类,BlockingQueue 还有其他的实现类,这些实现类在某些情况下非常的有用,可以根据需求不同,采用不同的阻塞队列。

ArrayBlockingQueue

PriorityBlockingQueue

DelayQueue

SynchronousQueue

LinkedTransferQueue

LinkedBlockingDeque

ArrayBlockingQueue

ArrayBlockingQueue 是一个有界的阻塞队列,内部使用一个数组来存放元素,也是通过锁来实现并发访问的,也是按照 FIFO 的原则对元素进行排列。


    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }    

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

通过上面的构造方法可以看到,是通过  ReentrantLock 来实现并发的控制,而 ReentrantLock 提供了公平锁与非公平锁的实现(具体参考Lock锁源码分析),所以 ArrayBlockingQueue 也提供了公平与非公平的访问方式,默认情况下,不保证线程线程公平的访问队列。

所谓的公平是指阻塞的线程,按照阻塞的先后顺序访问队列,非公平是指当队列可用的时候,阻塞的线程都可以有争夺线程访问的资格,有可能先阻塞的线程最后才能访问队列。

使用方式和LinkedBlockingQueue差不多。

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列(最大长度为Integer.MAX_VALUE - 8,内部使用数组来存放元素)。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。

 private transient Comparator<? super E> comparator;

PriorityBlockingQueue 内部有一个比较器,可以用来对元素进行排序,如果为null,则按照自然顺序排列。

    public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

从上面的构造方法可以看到采用的是非公平的访问方式。

接下来看看它是如何保证元素有序的,offer() 方法:

offer()方法

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // 如果元素大小超过队列容量,则数组进行扩容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap); // 数组扩容,
        try {
            // 比较器
            Comparator<? super E> cmp = comparator;
            // 如果比较器为空,则使用自然顺序排序
            if (cmp == null)
                siftUpComparable(n, e, array);
            // 否则使用给定的比较器进行排序
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

上面添加元素的方法中,主要有三个步骤,一个是判断是否需要扩容,而是使用自然顺序排序,三是使用给定的比较器进行排序;在初始化链表的实话,链表大小并不是最大的值 Integer.MAX_VALUE - 8,如果不指定初始容量的话,初始容量为 11。所有需要进行判断添加的元素是否达到最大容量;扩容的方法为 tryGrow(array, cap) 。

tryGrow(array, cap) 方法如下 :

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // 先释放锁,没有必要锁住整个方法,只是在复制数据的时候才需要加锁
        Object[] newArray = null;
        // 也相当于一个锁
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 数组新容量
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                // 如果新容量超过最大值,则抛出异常
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 使用新容量创建新的数组
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // 如果此时创建失败,则让出CPU,让其他线程创建
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 再次获取锁
        lock.lock();
        // 复制数据 
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

对添加的元素进行排序,就是说 PriorityBlockingQueue 在添加元素的时候,就已经保证了元素的有序。

使用自然顺序排序: siftUpComparable(int k, T x, Object[] array),在 k 的位置插入 x:

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        // 循环
        while (k > 0) {
            // 父节点的位置
            int parent = (k - 1) >>> 1;
            // 父节点中的元素
            Object e = array[parent];
            // 如果要添加的元素大于等于父元素,则直接返回,在当前位置添加元素
            if (key.compareTo((T) e) >= 0)
                break;
            // 如果添加的元素小于父元素,则把父元素移到当前的位置,
            array[k] = e;
            // 再次循环判断父元素的父元素
            k = parent;
        }
        array[k] = key;
    }

要理解上述父方法,先来看看元素在数组的一个排序规则,PriorityBlockingQueue 内部使用数组来存放元素,而元素在数组中是按照二叉树的形式来存放的,如果创建一个初始容量的实例,则初始容量为11,下面是它的一个排列方式:

如果在 8 的位置插入元素,则会先获取 8 位置的父节点的位置 3 :

int parent = (k - 1) >>> 1;
Object e = array[parent];

之后,判断当前位置 8 的元素和父位置 3 的元素进行比较,如果当前位置 8 的元素 >= 父位置 3 的元素,则退出循环,直接把新加的元素插入到 8 的位置即可:

if (key.compareTo((T) e) >= 0)
     break;

如果当前位置 8 的元素 < 父位置 3 的元素,则把父位置 3 中的元素放到 8 位置上来,之后再次循环,再次比较 3 位置中的元素和 3 位置的父位置 1 进行比较,之后再次比较 位置 1 和 位置 1 的父元素 0 进行比较,直到找到合适的位置存放添加的元素。

使用比较器添加元素的逻辑和上面是一致的,通过上面的过程,就可以把一个元素添加到队列中,并且保证了队列的有序。

例子

/**
 * @ Author:tsmyk0715
 * @ Date:Created in 上午 11:22 2018/7/21 0021
 */
public class TestPriorityBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue queue = new PriorityBlockingQueue();
        queue.offer(1);
        queue.offer(3);
        queue.offer(2);
        queue.offer(5);
        queue.offer(4);
        // 自然顺序 : 1 2 3 4 5 
        for (int i = 0; i < 5; i++) { 
            System.out.print(queue.take() + " "); 
        }
        System.out.println();
        queue = new PriorityBlockingQueue(11, new Comparator<Integer>(){
            @Override
            public int compare(Integer o1, Integer o2) {
                return o2.compareTo(o1);
            }
        });
        queue.offer(1);
        queue.offer(3);
        queue.offer(2);
        queue.offer(5);
        queue.offer(4);
        // 指定比较器顺序 : 5 4 3 2 1 
        for (int i = 0; i < 5; i++) {
            System.out.print(queue.take() + " ");
        }
    }
}

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列,和 PriorityBlockingQueue一样, 内部使用 PriorityQueue 来实现;队列中的元素必须实现 Delay 接口,在创建队列元素的时候,可以指定多久才能从队列中获取当前元素,只有在延时期满了之后,才能从队列中获取该元素。

DelayQueue 可以运用在以下应用场景:

1.缓存:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

2.定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。如订单的定时取消

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素,也就是说,队列中只有一个元素存在。此外,它海支持公平和非公平的访问策略,默认为非公平访问策略,可以用在线程间传递数据使用。

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

例子:

/**
 * @ Author:tsmyk0715
 * @ Date:Created in 下午 3:25 2018/7/21 0021
 */
public class TestSynchronousQueue {
    public static void main(String[] args) {
        SynchronousQueue queue = new SynchronousQueue();
        new Thread(() -> {
            try {
                for(;;)
                {
                    int data = (int) (Math.random() * 100);
                    System.out.println("put = " + data);
                    queue.put(data);
                    Thread.sleep(1000L);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
               for(;;){
                   int data = (int) queue.take();
                   System.out.println("take = " + data);
               }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

由此可以看到,获取元素之后,才能存放下一个元素。

LinkedTransferQueue

LinkedTransferQueue 是也是一个无界的队列,使用的是单向链表来表示,类图如下:

根据类图可以看到,LinkedTransferQueue 除了实现其他阻塞队列都实现的接口外,还实现了 TransferQueue 接口,TransferQueue接口方法如下:

    boolean tryTransfer(E e);

    void transfer(E e) throws InterruptedException;

transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返

tryTransfer方法
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,可以在队列和队列插入元素和获取元素。

例子

/**
 * @ Author:tsmyk0715
 * @ Date:Created in 下午 4:09 2018/7/21 0021
 */
public class TestLinkedBlockingDeque {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingDeque deque = new LinkedBlockingDeque();
        deque.addFirst(1);
        deque.addFirst(2);
        deque.addLast(3);

        int data = (int) deque.takeFirst();
        System.out.println(data); // 2
        data = (int) deque.takeLast();
        System.out.println(data); // 3
        data = (int) deque.take();
        System.out.println(data); // 1
    }
}

以上就是java阻塞队列的实现,可以根据不同需要使用不同的队列。


 

© 著作权归作者所有

共有 人打赏支持
tsmyk0715
粉丝 34
博文 65
码字总数 143556
作品 0
成都
程序员
私信 提问
Java并发编程利用 Condition 实现阻塞队列

什么是阻塞队列 BlockingQueue 队列是一种数据结构,它的特点是先进先出(First In First Out),它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。队列在多线程应用中,...

行走在旅途中
2017/11/07
0
0
使用Java的BlockingQueue实现生产者-消费者

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。 BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类 1、ArrayBlockingQueue:一个由数组支持的有界阻...

mrliuze
2015/08/11
0
0
Java 并发工具包 java.util.concurrent 用户指南

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html。 本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友...

pior
2015/10/26
0
0
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞...

凯文加内特
2014/07/31
0
0
并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞...

F风向标F
2013/12/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

php获取客户端IP

php获取客户端IP 首先先阅读关于IP真实性安全的文章:如何正確的取得使用者 IP? 「任何從客戶端取得的資料都是不可信任的!」 HTTP_CLIENT_IP头是有的,但未成标准,不一定服务器都实现。 ...

DrChenXX
昨天
0
0
. The valid characters are defined in RFC 7230 and RFC 问题

通过这里的回答,我们可以知道: Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。 具体来说,就是添加了些规则去限制HTTP头的规范性 参考这里 具体来说: org.apache.tom...

west_coast
昨天
1
0
刷leetcode第704题-二分查找

今天双十一买的算法书到货了,路上刷到有人说的这个题,借(chao)鉴(xi)一下别人的思路,这个是C++标准库里面的经典方法,思路精巧,优雅好品味 int search(int* nums, int numsSize, in...

锟斤拷烫烫烫
昨天
3
0
【分享实录】BANCOR算法详解及代码实现

1 活动基本信息 1)主题:【区块链技术工坊22期】BANCOR算法详解及代码实现 2)议题: BANCOR算法的特点和优劣势 BANCOR算法和举例 如何加入BANCOR.NETWORK交易所 如何开发自己的BANCOR去中心...

HiBlock
昨天
2
0
微信小程序(2)

开始看微信小程序的教程了。刚刚看完官方教程的视图层部分。这里摘录一些自己认为的部分关键点。 1.直接修改数值无法重新渲染,需要使用setData()方法; 2.列表渲染中:wx:key用于保持项目在...

MKjy
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部