文档章节

源于HystrixCommandStartStream和RollingCommandMaxConcurrencyStream 的 RxJava demo

专业写BUG的程序员
 专业写BUG的程序员
发布于 07/02 20:19
字数 1480
阅读 855
收藏 0

「深度学习福利」大神带你进阶工程师,立即查看>>>

其实,最近在工作之余看Hystrix源代码已经有一个多月了,  除了对 HystrixCommandProperties  ,HystrixCommand 和AbstractCommand  几个类比较了解以外,其余看山不是山,比较懵,  主要是因为Hystrix基于RxJava 来实现, 而RxJava 则又是函数式编程,跳跃性很大, 习惯了面向过程和面向对象编程,看RxJava则有点吃力。 但昨晚有所顿悟, 因为我看懂了 Hystrix 是如何统计并发量(currentConcurrentExecutionCount)和最近10s最大并发量(rollingMaxConcurrentExecutionCount)的。   Hystrix的所有统计工作在 com.netflix.hystrix.metric 这个package中,初看十分吃力, 十分复杂。 我把统计最近10s 最大并发量抽象成如下需求,并用rxJava 来实现。

 

有n个线程不停的产生整数,速率不固定,求最近10s内产生的最大整数。 昨晚在断网的情况下写了如下实现:使用rxJava的 window 功能来实现。

import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class Main1 {

    //  线程安全   SerializedSubject.onNext  调用SerializedObserver.onNext ,而SerializedObserver.onNext 使用了synchronized
    private static Subject<String, String> writeOnlySubject = new SerializedSubject<String, String>(PublishSubject.<String>create());

    //  共享,能被多个消费者处理
    private static Observable<String> readOnlyStream = writeOnlySubject.share();


    public static void main(String[] args) throws Exception{

        new Thread(new ProduceStream()).start();
        new Thread(new ProduceStream()).start();

        // 计算最近10s产生数据中的最大值
        new Thread(new ConsumerMax()).start();
        // 计算最近10s产生数据中的最小值
        //  new Thread(new ConsumerMin()).start();

    }

    //  只产生数据,不停的产生数据就会形成流
    private static class ProduceStream implements Runnable {

        @Override
        public void run() {

            while (true){
                writeOnlySubject.onNext(String.valueOf(new Random().nextInt(10000)));
                try {
                    Thread.sleep(new Random().nextInt(200));
                }catch (Exception e){

                }

            }
        }

    }

    /*   消费者,观察者  */
    private static class ConsumerMax implements Runnable {

        private static final Func1<String, Integer> getConcurrencyCountFromEvent = new Func1<String, Integer>() {
            @Override
            public Integer call(String event) {
                // 当前的并发量
                int value = Integer.parseInt(event);
                return value;
            }
        };

        private static final Func1<Observable<Integer>, Observable<Integer>> reduceStreamToMax = new Func1<Observable<Integer>, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Observable<Integer> observedConcurrency) {
                return observedConcurrency.reduce(0, reduceToMax);
            }
        };

        private static final Func2<Integer, Integer, Integer> reduceToMax = new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return Math.max(a, b);
            }
        };


        private final static BehaviorSubject<Integer> rollingMax = BehaviorSubject.create(0);

        private static Observable<Integer> rollingMaxStream;



        @Override
        public void run() {

            final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
            for (int i = 0; i < 10; i++) {
                emptyRollingMaxBuckets.add(0);
            }

            // 对readOnlyStream进行处理
            rollingMaxStream =  readOnlyStream
                    .map(getConcurrencyCountFromEvent)
                    .window(1000, TimeUnit.MILLISECONDS)
                    .flatMap(reduceStreamToMax)
                    .startWith(emptyRollingMaxBuckets)
                    .window(10, 1)
                    .flatMap(reduceStreamToMax)
                    .share()
                    .onBackpressureDrop();

            //  订阅处理的结果
            rollingMaxStream.subscribe(rollingMax) ;


            while (true) {

                if (rollingMax.hasValue()) {
                    // 输出最近10s最大的并发量
                    System.out.println("最近10s内产生的最大整数为:\t" + rollingMax.getValue());
                }

                try {
                    Thread.sleep(1000);
                } catch (Exception e) {

                }
            }
        }
    }
}

 

输出如下

 

这个几乎是面向过程的编程, 好理解,但是不好维护和扩展。 今天又把这个demo整理成了面向对象编程的dome 。

主类:

public class Main {

    public static void main(String[] args) throws Exception{
        Metrics metrics = new Metrics() ;
        Work work = new Work(metrics) ;

        new Thread(work).start();
        new Thread(work).start();

        while (true){
            System.out.println(metrics.getMax());
            Thread.sleep(1000);
        }
    }

}

 

指标监控类

public class Metrics {

    private IntegerStream integerStream;

    private ConsumerMaxStream consumerMaxStream;

    public Metrics() {
        this.integerStream = new IntegerStream();
        this.consumerMaxStream = new ConsumerMaxStream(integerStream);
    }


    public void add(Integer item) {
        integerStream.write(item);
    }

    public int getMax() {
        return consumerMaxStream.getMax();
    }
}

 

整型数据流类    之所以会形成Stream,是因为 writeOnlySubject 不停的有数据加进来。

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * @Classname IntegerStream
 * @Since 2020/7/2 18:17
 * @Created by lizhifeng
 * @Desc
 * @see
 */

public class IntegerStream {


    //  线程安全   SerializedSubject.onNext  调用SerializedObserver.onNext ,而SerializedObserver.onNext 使用了synchronized
    private final Subject<String, String> writeOnlySubject;

    //  共享,能被多个消费者处理
    private final Observable<String> readOnlyStream;

    public IntegerStream() {
        this.writeOnlySubject = new SerializedSubject<String, String>(PublishSubject.<String>create());
        this.readOnlyStream = writeOnlySubject.share();
    }


    /*   这使 IntegerStream变成被观察者 */
    public Observable<String> observe() {
        return readOnlyStream;
    }


    public void write(Integer item) {
        writeOnlySubject.onNext(String.valueOf(item));
    }

}

 

在流上进行统计

import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Classname ConsumerMaxStream
 * @Since 2020/7/2 18:18
 * @Created by lizhifeng
 * @Desc
 * @see
 */


/*   消费者,观察者  */
public class ConsumerMaxStream {

    private final Func1<String, Integer> getConcurrencyCountFromEvent = new Func1<String, Integer>() {
        @Override
        public Integer call(String event) {
            // 当前的并发量
            int value = Integer.parseInt(event);
            return value;
        }
    };

    private final Func1<Observable<Integer>, Observable<Integer>> reduceStreamToMax = new Func1<Observable<Integer>, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Observable<Integer> observedConcurrency) {
            return observedConcurrency.reduce(0, reduceToMax);
        }
    };

    private final Func2<Integer, Integer, Integer> reduceToMax = new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return Math.max(a, b);
        }
    };


    private final BehaviorSubject<Integer> rollingMax = BehaviorSubject.create(0);

    private Observable<Integer> rollingMaxStream;

    public ConsumerMaxStream(IntegerStream integerStream) {

        final List<Integer> emptyRollingMaxBuckets = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            emptyRollingMaxBuckets.add(0);
        }

        // 对readOnlyStream进行处理
        rollingMaxStream = integerStream.observe()
                .map(getConcurrencyCountFromEvent)
                .window(1000, TimeUnit.MILLISECONDS)
                .flatMap(reduceStreamToMax)
                .startWith(emptyRollingMaxBuckets)
                .window(10, 1)
                .flatMap(reduceStreamToMax)
                .share()
                .onBackpressureDrop();

        //  订阅处理的结果
        rollingMaxStream.subscribe(rollingMax);
    }


    public int getMax() {
        if (rollingMax.hasValue()) {
            return rollingMax.getValue();
        }
        return 0;
    }
}

 

Work 类

import java.util.Random;

/**
 * @Classname Work
 * @Since 2020/7/2 18:15
 * @Created by lizhifeng
 * @Desc
 * @see
 */
public class Work implements Runnable {

    private Metrics metrics ;
    public Work(Metrics metrics){
        this.metrics = metrics ;
    }

    @Override
    public void run() {

        while (true){
            metrics.add(new Random().nextInt(10000));

            try{
                Thread.sleep(new Random().nextInt(200));
            }catch (Exception e){

            }
        }
    }
}

每个类都足够的简单,足够的专注,是吧? 如果看源码你则又会陷入迷茫,

public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    
    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    public abstract Observable<Output> observe();

}
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output>
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts>

为啥又是泛型,又是抽象继承,让人头疼。  

 

 

 

纵观整个 com.netflix.hystrix.metric  package , 在重复使用这个套路,来完成各个指标的统计。 dome 中使用是一个Integer流, 但到实际项目中一般都是事件流,不停的产生事件,形成流。

 

HystrixThreadEventStream 是其他四个流的源头, 是一个 ThreadLocal 变量。  因为在 Hystrix-Timer线程内部执行, 因此不需要考虑线程安全问题。

HystrixThreadEventStream  又会write下面四个流,  从字面上很好理解,   命令开始执行流, 命令结束流。 

HystrixCommandStartStream    HystrixCommandCompletionStream    HystrixThreadPoolStartStream    HystrixThreadPoolCompletionStream
如果ExecutionIsolationStrategy为SEMAPHORE  只会产生前面两个流。

 

每个流有若干消费者 consumer

HystrixCommandStartStream  的消费者有  RollingCommandMaxConcurrencyStream   ,   滚动计算最近10s内的最大并发量。

 

HystrixCommandCompletionStream  有五个消费者:

 

HealthCountsStream      健康统计,作为是否熔断的判断。
CumulativeCommandEventCounterStream    
RollingCommandEventCounterStream
RollingCommandLatencyDistributionStream             统计执行每个请求花费的时间,  比如99%的请求都在15ms内完成了 。
RollingCommandUserLatencyDistributionStream           

 

HystrixDashboard 的数据来源便是上传的各种 ConsumerStream 计算而来的。

 

 

 

 

 

 

 

 

 

专业写BUG的程序员

专业写BUG的程序员

粉丝 10
博文 161
码字总数 53617
作品 0
海淀
程序员
私信 提问
加载中
请先登录后再评论。
Touch Visualizer

在界面上加上用户点击、划动屏幕时的轨迹。当用户的手指在屏幕上触摸或点击时(触发任意点击事件),会在手指点击的地方加上慢慢扩展消失的红色圆环,并且在手指触摸的地方加上半透明的圆形效...

匿名
2013/01/26
420
0
Javascript图元绘制库--ternlight

基于HTML CANVAS API的Javascript库,提供在HTML页面上绘制图元——如流程图的能力。 目前已支持简单的矩形图元和图元间的连线(直线、直角连线两种),拖拽图元等能力。 该javascript librar...

fancimage1
2013/02/07
6.3K
1
WSGI Web服务器--UV-Web

uv-web是一个轻量级的支持高并发的WSGI Web服务器,基于libuv构建,部分代码源于开源项目bjoern,本质是python的C扩展,所以适用于部署绝大部分 python web应用(如 Django) 特性 兼容 HTTP 1...

Jone.x
2013/03/04
1.8K
0
iOS 语音识别

OpenEars是一个开源的iOS类库,用于在iPhone和iPad实现语音识别功能。本demo利用此开源类库实现了简单的语音识别。可以识别:CHANGE、LEFT、RIGHT、FORWARD、BACKWARD、GO等英文,其他语素需...

匿名
2013/03/15
6.3K
0
django-c10k-demo

这是一个演示程序,用来实现同时 10000 个并发连接到 Django 。涉及的概念包括:the C10k problem, the WebSocket protocol, the Django web framework, and Python's upcoming asynchronou......

匿名
2013/03/27
1.7K
0

没有更多内容

加载失败,请刷新页面

加载更多

【实用技巧】MAC苹果电脑怎么远程?

一般电脑远程包括三方面: 1、如果从外部windows或者linux终端连接到mac苹果电脑; 2、从苹果电脑内如何远程外面的windows、linux和mac等; 3、苹果和安卓手机怎么远程连接苹果/Windows电脑。...

osc_doeya1ck
57分钟前
22
0
虚拟主机和VPS主机有哪些不同点呢

虚拟主机是一种在单一主机或主机群上,实现多网域服务的方法,可以运行多个网站或服务的技术。vps主机是将一台服务器分割成多个虚拟专享服务器的服务。实现VPS的技术分为容器技术和虚拟机技术...

osc_b88oux8w
58分钟前
19
0
合理的使用MySQL乐观锁与悲观锁

针对 MySQL的乐观锁与悲观锁的使用,基本都是按照业务场景针对性使用的。针对每个业务场景,对应的使用锁。 但是两种锁无非都是解决并发所产生的问题。下面我们来看看如何合理的使用乐观锁与...

php开源社区
58分钟前
25
0
fusionpbx 中文 汉化

  自己以前有从事过呼叫中心的工作经验,然而由于自己是从事后端开发,对于前端界面的开发还是有些吃力,但是自己却又想自己搭建一套呼叫中心,所以购买了一台云服务器并克隆了FusionPBX的...

osc_ydeb2o99
今天
12
0
关于大O表示法和小O表示法

上节课老师讲了一下各种表示法,当时没咋听懂,后来查了一些资料弄懂了,记录一下。 主要是从维基百科上看的。http://en.wikipedia.org/wiki/Big_O_notation 大O表示法: f(x) = O(g(x)) 表示...

osc_3mzamgkq
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部