文档章节

聊聊resilience4j的bulkhead

go4it
 go4it
发布于 07/13 10:49
字数 1169
阅读 1
收藏 0
点赞 0
评论 0

本文主要研究一下resilience4j的bulkhead

Bulkhead

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/Bulkhead.java

/**
 *  A Bulkhead instance is thread-safe can be used to decorate multiple requests.
 *
 * A {@link Bulkhead} represent an entity limiting the amount of parallel operations. It does not assume nor does it mandate usage
 * of any particular concurrency and/or io model. These details are left for the client to manage. This bulkhead, depending on the
 * underlying concurrency/io model can be used to shed load, and, where it makes sense, limit resource use (i.e. limit amount of
 * threads/actors involved in a particular flow, etc).
 *
 * In order to execute an operation protected by this bulkhead, a permission must be obtained by calling {@link Bulkhead#isCallPermitted()}
 * If the bulkhead is full, no additional operations will be permitted to execute until space is available.
 *
 * Once the operation is complete, regardless of the result, client needs to call {@link Bulkhead#onComplete()} in order to maintain
 * integrity of internal bulkhead state.
 *
 */
public interface Bulkhead {

    /**
     * Dynamic bulkhead configuration change.
     * NOTE! New `maxWaitTime` duration won't affect threads that are currently waiting for permission.
     * @param newConfig new BulkheadConfig
     */
    void changeConfig(BulkheadConfig newConfig);

    /**
     * Attempts to acquire a permit, which allows an call to be executed.
     *
     * @return boolean whether a call should be executed
     */
    boolean isCallPermitted();

    /**
     * Records a completed call.
     */
    void onComplete();

    /**
     * Returns the name of this bulkhead.
     *
     * @return the name of this bulkhead
     */
    String getName();

    /**
     * Returns the BulkheadConfig of this Bulkhead.
     *
     * @return bulkhead config
     */
    BulkheadConfig getBulkheadConfig();

    /**
     * Get the Metrics of this Bulkhead.
     *
     * @return the Metrics of this Bulkhead
     */
    Metrics getMetrics();

    /**
     * Returns an EventPublisher which subscribes to the reactive stream of BulkheadEvent and
     * can be used to register event consumers.
     *
     * @return an EventPublisher
     */
    EventPublisher getEventPublisher();

    //......

    /**
     * Returns a callable which is decorated by a bulkhead.
     *
     * @param bulkhead the bulkhead
     * @param callable the original Callable
     * @param <T> the result type of callable
     *
     * @return a supplier which is decorated by a Bulkhead.
     */
    static <T> Callable<T> decorateCallable(Bulkhead bulkhead, Callable<T> callable){
        return () -> {
            BulkheadUtils.isCallPermitted(bulkhead);
            try {
                return callable.call();
            }
            finally {
                bulkhead.onComplete();
            }
        };
    }

    /**
     * Returns a supplier which is decorated by a bulkhead.
     *
     * @param bulkhead the bulkhead
     * @param supplier the original supplier
     * @param <T> the type of results supplied by this supplier
     *
     * @return a supplier which is decorated by a Bulkhead.
     */
    static <T> Supplier<T> decorateSupplier(Bulkhead bulkhead, Supplier<T> supplier){
        return () -> {
            BulkheadUtils.isCallPermitted(bulkhead);
            try {
                return supplier.get();
            }
            finally {
                bulkhead.onComplete();
            }
        };
    }

    interface Metrics {


        /**
         * Returns the number of parallel executions this bulkhead can support at this point in time.
         *
         * @return remaining bulkhead depth
         */
        int getAvailableConcurrentCalls();
    }

    /**
     * An EventPublisher which can be used to register event consumers.
     */
    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<BulkheadEvent> {

        EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> eventConsumer);

        EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> eventConsumer);

        EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> eventConsumer);
    }

    //......
}
  • 这个接口定义了isCallPermitted以及onComplete方法
  • 之后还定义了许多decorate开头的方法,主要是在调用之前先执行BulkheadUtils.isCallPermitted(bulkhead),然后在执行完成调用bulkhead.onComplete();
  • decorate的方法有decorateCheckedSupplier、decorateCompletionStage、decorateCheckedRunnable、decorateCallable、decorateSupplier、decorateConsumer、decorateCheckedConsumer、decorateRunnable、decorateFunction、decorateCheckedFunction。
  • 另外还定义了Metrics接口及EventPublisher接口。

BulkheadUtils.isCallPermitted

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/utils/BulkheadUtils.java

public final class BulkheadUtils {

    public static void isCallPermitted(Bulkhead bulkhead) {
        if(!bulkhead.isCallPermitted()) {
            throw new BulkheadFullException(String.format("Bulkhead '%s' is full", bulkhead.getName()));
        }
    }
}

通过bulkhead.isCallPermitted()进行判断,不通过则抛出BulkheadFullException

SemaphoreBulkhead

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java

/**
 * A Bulkhead implementation based on a semaphore.
 */
public class SemaphoreBulkhead implements Bulkhead {

    private final String name;
    private final Semaphore semaphore;
    private final Object configChangesLock = new Object();
    private volatile BulkheadConfig config;
    private final BulkheadMetrics metrics;
    private final BulkheadEventProcessor eventProcessor;

    /**
     * Creates a bulkhead using a configuration supplied
     *
     * @param name           the name of this bulkhead
     * @param bulkheadConfig custom bulkhead configuration
     */
    public SemaphoreBulkhead(String name, BulkheadConfig bulkheadConfig) {
        this.name = name;
        this.config = bulkheadConfig != null ? bulkheadConfig
                : BulkheadConfig.ofDefaults();
        // init semaphore
        this.semaphore = new Semaphore(this.config.getMaxConcurrentCalls(), true);

        this.metrics = new BulkheadMetrics();
        this.eventProcessor = new BulkheadEventProcessor();
    }

    /**
     * Creates a bulkhead with a default config.
     *
     * @param name the name of this bulkhead
     */
    public SemaphoreBulkhead(String name) {
        this(name, BulkheadConfig.ofDefaults());
    }

    /**
     * Create a bulkhead using a configuration supplier
     *
     * @param name           the name of this bulkhead
     * @param configSupplier BulkheadConfig supplier
     */
    public SemaphoreBulkhead(String name, Supplier<BulkheadConfig> configSupplier) {
        this(name, configSupplier.get());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void changeConfig(final BulkheadConfig newConfig) {
        synchronized (configChangesLock) {
            int delta =  newConfig.getMaxConcurrentCalls() - config.getMaxConcurrentCalls();
            if (delta < 0) {
                semaphore.acquireUninterruptibly(-delta);
            } else if (delta > 0) {
                semaphore.release(delta);
            }
            config = newConfig;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isCallPermitted() {

        boolean callPermitted = tryEnterBulkhead();

        publishBulkheadEvent(
                () -> callPermitted ? new BulkheadOnCallPermittedEvent(name)
                        : new BulkheadOnCallRejectedEvent(name)
        );

        return callPermitted;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void onComplete() {
        semaphore.release();
        publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name));
    }

    boolean tryEnterBulkhead() {

        boolean callPermitted = false;
        long timeout = config.getMaxWaitTime();

        if (timeout == 0) {
            callPermitted = semaphore.tryAcquire();
        } else {
            try {
                callPermitted = semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                callPermitted = false;
            }
        }
        return callPermitted;
    }

    private void publishBulkheadEvent(Supplier<BulkheadEvent> eventSupplier) {
        if (eventProcessor.hasConsumers()) {
            eventProcessor.consumeEvent(eventSupplier.get());
        }
    }

    private final class BulkheadMetrics implements Metrics {
        private BulkheadMetrics() {
        }

        @Override
        public int getAvailableConcurrentCalls() {
            return semaphore.availablePermits();
        }
    }

    //......
}
  • SemaphoreBulkhead是使用信号量实现的Bulkhead
  • Semaphore的大小为BulkheadConfig的maxConcurrentCalls
  • isCallPermitted方法会调用tryEnterBulkhead方法,然后发布BulkheadOnCallPermittedEvent事件
  • tryEnterBulkhead方法主要是对semaphore进行tryAcquire,如果配置的maxWaitTime不为0,则按指定时间timeout获取
  • onComplete方法主要是释放信号量,然后发布一个BulkheadOnCallFinishedEvent事件
  • 使用publishBulkheadEvent发布事件,是委托给eventProcessor.consumeEvent处理,这个processor是BulkheadEventProcessor
  • BulkheadMetrics重写了getAvailableConcurrentCalls接口,返回的是semaphore.availablePermits()

BulkheadEventProcessor

resilience4j-bulkhead-0.13.0-sources.jar!/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java

    private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> implements EventPublisher, EventConsumer<BulkheadEvent> {

        @Override
        public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
            registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
            registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
            return this;
        }

        @Override
        public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
            registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
            return this;
        }

        @Override
        public void consumeEvent(BulkheadEvent event) {
            super.processEvent(event);
        }
    }
  • BulkheadEventProcessor继承了EventProcessor,实现了EventPublisher及EventConsumer接口

小结

  • resilience4j的bulkhead,本质上是对方法进行分并发调用的控制,SemaphoreBulkhead采用的是信号量的实现,信号量大小为maxConcurrentCalls,执行之前获取下信号量,获取不到抛出异常,获取到的话,在执行之后释放信号量
  • Bulkhead定义了一系列decorate开头的静态方法来对callable、runnable、supplier等进行包装,植入对信号量的获取及释放逻辑。

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 50
博文 671
码字总数 468409
作品 0
深圳
resilience4j小试牛刀

序 本文主要研究下resilience4j的基本功能 maven CircuitBreaker CircuitBreaker主要是实现针对接口异常的断路统计以及断路处理 Timelimiter 主要是实现超时的控制 Bulkhead Bulkhead目前来看...

go4it
07/09
0
0
聊聊resilience4j的Retry

序 本文主要研究一下resilience4j的Retry Retry resilience4j-retry-0.13.0-sources.jar!/io/github/resilience4j/retry/Retry.java 这个类定义了一些工厂方法,最后new的是RetryImpl 还定义...

go4it
07/14
0
0
聊聊resilience4j的CircuitBreakerStateMachine

序 本文主要研究一下resilience4j的CircuitBreakerStateMachine CircuitBreakerStateMachine resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/int......

go4it
07/12
0
0
聊聊resilience4j的CircuitBreakerConfig

序 本文主要研究一下resilience4j的CircuitBreakerConfig CircuitBreakerConfig resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreakerC......

go4it
07/10
0
0
聊聊resilience4j的fallback

序 本文主要研究一下resilience4j的fallback 使用实例 Try vavr-0.9.2-sources.jar!/io/vavr/control/Try.java 这个Try继承了Value接口 另外就是提供了一些静态工厂方法,ofSupplier方法会触...

go4it
07/15
0
0
聊聊resilience4j的CircuitBreaker

序 本文主要研究一下resilience4j的CircuitBreaker CircuitBreaker resilience4j-circuitbreaker-0.13.0-sources.jar!/io/github/resilience4j/circuitbreaker/CircuitBreaker.java 这里重点......

go4it
07/11
0
0
使用.NetCore 控制台演示 熔断 降级(polly)

1、熔断降级的概念:     熔断:我这里有一根长度一米的钢铁,钢铁的熔点1000度(假设),现在我想用力把这根钢铁折弯,但是人的力有限达不到折弯的点,然后我使用火给钢铁加热,每隔一段...

乐途
07/12
0
0
聊聊并发系列_Index

聊聊并发系列 聊聊并发(一)深入分析Volatile的实现原理 聊聊并发(二)Java SE1.6中的Synchronized 聊聊并发(三)Java线程池的分析和使用 聊聊并发(四)深入分析ConcurrentHashMap 聊聊并...

陶邦仁
2016/01/04
450
0
聊聊远程通信_Index

聊聊远程通信 Java远程通讯技术及原理分析 聊聊Socket、TCP/IP、HTTP、FTP及网络编程 RMI原理及实现 RPC原理及实现 轻量级分布式 RPC 框架 使用 RMI + ZooKeeper 实现远程调用框架 聊聊同步、...

陶邦仁
2016/02/23
1K
0
腾讯—iOS社招面试

丢了几份简历给腾讯的iOS,很多都给标为不合适,倒是有个MIG部门让我去面。 约了下午3点,在大族大厦12楼一面(面试官看着好实诚): 面试内容: 高工面 1.property本质-->property关键字 2....

gdxz110
2016/03/23
821
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

about git flow

  昨天元芳做了git分支管理规范的分享,为了拓展大家关于git分支的认知,这里我特意再分享这两个关于git flow的链接,大家可以看一下。 Git 工作流程 Git分支管理策略   git flow本质上是...

qwfys
今天
2
0
Linux系统日志文件

/var/log/messages linux系统总日志 /etc/logrotate.conf 日志切割配置文件 参考https://my.oschina.net/u/2000675/blog/908189 dmesg命令 dmesg’命令显示linux内核的环形缓冲区信息,我们可...

chencheng-linux
今天
1
0
MacOS下给树莓派安装Raspbian系统

下载镜像 前往 树莓派官网 下载镜像。 点击 最新版Raspbian 下载最新版镜像。 下载后请,通过 访达 双击解压,或通过 unzip 命令解压。 检查下载的文件 ls -lh -rw-r--r-- 1 dingdayu s...

dingdayu
今天
1
0
spring boot使用通用mapper(tk.mapper) ,id自增和回显等问题

最近项目使用到tk.mapper设置id自增,数据库是mysql。在使用通用mapper主键生成过程中有一些问题,在总结一下。 1、UUID生成方式-字符串主键 在主键上增加注解 @Id @GeneratedValue...

北岩
今天
2
0
告警系统邮件引擎、运行告警系统

告警系统邮件引擎 cd mail vim mail.py #!/usr/bin/env python#-*- coding: UTF-8 -*-import os,sysreload(sys)sys.setdefaultencoding('utf8')import getoptimport smtplibfr......

Zhouliang6
今天
1
0
Java工具类—随机数

Java中常用的生成随机数有Math.random()方法及java.util.Random类.但他们生成的随机数都是伪随机的. Math.radom()方法 在jdk1.8的Math类中可以看到,Math.random()方法实际上就是调用Random类...

PrivateO2
今天
3
0
关于java内存模型、并发编程的好文

Java并发编程:volatile关键字解析    volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在...

DannyCoder
昨天
1
0
dubbo @Reference retries 重试次数 一个坑

在代码一中设置 成retries=0,也就是调用超时不用重试,结果DEBUG的时候总是重试,不是0吗,0就不用重试啊。为什么还是调用了多次呢? 结果在网上看到 这篇文章才明白 https://www.cnblogs....

奋斗的小牛
昨天
2
0
数据结构与算法3

要抓紧喽~~~~~~~放羊的孩纸回来喽 LowArray类和LowArrayApp类 程序将一个普通的Java数组封装在LowArray类中。类中的数组隐藏了起来,它是私有的,所以只有类自己的方法才能访问他。 LowArray...

沉迷于编程的小菜菜
昨天
1
0
spring boot应用测试框架介绍

一、spring boot应用测试存在的问题 官方提供的测试框架spring-boot-test-starter,虽然提供了很多功能(junit、spring test、assertj、hamcrest、mockito、jsonassert、jsonpath),但是在数...

yangjianzhou
昨天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部