文档章节

聊聊Elasticsearch的SizeBlockingQueue

go4it
 go4it
发布于 06/01 12:08
字数 1017
阅读 10
收藏 1

本文主要研究一下Elasticsearch的SizeBlockingQueue

SizeBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

public class SizeBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    private final BlockingQueue<E> queue;
    private final int capacity;

    private final AtomicInteger size = new AtomicInteger();

    public SizeBlockingQueue(BlockingQueue<E> queue, int capacity) {
        assert capacity >= 0;
        this.queue = queue;
        this.capacity = capacity;
    }

    @Override
    public int size() {
        return size.get();
    }

    public int capacity() {
        return this.capacity;
    }

    @Override
    public Iterator<E> iterator() {
        final Iterator<E> it = queue.iterator();
        return new Iterator<E>() {
            E current;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public E next() {
                current = it.next();
                return current;
            }

            @Override
            public void remove() {
                // note, we can't call #remove on the iterator because we need to know
                // if it was removed or not
                if (queue.remove(current)) {
                    size.decrementAndGet();
                }
            }
        };
    }

    @Override
    public E peek() {
        return queue.peek();
    }

    @Override
    public E poll() {
        E e = queue.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = queue.poll(timeout, unit);
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }

    @Override
    public boolean remove(Object o) {
        boolean v = queue.remove(o);
        if (v) {
            size.decrementAndGet();
        }
        return v;
    }

    /**
     * Forces adding an element to the queue, without doing size checks.
     */
    public void forcePut(E e) throws InterruptedException {
        size.incrementAndGet();
        try {
            queue.put(e);
        } catch (InterruptedException ie) {
            size.decrementAndGet();
            throw ie;
        }
    }


    @Override
    public boolean offer(E e) {
        while (true) {
            final int current = size.get();
            if (current >= capacity()) {
                return false;
            }
            if (size.compareAndSet(current, 1 + current)) {
                break;
            }
        }
        boolean offered = queue.offer(e);
        if (!offered) {
            size.decrementAndGet();
        }
        return offered;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("offer with timeout not allowed on size queue");
    }

    @Override
    public void put(E e) throws InterruptedException {
        // note, not used in ThreadPoolExecutor
        throw new IllegalStateException("put not allowed on size queue");
    }

    @Override
    public E take() throws InterruptedException {
        E e;
        try {
            e = queue.take();
            size.decrementAndGet();
        } catch (InterruptedException ie) {
            throw ie;
        }
        return e;
    }

    @Override
    public int remainingCapacity() {
        return capacity() - size.get();
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        int v = queue.drainTo(c);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int v = queue.drainTo(c, maxElements);
        size.addAndGet(-v);
        return v;
    }

    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return (T[]) queue.toArray(a);
    }

    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return queue.containsAll(c);
    }
}
  • SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数
  • SizeBlockingQueue有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数
  • 其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException

ResizableBlockingQueue

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {

    private volatile int capacity;

    ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
        super(queue, initialCapacity);
        this.capacity = initialCapacity;
    }

    @Override
    public int capacity() {
        return this.capacity;
    }

    @Override
    public int remainingCapacity() {
        return Math.max(0, this.capacity());
    }

    /** Resize the limit for the queue, returning the new size limit */
    public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
        assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
        assert optimalCapacity >= 0 : "desired capacity cannot be negative";
        assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
        assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";

        if (optimalCapacity == capacity) {
            // Yahtzee!
            return this.capacity;
        }

        if (optimalCapacity > capacity + adjustmentAmount) {
            // adjust up
            final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else if (optimalCapacity < capacity - adjustmentAmount) {
            // adjust down
            final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
            this.capacity = newCapacity;
            return newCapacity;
        } else {
            return this.capacity;
        }
    }
}
  • ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity

EsExecutors

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

public class EsExecutors {
	//......

    public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                ThreadFactory threadFactory, ThreadContext contextHolder) {
        BlockingQueue<Runnable> queue;
        if (queueCapacity < 0) {
            queue = ConcurrentCollections.newBlockingQueue();
        } else {
            queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
        }
        return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
            queue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
        if (initialQueueCapacity <= 0) {
            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
                            initialQueueCapacity);
        }
        ResizableBlockingQueue<Runnable> queue =
                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
                new EsAbortPolicy(), contextHolder);
    }

	//......
}
  • EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor

小结

  • SizeBlockingQueue继承了AbstractQueue,同时实现了BlockingQueue接口;它的构造器要求输入blockingQueue及capacity参数;它有个AtomicInteger类型的size参数用于记录queue的大小,它在poll、remove、offer、take等方法都会维护这个size参数;其中offer方法会判断当前size是否大于等于capacity,如果大于等于则直接返回false;而put方法则直接抛出IllegalStateException
  • ResizableBlockingQueue继承了SizeBlockingQueue,它提供了一个线程安全的adjustCapacity方法,用于resize队列的capacity
  • EsExecutors的newFixed创建的是使用SizeBlockingQueue的EsThreadPoolExecutor,而newAutoQueueFixed创建的是使用ResizableBlockingQueuede的QueueResizingEsThreadPoolExecutor

doc

© 著作权归作者所有

go4it
粉丝 86
博文 1039
码字总数 994160
作品 0
深圳
私信 提问
聊聊Elasticsearch的EsThreadPoolExecutor

序 本文主要研究一下Elasticsearch的EsThreadPoolExecutor EsThreadPoolExecutor elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecuto......

go4it
06/02
0
0
聊聊Elasticsearch的FixedExecutorBuilder

序 本文主要研究一下Elasticsearch的FixedExecutorBuilder FixedExecutorBuilder elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/threadpool/FixedExecutorBuilder.java Fixed......

go4it
06/15
0
0
聊聊springboot elasticsearch autoconfigure

序 本文主要研究一下springboot elasticsearch autoconfigure ElasticsearchAutoConfiguration spring-boot-autoconfigure-2.1.4.RELEASE-sources.jar!/org/springframework/boot/autoconfi......

go4it
04/17
0
0
聊聊Elasticsearch的Releasables

序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java Releasable继承了java.io.Closeab......

go4it
06/14
0
0
聊聊Elasticsearch的ConcurrentMapLong

序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentMapLong.java Co......

go4it
06/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

数据库管理哪家强?Devart VS Navicat 360°全方位对比解析

今天小编向大家推荐的是两个开发环节的主流数据库管理品牌,那么你知道这两款数据库管理软件品牌与 数据库引擎配套的管理软件有什么区别吗?小编这就360°全方位为您解答: ★ 品牌介绍 Deva...

FILA6666
11分钟前
0
0
Leetcode PHP题解--D107 453. Minimum Moves to Equal Array Elements

D107 453. Minimum Moves to Equal Array Elements 题目链接 453. Minimum Moves to Equal Array Elements 题目分析 给定一个数组,对数组中的N-1个数组每次加1,返回最少需要多少步才能使得...

skys215
13分钟前
0
0
Spring注解——同一接口有多个实现类,如何注入

https://blog.csdn.net/u010476994/article/details/80986435

Java搬砖工程师
17分钟前
0
0
java高并发(二)并发与高并发基本概念

并发 同时拥有两个或者多个线程,如果程序在单核处理器上运行,多个线程将交替地换入或者换出内存,这些线程是同时“存在”的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上...

Vincent-Duan
21分钟前
1
0
Fundebug:JavaScript插件支持错误采样

Fundebug的付费套餐主要是根据错误事件数制定的,这是因为每一个发送到我们服务器的事件,都会消耗一定的CPU、内存、磁盘以及带宽资源,尤其当错误事件数非常大时,会对我们的计算资源造成很...

Fundebug
22分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部