文档章节

聊聊Elasticsearch的ReleasableLock

go4it
 go4it
发布于 06/13 22:19
字数 1391
阅读 13
收藏 0

本文主要研究一下Elasticsearch的ReleasableLock

ReleasableLock

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

public class ReleasableLock implements Releasable {
    private final Lock lock;


    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
    private final ThreadLocal<Integer> holdingThreads;

    public ReleasableLock(Lock lock) {
        this.lock = lock;
        if (Assertions.ENABLED) {
            holdingThreads = new ThreadLocal<>();
        } else {
            holdingThreads = null;
        }
    }

    @Override
    public void close() {
        lock.unlock();
        assert removeCurrentThread();
    }


    public ReleasableLock acquire() throws EngineException {
        lock.lock();
        assert addCurrentThread();
        return this;
    }

    private boolean addCurrentThread() {
        final Integer current = holdingThreads.get();
        holdingThreads.set(current == null ? 1 : current + 1);
        return true;
    }

    private boolean removeCurrentThread() {
        final Integer count = holdingThreads.get();
        assert count != null && count > 0;
        if (count == 1) {
            holdingThreads.remove();
        } else {
            holdingThreads.set(count - 1);
        }
        return true;
    }

    public boolean isHeldByCurrentThread() {
        if (holdingThreads == null) {
            throw new UnsupportedOperationException("asserts must be enabled");
        }
        final Integer count = holdingThreads.get();
        return count != null && count > 0;
    }
}
  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数

ReleasableLockTests

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java

public class ReleasableLockTests extends ESTestCase {

    /**
     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
     * not its second entrance.
     *
     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
     * @throws InterruptedException   if awaiting on the synchronization barrier is interrupted
     */
    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

        final int numberOfThreads = scaledRandomIntBetween(1, 32);
        final int iterations = scaledRandomIntBetween(1, 32);
        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
                for (int j = 0; j < iterations; j++) {
                    if (randomBoolean()) {
                        acquire(readLock, writeLock);
                    } else {
                        acquire(writeLock, readLock);
                    }
                }
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }

        barrier.await();
        barrier.await();
        for (final Thread thread : threads) {
            thread.join();
        }
    }

    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
                assertTrue(lockToAcquire.isHeldByCurrentThread());
                assertFalse(otherLock.isHeldByCurrentThread());
            }
            // previously there was a bug here and this would return false
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
        }
        assertFalse(lockToAcquire.isHeldByCurrentThread());
        assertFalse(otherLock.isHeldByCurrentThread());
    }

}
  • ReleasableLockTests使用多线程随机执行acquire,该方法断言lockToAcquire被当前线程持有,而otherLock不被当前线程持有

Cache.CacheSegment

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java

    private static class CacheSegment<K, V> {
        // read/write lock protecting mutations to the segment
        ReadWriteLock segmentLock = new ReentrantReadWriteLock();

        ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
        ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());

        Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();

        SegmentStats segmentStats = new SegmentStats();

        /**
         * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
         * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
         *
         * @param key       the key of the entry to get from the cache
         * @param now       the access time of this entry
         * @param isExpired test if the entry is expired
         * @param onExpiration a callback if the entry associated to the key is expired
         * @return the entry if there was one, otherwise null
         */
        Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = readLock.acquire()) {
                future = map.get(key);
            }
            if (future != null) {
                Entry<K, V> entry;
                try {
                    entry = future.get();
                } catch (ExecutionException e) {
                    assert future.isCompletedExceptionally();
                    segmentStats.miss();
                    return null;
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                if (isExpired.test(entry)) {
                    segmentStats.miss();
                    onExpiration.accept(entry);
                    return null;
                } else {
                    segmentStats.hit();
                    entry.accessTime = now;
                    return entry;
                }
            } else {
                segmentStats.miss();
                return null;
            }
        }

        /**
         * put an entry into the segment
         *
         * @param key   the key of the entry to add to the cache
         * @param value the value of the entry to add to the cache
         * @param now   the access time of this entry
         * @return a tuple of the new entry and the existing entry, if there was one otherwise null
         */
        Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
            Entry<K, V> entry = new Entry<>(key, value, now);
            Entry<K, V> existing = null;
            try (ReleasableLock ignored = writeLock.acquire()) {
                try {
                    CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
                    if (future != null) {
                        existing = future.handle((ok, ex) -> {
                            if (ok != null) {
                                return ok;
                            } else {
                                return null;
                            }
                        }).get();
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
            return Tuple.tuple(entry, existing);
        }

        /**
         * remove an entry from the segment
         *
         * @param key       the key of the entry to remove from the cache
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.remove(key);
            }
            if (future != null) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        /**
         * remove an entry from the segment iff the future is done and the value is equal to the
         * expected value
         *
         * @param key the key of the entry to remove from the cache
         * @param value the value expected to be associated with the key
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            boolean removed = false;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.get(key);
                try {
                    if (future != null) {
                        if (future.isDone()) {
                            Entry<K, V> entry = future.get();
                            if (Objects.equals(value, entry.value)) {
                                removed = map.remove(key, future);
                            }
                        }
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }

            if (future != null && removed) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        private static class SegmentStats {
            private final LongAdder hits = new LongAdder();
            private final LongAdder misses = new LongAdder();
            private final LongAdder evictions = new LongAdder();

            void hit() {
                hits.increment();
            }

            void miss() {
                misses.increment();
            }

            void eviction() {
                evictions.increment();
            }
        }
    }
  • CacheSegment使用ReentrantReadWriteLock的readLock及writeLock创建了两个ReleasableLock,一个为readLock,一个为writeLock;由于ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

小结

  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数

ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1098
码字总数 1036409
作品 0
深圳
私信 提问
聊聊springboot elasticsearch autoconfigure

序 本文主要研究一下springboot elasticsearch autoconfigure ElasticsearchAutoConfiguration spring-boot-autoconfigure-2.1.4.RELEASE-sources.jar!/org/springframework/boot/autoconfi......

go4it
04/17
59
0
聊聊Elasticsearch的Releasables

序 本文主要研究一下Elasticsearch的Releasables Releasable elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/lease/Releasable.java Releasable继承了java.io.Closeab......

go4it
06/14
23
0
聊聊Elasticsearch的ConcurrentMapLong

序 本文主要研究一下Elasticsearch的ConcurrentMapLong ConcurrentMapLong elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ConcurrentMapLong.java Co......

go4it
06/03
8
0
CentOS7 部署 ElasticSearch 集群

环境 主机名 IP 操作系统 ES 版本 es227 192.168.1.227 CentOS7.5 6.5.4 es228 192.168.1.228 CentOS7.5 6.5.4 es229 192.168.1.229 CentOS7.5 6.5.4 下载 elasticsearch-6.5.4.tar.gz --- 各......

俊赛潘安-才比管乐
2018/12/27
465
0
聊聊Elasticsearch的SizeBlockingQueue

序 本文主要研究一下Elasticsearch的SizeBlockingQueue SizeBlockingQueue elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java Si......

go4it
06/01
17
0

没有更多内容

加载失败,请刷新页面

加载更多

好程序员Java学习路线分享MyBatis之线程优化

  好程序员Java学习路线分享MyBatis之线程优化,我们的项目存在大量用户同时访问的情况,那么就会出现大量线程并发访问数据库,这样会带来线程同步问题,本章我们将讨论MyBatis的线程同步问...

好程序员官方
26分钟前
6
0
IDEA 自定义方法注解模板

IDEA 自定义方法注解模板 1、使用效果 /*** 计算交易费用* @Author wangjiafang* @Date 2019/9/11* @param feeComputeVo* @return*/@PostMapping("/v1/fee_compute")public ApiResp......

小白的成长
26分钟前
6
0
转:进程 线程 协程 管程 纤程 概念对比理解

引言 不知道是不是我自己本身就有那么一丝丝的密集恐惧,把这么一大堆看起来很相似很相关的概念放在一起,看起来是有点麻,捋一捋感觉舒服多了。 相关概念 任务、作业(Job,Task,Schedule)...

xiaomin0322
37分钟前
6
0
前端数组转化成字符串

val=val.join(","); 转化后:“1,2,3”

郭周园
40分钟前
4
0
Spring Boot Admin配置详解

Client端配置 参数 默认值 说明 spring.boot.admin.client.enabled true 是否启用springbootAdmin客户端 spring.boot.admin.client.url 要注册的server端的url地址。如果要同时在多个server端...

兜兜毛毛
40分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部