文档章节

java9之Reactive Streams

大头鬼_yc
 大头鬼_yc
发布于 04/11 11:16
字数 2620
阅读 51
收藏 2

Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。

  如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Streams的简短说明。RxJava和Akka Streams一直是十分优秀的响应流实现库。现在java 9已经通过java.util.concurrent.Flow API 引入了响应流支持。

Java 9 Reactive Streams

  Reactive Streams是关于流的异步处理,因此应该有一个发布者(Publisher)和一个订阅者(Subscriber)。发布者发布数据流,订阅者使用数据。

这里写图片描述

  有时我们必须在Publisher和Subscriber之间转换数据。处理器(Processor)是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者能理解它。我们可以拥有一系列(chain )处理器。

这里写图片描述

  从上面的图中可以清楚地看出,Processor既可以作为订阅者也可以作为发布者。

Java 9 Flow API

  Java 9 Flow API实现了Reactive Streams规范。Flow API是IteratorObserver模式的组合。Iterator在pull模型上工作,用于应用程序从源中拉取项目;而Observer在push模型上工作,并在item从源推送到应用程序时作出反应。

  Java 9 Flow API订阅者可以在订阅发布者时请求N个项目。然后将项目从发布者推送到订阅者,直到推送玩所有项目或遇到某些错误。 
   
Java 9 Flow API

Java 9 Flow API类和接口

让我们快速浏览一下Flow API类和接口。

  • java.util.concurrent.Flow:这是Flow API的主要类。该类封装了Flow API的所有重要接口。这是一个final类,我们不能扩展它。
  • java.util.concurrent.Flow.Publisher:这是一个功能接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。
  • java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口。订阅者中的方法以严格的顺序进行调用。此接口有四种方法: 
    • onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request开始从处理器(Processor)接收项目。
    • onNext:当从发布者收到项目时调用此方法,这是我们实现业务逻辑以处理流,然后从发布者请求更多数据的方法。
    • onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。
    • onComplete:这就像finally方法,并且在发布者没有发布其他项目发布者关闭时调用。我们可以用它来发送流成功处理的通知。
  • java.util.concurrent.Flow.Subscription:这用于在发布者和订阅者之间创建异步非阻塞链接。订阅者调用其request方法来向发布者请求项目。它还有cancel取消订阅的方法,即关闭发布者和订阅者之间的链接。
  • java.util.concurrent.Flow.Processor:此接口同时扩展了PublisherSubscriber接口,用于在发布者和订阅者之间转换消息。
  • java.util.concurrent.SubmissionPublisher:一个Publisher实现,它将提交的项目异步发送给当前订阅者,直到它关闭为止。它使用Executor框架,我们将在响应流示例中使用该类来添加订阅者,然后向其提交项目。

Java 9响应流示例

  让我们从一个简单的例子开始,我们将实现Flow API Subscriber接口并使用SubmissionPublisher来创建发布者和发送消息。

Stream Data

  假设我们有一个Employee类,用于创建从发布者发送到订阅者的流消息。

package com.journaldev.reactive.beans;

public class Employee {

    private int id;
    private String name;

    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }

    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    public Employee() {
    }

    @Override
    public String toString() {
        return "[id="+id+",name="+name+"]";
    }
}

  我们还有一个实用程序类来为我们的示例创建一个员工列表。

package com.journaldev.reactive_streams;

import java.util.ArrayList;
import java.util.List;

import com.journaldev.reactive.beans.Employee;

public class EmpHelper {

    public static List<Employee> getEmps() {

        Employee e1 = new Employee(1, "Pankaj");
        Employee e2 = new Employee(2, "David");
        Employee e3 = new Employee(3, "Lisa");
        Employee e4 = new Employee(4, "Ram");
        Employee e5 = new Employee(5, "Anupam");

        List<Employee> emps = new ArrayList<>();
        emps.add(e1);
        emps.add(e2);
        emps.add(e3);
        emps.add(e4);
        emps.add(e5);

        return emps;
    }

}

Subscriber

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.journaldev.reactive.beans.Employee;

public class MySubscriber implements Subscriber<Employee> {

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe requested 1 item");
    }

    @Override
    public void onNext(Employee item) {
        System.out.println("Processing Employee "+item);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done");
    }

    public int getCounter() {
        return counter;
    }

}
  • Subscription变量以保持引用,以便可以在onNext方法中进行请求。
  • counter变量以记录处理的项目数,请注意它的值在onNext方法中增加。这将在我们的main方法中用于在结束主线程之前等待执行完成。
  • onSubscribe方法中调用订阅请求以开始处理。另请注意,onNext在处理项目后再次调用方法,要求对下一个从发布者发布的项目进行处理。
  • onErroronComplete在例子中没有太多逻辑,但在实际情况中,它们应该用于在发生错误时执行纠正措施或在处理成功完成时清理资源。

响应流测试程序

  我们将使用SubmissionPublisherPublisher作为示例,让我们看一下响应流实现的测试程序。

package com.journaldev.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.journaldev.reactive.beans.Employee;

public class MyReactiveApp {

    public static void main(String args[]) throws InterruptedException {

        // Create Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        MySubscriber subs = new MySubscriber();
        publisher.subscribe(subs);

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // logic to wait till processing of all messages are over
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }
        // close the Publisher
        publisher.close();

        System.out.println("Exiting the app");

    }

}

  在上述代码中,最重要的部分是发布者subscribesubmit方法的调用。我们应该始终关闭发布者以避免任何内存泄漏。

  执行上述程序时,我们将得到以下输出。

Subscribed
Publishing Items to Subscriber
onSubscribe requested 1 item
Processing Employee [id=1,name=Pankaj]
Processing Employee [id=2,name=David]
Processing Employee [id=3,name=Lisa]
Processing Employee [id=4,name=Ram]
Processing Employee [id=5,name=Anupam]
Exiting the app
All Processing Done

请注意,如果我们在处理所有项目之前,主线程已经退出了,那么我们将得到不想要的结果。

消息转换示例

  处理器用于在发布者和订阅者之间转换消息。假设我们有另一个用户希望处理不同类型的消息。假设这个新的消息类型是Freelancer

package com.journaldev.reactive.beans;

public class Freelancer extends Employee {

    private int fid;

    public int getFid() {
        return fid;
    }

    public void setFid(int fid) {
        this.fid = fid;
    }

    public Freelancer(int id, int fid, String name) {
        super(id, name);
        this.fid = fid;
    }

    @Override
    public String toString() {
        return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";
    }
}

  我们有一个新订阅者使用Freelancer流数据。

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

import com.journaldev.reactive.beans.Freelancer;

public class MyFreelancerSubscriber implements Subscriber<Freelancer> {

    private Subscription subscription;

    private int counter = 0;

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed for Freelancer");
        this.subscription = subscription;
        this.subscription.request(1); //requesting data from publisher
        System.out.println("onSubscribe requested 1 item for Freelancer");
    }

    @Override
    public void onNext(Freelancer item) {
        System.out.println("Processing Freelancer "+item);
        counter++;
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in MyFreelancerSubscriber");
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("All Processing Done for MyFreelancerSubscriber");
    }

    public int getCounter() {
        return counter;
    }

}

processor

  代码重要的部分是实现Processor接口。由于我们想要使用它SubmissionPublisher,我们会扩展它并在适合的地方使用它。

package com.journaldev.reactive_streams;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;

public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {

    private Subscription subscription;
    private Function<Employee,Freelancer> function;

    public MyProcessor(Function<Employee,Freelancer> function) {  
        super();  
        this.function = function;  
      }  

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(Employee emp) {
        submit((Freelancer) function.apply(emp));  
        subscription.request(1);  
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }

}
  • Function 将用于将Employee对象转换为Freelancer对象。
  • 我们将传入的Employee消息转换为onNext方法中的Freelancer消息,然后使用SubmissionPublisher submit方法将其发送给订阅者。
  • 由于Processor既是订阅者又是发布者,我们可以在终端发布者和订阅者之间创建一系列处理器。

消息转换测试

package com.journaldev.reactive_streams;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;

import com.journaldev.reactive.beans.Employee;
import com.journaldev.reactive.beans.Freelancer;

public class MyReactiveAppWithProcessor {

    public static void main(String[] args) throws InterruptedException {
        // Create End Publisher
        SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

        // Create Processor
        MyProcessor transformProcessor = new MyProcessor(s -> {
            return new Freelancer(s.getId(), s.getId() + 100, s.getName());
        });

        //Create End Subscriber
        MyFreelancerSubscriber subs = new MyFreelancerSubscriber();

        //Create chain of publisher, processor and subscriber
        publisher.subscribe(transformProcessor); // publisher to processor
        transformProcessor.subscribe(subs); // processor to subscriber

        List<Employee> emps = EmpHelper.getEmps();

        // Publish items
        System.out.println("Publishing Items to Subscriber");
        emps.stream().forEach(i -> publisher.submit(i));

        // Logic to wait for messages processing to finish
        while (emps.size() != subs.getCounter()) {
            Thread.sleep(10);
        }

        // Closing publishers
        publisher.close();
        transformProcessor.close();

        System.out.println("Exiting the app");
    }

}

  阅读程序中的注释以正确理解它,最重要的变化是发布者 - 处理器 - 订阅者链的创建。执行上述程序时,我们将得到以下输出。

Subscribed for Freelancer
Publishing Items to Subscriber
onSubscribe requested 1 item for Freelancer
Processing Freelancer [id=1,name=Pankaj,fid=101]
Processing Freelancer [id=2,name=David,fid=102]
Processing Freelancer [id=3,name=Lisa,fid=103]
Processing Freelancer [id=4,name=Ram,fid=104]
Processing Freelancer [id=5,name=Anupam,fid=105]
Exiting the app
All Processing Done for MyFreelancerSubscriber
Done

取消订阅

  我们可以使用Subscription cancel方法停止在订阅者中接收消息。

请注意,如果我们取消订阅,则订阅者将不会收到onCompleteonError信号。

  以下是一个示例代码,其中订阅者只消费3条消息,然后取消订阅。

@Override
public void onNext(Employee item) {
    System.out.println("Processing Employee "+item);
    counter++;
    if(counter==3) {
        this.subscription.cancel();
        return;
    }
    this.subscription.request(1);
}

  请注意,在这种情况下,我们在处理所有消息之前停止主线程的逻辑将进入无限循环。我们可以为此场景添加一些额外的逻辑,如果订阅者已停止处理或取消订阅,就使用一些全局变量来标志该状态。

Back Pressure

  当发布者以比订阅者消费更快的速度生成消息时,会产生背压。Flow API不提供任何关于背压或处理它的信号的机制。但我们可以设计自己的策略来处理它,例如微调用户或降低信息产生率。您可以阅读RxJava deals with Back Pressure

总结

  Java 9 Flow API是响应式编程和创建异步非阻塞应用程序的良好举措。但是,只有在所有系统API都支持它时,才能创建真正的响应式应用程序。

原文地址:Java 9 Reactive Streams written by Pankaj 
完整代码:Github

本文转载自:https://blog.csdn.net/why_still_confused/article/details/82351960

上一篇: 开发流程
下一篇: java8之Stream
大头鬼_yc

大头鬼_yc

粉丝 5
博文 70
码字总数 18031
作品 0
昌平
程序员
私信 提问
Reactor 2.0.0.RC1 发布,支持 Reactive Stream

Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM 语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每...

oschina
2015/02/19
2.3K
1
Reactor 2.0.0.M1 发布,集成 Reactive Streams

Reactor 2.0.0.M1 发布,此版本完全兼容 Reactive Streams,完全重写了 和 APIs! 同时还集成了其他 Reactive Streams: Akka Streams, Ratpack, RxJava 等等。Reactor 提供一个固定的函数来构...

oschina
2014/10/22
1K
2
Reactive Programming 一种技术,各自表述

前言 作为一名 Java 开发人员,尤其是 Java 服务端工程师,对于 Reactive Programming 的概念似乎相对陌生。随着 Java 9 以及 Spring Framework 5 的相继发布,Reactive 技术逐渐开始被广大从...

小马哥mercyblitz
2018/07/23
0
0
RxJava 2.2.10 发布,Rx 的 Java 实现

RxJava 2.2.10 发布了,RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。 主要更新内容如下: Bug 修复 Pull 6499:将缺少的空检查添加到 BufferExactBounded...

xplanet
06/22
2K
1
Flow支持Reactive Streams in Java

Java 的 JVM Flow 就是按照 reactive-stream 的 API 规范写的,来看看Flow长的什么样? 可以看到是在神奇 java.util.concurrent 包中,而且这是一个 Final 修饰的类。 java.util.concurrent...

woshixin
2018/06/13
23
0

没有更多内容

加载失败,请刷新页面

加载更多

SSH安全加强两步走

从 OpenSSH 6.2 开始已经支持 SSH 多因素认证,本文就来讲讲如何在 OpenSSH 下启用该特性。 OpenSSH 6.2 以后的版本多了一个配置项 AuthenticationMethods。该配置项可以让 OpenSSH 同时指定...

xiangyunyan
28分钟前
4
0
C或C++不是C/C++

http://www.voidcn.com/article/p-mucdruqa-ws.html

shzwork
今天
6
0
OSChina 周六乱弹 —— 如何将梳子卖给和尚

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @for_ :划水五分钟,专注两小时。分享Various Artists的单曲《贝多芬第8号钢琴奏鸣曲悲伤的第三乐章》: 《贝多芬第8号钢琴奏鸣曲悲伤的第三乐...

小小编辑
今天
179
8
ES5

什么是ES5:比普通js运行要求更加严格的模式 为什么:js语言本身有很多广受诟病的缺陷 如何:在当前作用域的顶部添加:"use strict" 要求: 1、禁止给未声明的变量赋值 2、静默失败升级为错误...

wytao1995
今天
7
0
c++ 内联函数调用快的原因

见图片分析

天王盖地虎626
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部