Spring5的WebClient使用详解

原创
2019/11/29 14:11
阅读数 4.7K

前言

Spring5带来了新的响应式web开发框架WebFlux,同时,也引入了新的HttpClient框架WebClient。WebClient是Spring5中引入的执行 HTTP 请求的非阻塞、反应式客户端。它对同步和异步以及流方案都有很好的支持,WebClient发布后,RestTemplate将在将来版本中弃用,并且不会向前添加主要新功能。

WebClient与RestTemplate比较

WebClient是一个功能完善的Http请求客户端,与RestTemplate相比,WebClient支持以下内容:

  • 非阻塞 I/O。
  • 反应流背压(消费者消费负载过高时主动反馈生产者放慢生产速度的一种机制)。
  • 具有高并发性,硬件资源消耗更少。
  • 流畅的API设计。
  • 同步和异步交互。
  • 流式传输支持

HTTP底层库选择

Spring5的WebClient客户端和WebFlux服务器都依赖于相同的非阻塞编解码器来编码和解码请求和响应内容。默认底层使用Netty,内置支持Jetty反应性HttpClient实现。同时,也可以通过编码的方式实现ClientHttpConnector接口自定义新的底层库;如切换Jetty实现:

        WebClient.builder()
                .clientConnector(new JettyClientHttpConnector())
                .build();

WebClient配置

基础配置

WebClient实例构造器可以设置一些基础的全局的web请求配置信息,比如默认的cookie、header、baseUrl等

WebClient.builder()
                .defaultCookie("kl","kl")
                .defaultUriVariables(ImmutableMap.of("name","kl"))
                .defaultHeader("header","kl")
                .defaultHeaders(httpHeaders -> {
                    httpHeaders.add("header1","kl");
                    httpHeaders.add("header2","kl");
                })
                .defaultCookies(cookie ->{
                    cookie.add("cookie1","kl");
                    cookie.add("cookie2","kl");
                })
                .baseUrl("http://www.kailing.pub")
                .build();

底层依赖Netty库配置

通过定制Netty底层库,可以配置SSl安全连接,以及请求超时,读写超时等。这里需要注意一个问题,默认的连接池最大连接500。获取连接超时默认是45000ms,你可以配置成动态的连接池,就可以突破这些默认配置,也可以根据业务自己制定。包括Netty的select线程和工作线程也都可以自己设置。

        //配置动态连接池
         //ConnectionProvider provider = ConnectionProvider.elastic("elastic pool");
         //配置固定大小连接池,如最大连接数、连接获取超时、空闲连接死亡时间等
         ConnectionProvider provider = ConnectionProvider.fixed("fixed", 45, 4000, Duration.ofSeconds(6));
         HttpClient httpClient = HttpClient.create(provider)
                 .secure(sslContextSpec -> {
                     SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
                             .trustManager(new File("E://server.truststore"));
                     sslContextSpec.sslContext(sslContextBuilder);
                 }).tcpConfiguration(tcpClient -> {
                     //指定Netty的select 和 work线程数量
                     LoopResources loop = LoopResources.create("kl-event-loop", 1, 4, true);
                     return tcpClient.doOnConnected(connection -> {
                         //读写超时设置
                         connection.addHandlerLast(new ReadTimeoutHandler(10, TimeUnit.SECONDS))
                                 .addHandlerLast(new WriteTimeoutHandler(10));
                     })
                             //连接超时设置
                             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                             .option(ChannelOption.TCP_NODELAY, true)
                             .runOn(loop);
                 });
 
         WebClient.builder()
                 .clientConnector(new ReactorClientHttpConnector(httpClient))
                 .build();

关于连接池的设置,据群友反馈,他们在使用WebClient是并发场景下会抛获取连接异常。异常如下:

Caused by: reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 45000ms

后经博主深入研究发现,WebClient底层依赖库reactory-netty在不同的版本下,初始化默认TcpTcpResources策略不一样,博主在网关系统中使用的reactory-netty版本是0.8.3,默认创建的是动态的连接池,即使在并发场景下也没发生过这种异常。而在0.9.x后,初始化的是固定大小的连接池,这位群友正是因为使用的是0.9.1的reactory-netty,在并发时导致连接不可用,等待默认的45s后就抛异常了。所以,使用最新版本的WebClient一定要根据自己的业务场景结合博主上面的Netty HttpClient配置示例合理设置好底层资源。

编解码配置

针对特定的数据交互格式,可以设置自定义编解码的模式,如下:

        ExchangeStrategies strategies = ExchangeStrategies.builder()
                .codecs(configurer -> {
                    configurer.customCodecs().decoder(new Jackson2JsonDecoder());
                    configurer.customCodecs().encoder(new Jackson2JsonEncoder());
                })
                .build();
        WebClient.builder()
                .exchangeStrategies(strategies)
                .build();

get请求示例

uri构造时支持属性占位符,真实参数在入参时排序好就可以。同时可以通过accept设置媒体类型,以及编码。最终的结果值是通过Mono和Flux来接收的,在subscribe方法中订阅返回值。

        WebClient client = WebClient.create("http://www.kailing.pub");
        Mono<String> result = client.get()
                .uri("/article/index/arcid/{id}.html", 256)
                .acceptCharset(StandardCharsets.UTF_8)
                .accept(MediaType.TEXT_HTML)
                .retrieve()
                .bodyToMono(String.class);
        result.subscribe(System.err::println);

如果需要携带复杂的查询参数,可以通过UriComponentsBuilder构造出uri请求地址,如:

        //定义query参数
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
        params.add("name", "kl");
        params.add("age", "19");
        //定义url参数
        Map<String, Object> uriVariables = new HashMap<>();
        uriVariables.put("id", 200);
        String uri = UriComponentsBuilder.fromUriString("/article/index/arcid/{id}.html")
                .queryParams(params)
                .uriVariables(uriVariables)
                .toUriString();

下载文件时,因为不清楚各种格式文件对应的MIME Type,可以设置accept为MediaType.ALL,然后使用Spring的Resource来接收数据即可,如:

        WebClient.create("https://kk-open-public.oss-cn-shanghai.aliyuncs.com/xxx.xlsx")
                .get()
                .accept(MediaType.ALL)
                .retrieve()
                .bodyToMono(Resource.class)
                .subscribe(resource -> {
                    try {
                        File file = new File("E://abcd.xlsx");
                        FileCopyUtils.copy(StreamUtils.copyToByteArray(resource.getInputStream()), file);
                    }catch (IOException ex){}
                });

post请求示例

post请求示例演示了一个比较复杂的场景,同时包含表单参数和文件流数据。如果是普通post请求,直接通过bodyValue设置对象实例即可。不用FormInserter构造。

        WebClient client = WebClient.create("http://www.kailing.pub");
        FormInserter formInserter = fromMultipartData("name","kl")
                .with("age",19)
                .with("map",ImmutableMap.of("xx","xx"))
                .with("file",new File("E://xxx.doc"));
        Mono<String> result = client.post()
                .uri("/article/index/arcid/{id}.html", 256)
                .contentType(MediaType.APPLICATION_JSON)
                .body(formInserter)
                //.bodyValue(ImmutableMap.of("name","kl"))
                .retrieve()
                .bodyToMono(String.class);
        result.subscribe(System.err::println);

同步返回结果

上面演示的都是异步的通过mono的subscribe订阅响应值。当然,如果你想同步阻塞获取结果,也可以通过.block()阻塞当前线程获取返回值。

      WebClient client =  WebClient.create("http://www.kailing.pub");
      String result = client .get()
                .uri("/article/index/arcid/{id}.html", 256)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        System.err.println(result);

但是,如果需要进行多个调用,则更高效地方式是避免单独阻塞每个响应,而是等待组合结果,如:

      WebClient client =  WebClient.create("http://www.kailing.pub");
        Mono<String> result1Mono = client .get()
                .uri("/article/index/arcid/{id}.html", 255)
                .retrieve()
                .bodyToMono(String.class);
        Mono<String> result2Mono = client .get()
                .uri("/article/index/arcid/{id}.html", 254)
                .retrieve()
                .bodyToMono(String.class);
        Map<String,String>  map = Mono.zip(result1Mono, result2Mono, (result1, result2) -> {
            Map<String, String> arrayList = new HashMap<>();
            arrayList.put("result1", result1);
            arrayList.put("result2", result2);
            return arrayList;
        }).block();
        System.err.println(map.toString());

Filter过滤器

可以通过设置filter拦截器,统一修改拦截请求,比如认证的场景,如下示例,filter注册单个拦截器,filters可以注册多个拦截器,basicAuthentication是系统内置的用于basicAuth的拦截器,limitResponseSize是系统内置用于限制响值byte大小的拦截器

        WebClient.builder()
                .baseUrl("http://www.kailing.pub")
                .filter((request, next) -> {
                    ClientRequest filtered = ClientRequest.from(request)
                            .header("foo", "bar")
                            .build();
                    return next.exchange(filtered);
                })
                .filters(filters ->{
                    filters.add(ExchangeFilterFunctions.basicAuthentication("username","password"));
                    filters.add(ExchangeFilterFunctions.limitResponseSize(800));
                })
                .build().get()
                .uri("/article/index/arcid/{id}.html", 254)
                .retrieve()
                .bodyToMono(String.class)
                .subscribe(System.err::println);

websocket支持

WebClient不支持websocket请求,请求websocket接口时需要使用WebSocketClient,如:

WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());

结语

我们已经在业务api网关、短信平台等多个项目中使用WebClient,从网关的流量和稳定足以可见WebClient的性能和稳定性。响应式编程模型是未来的web编程趋势,RestTemplate会逐步被取缔淘汰,并且官方已经不在更新和维护。WebClient很好的支持了响应式模型,而且api设计友好,是博主力荐新的HttpClient库。赶紧试试吧。

作者简介:

陈凯玲,2016年5月加入凯京科技。现任凯京科技研发中心架构组经理,救火队队长。独立博客KL博客(http://www.kailing.pub)博主。

展开阅读全文
打赏
1
2 收藏
分享
加载中
博主有试过在linux机器上请求超过65535个不? 我定时任务中循环尝试请求,在windows上正常,在linux上,每一个请求会new SocketStream,100%的超过文件最大限制。
06/30 22:31
回复
举报
没有发现这种问题,我们应用在网关里,肯定会超过这个请求的。底层连接池用的netty的,你查下你当前版本的默认配置呢。
07/01 09:10
回复
举报
我是在一个定时器中循环调用测试,不是通过网关请求,可能是我使用的问题,这个随便一个Linux机器,一个定时任务很好复现,不太了解资源是如何释放的,在什么时候释放😂
07/01 20:45
回复
举报
刚提了一个问题,描述了大概的细节,方便的,期望得到回复,感谢,https://www.oschina.net/question/3482502_2317431
07/01 21:48
回复
举报
更多评论
打赏
4 评论
2 收藏
1
分享
返回顶部
顶部