文档章节

ArrayBlockingQueue与LinkedBlockingQueue

go4it
 go4it
发布于 2017/09/10 13:52
字数 1268
阅读 5
收藏 0

本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。

对比

queue阻塞与否是否有界线程安全保障适用场景注意事项
ArrayBlockingQueue阻塞有界一把全局锁生产消费模型,平衡两边处理速度用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue阻塞可配置存取采用2把锁生产消费模型,平衡两边处理速度无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
ConcurrentLinkedQueue非阻塞无界CAS对全局的集合进行操作的场景size() 是要遍历一遍集合,慎用

内存方面

  • ArrayBlockingQueue 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
  • LinkedBlockingQueue 用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。

有界无界

  • ArrayBlockingQueue 有界,适合已知最大存储容量的场景
  • LinkedBlockingQueue 可有界可以无界

吞吐量

LinkedBlockingQueue在大多数并发的场景下吞吐量比ArrayBlockingQueue,但是性能不稳定。

Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

输入图片说明

测试结果表明,LinkedBlockingQueue的可伸缩性要高于ArrayBlockingQueue。初看起来,这个结果有些奇怪:链表队列在每次插入元素时,都必须分配一个链表节点对象,这似乎比基于数组的队列执行了更多的工作。然而,虽然它拥有更好的内存分配与GC等开销,但与基于数组的队列相比,链表队列的put和take等方法支持并发性更高的访问,因为一些优化后的链接队列算法能将队列头节点的更新操作与尾节点的更新操作分离开来。由于内存分配操作通常是线程本地的,因此如果算法能通过多执行一些内存分配操作来降低竞争程度,那么这种算法通常具有更高的可伸缩性。

并发方面

  • ArrayBlockingQueue 采用一把锁,两个condition
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

此外还支持公平锁

/**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  • LinkedBlockingQueue 头尾各1把锁
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

应用实例

Executors

里头用了LinkedBlockingQueue

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

使用LinkedBlockingQueue实现logger

public class BungeeLogger extends Logger {

    private final ColouredWriter writer;
    private final Formatter formatter = new ConciseFormatter();
//    private final LogDispatcher dispatcher = new LogDispatcher(this);

    private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();

    volatile boolean running = true;

    Thread recvThread = new Thread(){
        @Override
        public void run() {
            while (!isInterrupted() && running) {
                LogRecord record;
                try {
                    record = queue.take();
                } catch (InterruptedException ex) {
                    continue;
                }

                doLog(record);
            }
            for (LogRecord record : queue) {
                doLog(record);
            }
        }
    };

    public BungeeLogger() throws IOException {
        super("BungeeCord", null);
        this.writer = new ColouredWriter(new ConsoleReader());

        try {
            FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);
            handler.setFormatter(formatter);
            addHandler(handler);
        } catch (IOException ex) {
            System.err.println("Could not register logger!");
            ex.printStackTrace();
        }
        recvThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                running = false;
            }
        });
    }

    @Override
    public void log(LogRecord record) {
        if (running) {
            queue.add(record);
        }
    }

    void doLog(LogRecord record) {
        super.log(record);
        writer.print(formatter.format(record));
    }
}

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 53
博文 705
码字总数 506188
作品 0
深圳
Java集合--阻塞队列(LinkedBlockingQueue)

1. LinkedBlockingQueue 上篇中,说到了ArrayBlockingQueue阻塞队列。在ArrayBlockingQueue中,底层使用了数组结构来实现。 那么,提到数组了就不得不提及链表。作为两对成双成对的老冤家,链...

贾博岩
2017/12/10
0
0
并发十三:并发容器Queue实现分析

Queue J.U.C中分为阻塞队里和非阻塞队列。 阻塞队列在满时进行入列操作会被阻塞,空时进行出列操作会被阻塞,很适合并发编程中最常见的生产者-消费者模式。 非阻塞队使用CAS无锁算法避免锁竞...

wangjie2016
04/14
0
0
BlockingQueue详解

本例再次实现前面介绍的篮子程序,不过这个篮子中最多能放得苹果数不是1,可以随意指定。当篮子满时,生产者进入等待状态,当篮子空时,消费者等待。 BlockingQueue定义的常用方法如下: ad...

wow863597
2016/05/09
53
0
集合框架 Queue---BlockingQueue详解

本例介绍一个特殊的队列:BlockingQueue,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是...

长平狐
2012/11/28
587
0
JDK容器学习之Queue:LinkedBlockingQueue

基于链表阻塞队列LinkedBlockingQueue 基于链表的无边界阻塞队列,常用与线程池创建中作为任务缓冲队列使用 I. 底层数据结构 先看一下内部定义,与 做一下对比,顺带看下这两者的区别及不同的...

小灰灰Blog
2017/11/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

八大包装类型的equals方法

先看其中一个源码 结论:八大包装类型的equals方法都是先判断类型是否相同,不相同则是false,相同则判断值是否相等 注意:包装类型不能直接用==来等值比较,否则编译报错,但是数值的基本类型...

xuklc
36分钟前
1
0
NoSQL , Memcached介绍

什么是NoSQL 非关系型数据库就是NoSQL,关系型数据库代表MySQL 对于关系型数据库来说,是需要把数据存储到库、表、行、字段里,查询的时候根据条件一行一行地去匹配,当量非常大的时候就很耗...

TaoXu
昨天
0
0
890. Find and Replace Pattern - LeetCode

Question 890. Find and Replace Pattern Solution 题目大意:从字符串数组中找到类型匹配的如xyy,xxx 思路: 举例:words = ["abc","deq","mee","aqq","dkd","ccc"], pattern = "abb"abc ......

yysue
昨天
0
0
Linux | Redis

写在前面的话 常言道,不作笔记不读书。在下是深有体会啊,所以,跟我一起做下本节的笔记吧,或许多年以后,你一定会感谢今天的你。 安装 在官网的下载页 Redis Download 直接写了在Linux的安...

冯文议
昨天
1
0
NoSQL-memcached

NoSQL介绍 NoSQL叫非关系型数据库。而关系型数据库代表有MySQL。对于关系型数据库来说,是需要把数据存储到库、表、行、字段里,查询的时候根据条件一行一行地去匹配,当量非常大的时候就很...

ln97
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部