webflux简介

原创
09/04 11:29
阅读数 185

[toc]

前言

 响应式编程是一种面向数据流和变化传播的编程范式。使用它可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。我们可以使用声明的方式构建应用程序的能力,形成更加敏感和有弹性的应用,所以Spring 5在其核心框架中增加了反应系统,已经开始向声明式编程的范式转变。<br>  Flux:https://blog.csdn.net/lz710117239/article/details/93777692 <br>  优秀博客:https://blog.csdn.net/get_set/article/details/79480233

一、响应式编程的优点

  • 高层次的抽象与响应式编程导致了代码可读性的提高,因此开发人员可以主要关注定义业务逻辑的事件的相互依存性。
  • 在高度并发的环境中,响应模式自然地适合于消息处理,这是一个常见的企业用例。
  • 由于执行反压力的特性,响应式方法最适合控制生产者和消费者之间的流量,这将有助于避免内存不足。
  • 对于一个或几个线程,IO绑定任务可以通过异步和非阻塞方式执行,而且不阻塞当前线程。
  • 在高交互和实时应用程序或任何操作/事件时,都可能触发多个连接子系统的通知,在这种情况下响应式编程可以更有效的进行管理。

二、Spring 5前瞻

 作为Java中的首个响应式Web框架,Spring 5.0最大的亮点莫过于提供了完整的端到端响应式编程的支持。<br> spring5.png  如上图所示,左侧是传统的基于Servlet的Spring Web MVC框架,右侧是基于Reactive Streams的Spring WebFlux框架,从上往下依次是:

  1. Router Functions:对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
  2. spring webflux:核心组件,协调上下游各个组件提供响应式编程支持。
  3. Reactive Streams:一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

三、使用Spring Webflux开发Reactive应用

3.1 相关依赖包

 修改项目的pom.xml文件,如下所示

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
  • spring-boot-starter-webflux:webflux依赖包,是响应式开发的核心依赖包,其中包含了spring-boot-starter-reactor-netty、spring5 webflux包,默认是容器是netty。
  • spring-boot-starter-test:springboot的单元测试工具库。
  • spring-boot-starter-test:springboot的单元测试工具库。

3.2 开发Reactive应用

 webflux的使用方式有两种:基于注解、函数式编程。这里使用函数式编程,具体操作如下:

3.2.1 创建实体类
package com.chandler.spring.elasticsearch.example.entity;

import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.util.List;

/**
 * 用户文档对象
 *
 * @author 钱丁君-chandler 2020/1/19 5:28 PM
 * @version 1.0.0
 * @since 1.8
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Document(indexName = "user", type = "_doc", shards = 1, replicas = 0)
public class User {
    @Id
    private String id;
    @Field(value = "useName", type = FieldType.Keyword, store = true)
    private String useName;
    @Field(value = "password", type = FieldType.Keyword, store = true)
    private String password;
    @Field(value = "age", type = FieldType.Short, format = DateFormat.date, store = true)
    private short age;
    @Field(value = "brithday", type = FieldType.Date, store = true)
    private String brithday;
    @Field(value = "addresses", type = FieldType.Text, store = true)
    private List<String> addresses;
    @Field(value = "introduction", type = FieldType.Text, store = true)
    private String introduction;
    @Field(value = "memberFlag", type = FieldType.Boolean)
    private Boolean memberFlag;
    @Field(value = "tags", type = FieldType.Auto)
    private List<String> tags;
    @Field(value = "active", type = FieldType.Long)
    private Long active;
    @Field(value = "session_data", type = FieldType.Auto)
    private Object sessionData;
    @Field(value = "onlineTime", type = FieldType.Date)
    private String onlineTime;
}
3.2.2 创建UserGenerate
package com.chandler.spring.elasticsearch.example.domain.resp;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * BeanUtils.copyProperties(User, UserResponse);
 *
 * @author 钱丁君-chandler 2020/1/19 6:53 PM
 * @version 1.0.0
 * @since 1.8
 */
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserGenerate {
    private String id;
    private String useName;
    private String password;
    private short age;
    private String brithday;
    private List<String> addresses;
    private String introduction;
    private Boolean memberFlag;
    private List<String> tags;
    private Long active;
    private Object sessionData;
    private String onlineTime;
}

3.2.3 创建UserHandler
package com.chandler.spring.elasticsearch.example.handler;

import com.chandler.spring.elasticsearch.example.domain.req.UserRequest;
import com.chandler.spring.elasticsearch.example.domain.resp.UserGenerate;
import com.chandler.spring.elasticsearch.example.entity.User;
import com.chandler.spring.elasticsearch.example.service.UserService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

/**
 * 处理器
 *
 * @author 钱丁君-chandler 2020/1/19 6:58 PM
 * @version 1.0.0
 * @since 1.8
 */
@Component
@Configuration
public class UserHandler {

    @Autowired
    private UserService userService;

    public Mono<ServerResponse> findByUsename(ServerRequest request) {
        Flux<UserGenerate> userGenerateFlux = userService.findByUsename(request.queryParam("name").get()).map(u -> {
            UserGenerate userGenerate = UserGenerate.builder().build();
            BeanUtils.copyProperties(u, userGenerate);
            return userGenerate;
        });
        return ok().contentType(APPLICATION_STREAM_JSON)
                .body(userGenerateFlux, UserGenerate.class);
    }

    public Mono<ServerResponse> save(ServerRequest request) {
        return request.bodyToMono(UserRequest.class).map(userReq -> {
            User u = User.builder().build();
            BeanUtils.copyProperties(userReq, u);
            return u;
        }).flatMap(u -> {
            return ServerResponse.ok().contentType(APPLICATION_STREAM_JSON)
                    .body(userService.save(u), User.class);
        });
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<UserGenerate> userGenerateFlux = userService.findAll().map(u -> {
            UserGenerate userGenerate = UserGenerate.builder().build();
            BeanUtils.copyProperties(u, userGenerate);
            return userGenerate;
        });
        return ok().contentType(APPLICATION_STREAM_JSON)
                .body(userGenerateFlux, UserGenerate.class);
    }

    public Mono<ServerResponse> findById(ServerRequest request) {
        Mono<UserGenerate> userGenerateMono = userService.findById(request.pathVariable("id")).map(u -> {
            UserGenerate userGenerate = UserGenerate.builder().build();
            BeanUtils.copyProperties(u, userGenerate);
            return userGenerate;
        });
        return ok().contentType(APPLICATION_STREAM_JSON)
                .body(userGenerateMono, UserGenerate.class);
    }

    public Mono<ServerResponse> findAllById(ServerRequest request) {
        List<String> list = new ArrayList<>();
        list.add(request.pathVariable("id"));
        Flux<UserGenerate> users = userService.findAllById(list).map(u -> {
            UserGenerate userGenerate = UserGenerate.builder().build();
            BeanUtils.copyProperties(u, userGenerate);
            return userGenerate;
        });
        return ok().contentType(APPLICATION_STREAM_JSON)
                .body(users, UserGenerate.class);
    }

    public Mono<ServerResponse> existsById(ServerRequest request) {
        return ok().contentType(APPLICATION_STREAM_JSON)
                .body(userService.existsById(request.pathVariable("id")), Boolean.class);
    }

}
  • 分析:Handler主要用来处理请求操作,并将Mono<ServerResponse>返回Mono<ServerResponse>中会封装响应数据。

 上传文件,并进行存储:

public Mono<ServerResponse> upload(ServerRequest request) {
    //获取文件参数 并进行存储
    return request.multipartData().flatMap(map -> {
        map.forEach((k, v) -> {
            v.forEach(i -> {
                FilePart f = (FilePart) i;
                f.transferTo(new File("/tmp/" + f.filename()));
            });
        });
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8)
                .body(BodyInserters.fromObject(map.size()));
    });
}

 获取文件:

public Mono<ServerResponse> download(ServerRequest request) {
    //读取文件并包装为DataBuffer返回,spring-webflux会自动写入response
    File file = new File("/tmp/test.jpeg");
    return ServerResponse.ok().header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=test.jpeg")
            .contentType(MediaType.IMAGE_JPEG).contentLength(file.length())
            .body(BodyInserters.fromDataBuffers(Mono.create(r -> {
                DataBuffer buf = new DefaultDataBufferFactory().wrap(FileIOUtil.syncRead(file));
                r.success(buf);
                return;
            })));
}
3.2.4 创建UserRouter
package com.chandler.spring.elasticsearch.example.route;

import com.chandler.spring.elasticsearch.example.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

/**
 * 路由器
 *
 * @author 钱丁君-chandler 2020/1/19 6:59 PM
 * @version 1.0.0
 * @since 1.8
 */
@Configuration
public class UserRouter {
    @Bean
    public RouterFunction<ServerResponse> route(UserHandler userHandler) {
        return RouterFunctions
                .route(GET("/users")
                        .and(accept(APPLICATION_STREAM_JSON)), userHandler::findAll)
                .andRoute(PUT("/users/save")
                        .and(accept(APPLICATION_STREAM_JSON)), userHandler::save)
                .andRoute(GET("/users/find")
                        .and(accept(APPLICATION_STREAM_JSON)), userHandler::findByUsename)
                .andRoute(GET("/users/find/{id}")
                        .and(accept(APPLICATION_STREAM_JSON)), userHandler::findById)
                .andRoute(GET("/users/exists/{id}")
                        .and(accept(APPLICATION_STREAM_JSON)), userHandler::existsById);
    }
}
  • 分析:UserRouter主要用来设置请求路径和转化HTTP请求,可以使用route()方法和andRoute()方法设置多个请求路径和转化操作。
3.2.5 运行测试

 启东项目,之后访问http://localhost:9002/users和http://localhost:9002/users/find?name=chandler,如下所示:<br> postman-name.png postman-users.png

3.2.6 单元测试

 在项目中我们也可以使用使用一个Spring 5新引入的测试工具类WebTestClient,专门用于测试RP应用,具体代码如下:

package com.chandler.spring.elasticsearch.example.router;

import com.chandler.spring.elasticsearch.example.domain.resp.UserGenerate;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;

/**
 * router测试
 *
 * @author 钱丁君-chandler 2020/1/21 5:50 PM
 * @version 1.0.0
 * @since 1.8
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UserRouterTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    public void route() {
         webTestClient.get()
                .uri("/users").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                .expectStatus().isOk().returnResult(UserGenerate.class).getResponseBody()
                 .subscribe(u->log.info("spring webflux UserGenerate:{}", u.toString()));
    }

}

 创建WebTestClient实例时可以看到,编写RP应用的单元测试,同样也是数据不落地的流式风格。 webflux单元测试.png

3.2.7 小结

 HTTP请求会由UserRouter转发给对应的Handler,Handler处理请求,并放回Mono<ServerResponse>。

Router对象类似@RequestMapping,Handler类似Controller。

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
OSCHINA
登录后可查看更多优质内容
返回顶部
顶部