文档章节

webflux提供响应式API,玩出不一样的花样

边鹏_尛爺鑫
 边鹏_尛爺鑫
发布于 06/06 11:48
字数 1581
阅读 1361
收藏 12

先说说什么是响应式

        响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式,直白的说就是:将变化的值通过数据流进行传播。

WebFlux又是什么呢

        WebFlux 模块的名称是 spring-webflux,名称中的 Flux 来源于 Reactor 中的类 Flux。Spring webflux 有一个全新的非堵塞的函数式 Reactive Web 框架,可以用来构建异步的、非堵塞的、事件驱动的服务,在伸缩性方面表现非常好。

        spring-webflux 模块。该模块包含对响应式 HTTP 和 WebSocket 客户端的支持,以及对 REST,HTML 和 WebSocket 交互等程序的支持。一般来说,Spring MVC 用于同步处理,Spring Webflux 用于异步处理。

        Spring Boot Webflux 有两种编程模型实现,一种类似 Spring MVC 注解方式,另一种是基于 Reactor 的响应式方式。

实践走起

    我在网找了下发现现在支持的DAL包有:
    spring-boot-starter-data-redis-reactive、spring-boot-starter-data-mongodb-reactive
    也许还有别的,我本意是想要spring-boot-starter-data-mysql-reactive,然而并木有。那就说下上面2个包的实践把。

spring-boot-starter-data-redis-reactive

用到的包

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-pool2</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>com.github.flying-cattle</groupId>
			<artifactId>mybatis-dsc-generator</artifactId>
			<version>${mybatis-dsc-generator.version}</version>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>

YMl配置

server:
  port: 8080
spring:
  application:
    name: webFlux-test
  redis:
    host: 127.0.0.1
    port: 6379
    password: pwd2020
    timeout: 5000
    lettuce:
      pool:
        max-active: 200 
        max-idle: 20 
        min-idle: 5 
        max-wait: 1000 

整合redis-reactive

        虽然包是starter,但是还是要有自己的配置才能用不然报错如下:

Description:

Field redisTemplate in com.flying.cattle.wf.service.impl.RedisServiceImpl required a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' that could not be found.

The injection point has the following annotations:
	- @org.springframework.beans.factory.annotation.Autowired(required=true)


Action:

Consider defining a bean of type 'org.springframework.data.redis.core.ReactiveRedisTemplate' in your configuration.

看了下官方文档需要加上如下:

@Bean
	public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
		ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(factory,RedisSerializationContext.string());
		return reactiveRedisTemplate;
	}

发现了么是ReactiveRedisTemplate<String, String> 感觉就不很友好了,本来我是想声明成ReactiveRedisTemplate<String, Serializable>,搞古了一会儿木有搞定。有那个大佬有好的方案,望指点哈

Service代码:

@Service
public class RedisServiceImpl implements RedisService {

	@Autowired
	private ReactiveRedisTemplate<String, String> redisTemplate;
	
	@Override
	public Mono<String> getById(String key) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.get(key);
	}

	@Override
	public Mono<String> addUser(String key,User user) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Mono<Boolean> deleteById(String key) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.delete(key);
	}

	@Override
	public Mono<String> updateById(String key,User user) {
		// TODO Auto-generated method stub
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return operations.getAndSet(key, JSON.toJSONString(user));
	}

	@Override
	public Flux<String> findAll(String key) {
		// TODO Auto-generated method stub
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.range(key, 0, -1);
	}


	@Override
	public Mono<Long> addlist(String key,List<String> list) {
		// TODO Auto-generated method stub
		ReactiveListOperations<String, String> operations = redisTemplate.opsForList();
		return operations.leftPushAll(key, list);
	}
	
	@Override
	public Flux<String> findUsers(String key) {
		ReactiveValueOperations<String, String> operations = redisTemplate.opsForValue();
		return redisTemplate.keys(key).flatMap(keyId ->operations.get(keyId));
	}
}

Controller代码

@RestController
@RequestMapping("/user")
public class UserController {
	
	public final static String USER_KEY="user";
	
	@Autowired
	private RedisService redisService;
	
	@Autowired
	private RedisGenerateId redisGenerateId;
	
	@GetMapping("/getId")
	public Long getUserId(){
		return redisGenerateId.generate(USER_KEY);
		
	}
	
	public String getKey(Long id) {
		return USER_KEY+"_"+id;
	}
	
	@GetMapping("/getById/{id}")
	public Mono<String> getUserById(@PathVariable("id")Long id){
		return redisService.getById(getKey(id));
	}
	
	@GetMapping("/add")
	public Mono<String> add(User user){
		user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("505237@qq.com");
		user.setPhone("13666275002");
		user.setSex(true);
		String bd="1990-01-01";
		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
		try {
			user.setBirthday(fmt.parse(bd));
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新区");
		user.setAddress("天 府大道XXd段XX号");
		user.setState("1");
		// 以上是模拟数据
		ValidationResult vr=ValidationUtils.validateEntity(user);
		if (!vr.isHasErrors()) {
			user.setId(getUserId());
			System.out.println(JSON.toJSONString(user));
			return redisService.addUser(getKey(user.getId()),user);
		}else {
			return Mono.just(vr.getFirstErrors());
		}
		
	}
	
	@GetMapping("/addlist")
	public Mono<Long> addlist(){
		List<String> list=new ArrayList<String>();
		User user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("505237@qq.com");
		user.setPhone("13666275002");
		user.setSex(true);
		user.setBirthday(new Date());
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新区");
		user.setAddress("天 府大道XXd段XX号");
		user.setState("1");
		//添加第一条数据
		Long id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		//添加第二条数据
		id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		//添加第三条数据
		id=redisGenerateId.generate("user");
		user.setId(id);
		list.add(JSON.toJSONString(user));
		
		return redisService.addlist("list", list);
	}
	
	/**
	 *	这个就是流响应式的接口了,是一个一个的返回数据的,异步返回 
	 *  delayElements(Duration.ofSeconds(2))这个是不要的,只是方便看效果
	 *  redis 直接就是一个一个返回,不需要produces,不知道为什么...还木有深究。
	 */
	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<String> findAll(){
		return redisService.findAll("list").delayElements(Duration.ofSeconds(2));
	}
	
	@GetMapping("/getUsers")
	public Flux<String> findUsers() {
		// TODO Auto-generated method stub
		return redisService.findUsers(USER_KEY+"_"+"*").delayElements(Duration.ofSeconds(2));
	}
}

一个是差list数据类型,一个是匹配key查询的,都是一个一个返回的,实际开发中去掉.delayElements(Duration.ofSeconds(2))就好

整合mongodb-reactive

需要的包,只需要在redis的基础上下面的jar

        <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
		</dependency>

MongoDB就很人性化了,感觉就很友好。而且是真的starter包,配置好数据库连接,就不需要其他配置了,直接可用

DAO

import org.springframework.data.mongodb.repository.ReactiveMongoRepository;

import com.flying.cattle.wf.entity.User;

public interface UserRepository extends ReactiveMongoRepository<User, Long>{

}

SERVICE(接口层我就不贴代码了)

@Service
public class MongoServiceImpl implements MongoService {
	
	@Autowired
	private UserRepository userRepository;
	
	@Override
	public Mono<User> getById(Long id) {
		// TODO Auto-generated method stub
		return userRepository.findById(id);
	}

	@Override
	public Mono<User> addUser(User user) {
		// TODO Auto-generated method stub
		return userRepository.save(user);
	}

	@Override
	public Mono<Boolean> deleteById(Long id) {
		// TODO Auto-generated method stub
		 userRepository.deleteById(id);
		 return Mono.create(userMonoSink -> userMonoSink.success());
	}

	@Override
	public Mono<User> updateById(User user) {
		// TODO Auto-generated method stub
		return userRepository.save(user);
	}

	@Override
	public Flux<User> findAllUser() {
		// TODO Auto-generated method stub
		return userRepository.findAll();
	}
}

CONTROLLER

@RestController
@RequestMapping("/usermg")
public class UserMongoController {
	
	public final static String USER_KEY="user";
	
	@Autowired
	private RedisGenerateId redisGenerateId;
	
	@Autowired
	private MongoService mongoService;
	
	@GetMapping("/getId")
	public Long getUserId(){
		return redisGenerateId.generate(USER_KEY);
	}

	@GetMapping("/add")
	public Mono<User> add(User user) {
		user = new User();
		user.setAccount("admin1");
		user.setPassword("123123");
		user.setNickname("admin");
		user.setEmail("505237@qq.com");
		user.setPhone("13666275002");
		user.setSex(true);
		String bd = "1990-01-01";
		DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
		try {
			user.setBirthday(fmt.parse(bd));
		} catch (ParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		user.setProvince("四川省");
		user.setCity("成都市");
		user.setCounty("高新区");
		user.setAddress("天 府大道XXd段XX号");
		user.setState("1");
		// 以上是模拟数据
		ValidationResult vr = ValidationUtils.validateEntity(user);
		if (!vr.isHasErrors()) {
			user.setId(getUserId());
			System.out.println(JSON.toJSONString(user));
			return mongoService.addUser(user);
		} else {
			 System.err.println(vr.getFirstErrors());
		}
		return null;
	}
	
	/**
	 *	注意这里produces = MediaType.APPLICATION_STREAM_JSON_VALUE
	 *	如果不是application/stream+json则调用端无法滚动得到结果,将一直阻塞等待数据流结束或超时。
	 */
	@GetMapping(value="/findAll",produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
	public Flux<User> findAll(){
		return mongoService.findAllUser().delayElements(Duration.ofSeconds(1));
	}
}

代码就这些,大家要体验这个框架,建议还是用MongoDB把,毕竟redis主要是做缓存的。

给大家看下数据结构图

 

 

源码地址:https://gitee.com/flying-cattle/infrastructure/tree/master/webFluxTest

© 著作权归作者所有

边鹏_尛爺鑫
粉丝 44
博文 37
码字总数 39637
作品 0
成都
程序员
私信 提问
加载中

评论(7)

KL博客
KL博客

引用来自“KL博客”的评论

@Bean
@Qualifier("reactiveRedisTemplate")
public ReactiveRedisTemplate<String, Object> stringReactiveRedisTemplate(final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> keySerializer = new StringRedisSerializer();
RedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer(Object.class);
RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext
.<String, Object>newSerializationContext()
.key(keySerializer)
.value(valueSerializer)
.hashKey(keySerializer)
.hashValue(valueSerializer)
.build();
return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, serializationContext);
}

引用来自“KL博客”的评论

看下是你要的么,osc回复对代码支持不友好呀

引用来自“边鹏_尛爺鑫”的评论

这个配置我是过,就只是把string该成了object,并不能再申明ReactiveRedisTemplate里面给user,order等类似的对象进去,感觉差不多,都不好...
可以的,针对key和value指定了不同的序列化方式了
边鹏_尛爺鑫
边鹏_尛爺鑫

引用来自“KL博客”的评论

@Bean
@Qualifier("reactiveRedisTemplate")
public ReactiveRedisTemplate<String, Object> stringReactiveRedisTemplate(final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> keySerializer = new StringRedisSerializer();
RedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer(Object.class);
RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext
.<String, Object>newSerializationContext()
.key(keySerializer)
.value(valueSerializer)
.hashKey(keySerializer)
.hashValue(valueSerializer)
.build();
return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, serializationContext);
}

引用来自“KL博客”的评论

看下是你要的么,osc回复对代码支持不友好呀
这个配置我是过,就只是把string该成了object,并不能再申明ReactiveRedisTemplate里面给user,order等类似的对象进去,感觉差不多,都不好...
KL博客
KL博客

引用来自“KL博客”的评论

@Bean
@Qualifier("reactiveRedisTemplate")
public ReactiveRedisTemplate<String, Object> stringReactiveRedisTemplate(final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> keySerializer = new StringRedisSerializer();
RedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer(Object.class);
RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext
.<String, Object>newSerializationContext()
.key(keySerializer)
.value(valueSerializer)
.hashKey(keySerializer)
.hashValue(valueSerializer)
.build();
return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, serializationContext);
}
看下是你要的么,osc回复对代码支持不友好呀
KL博客
KL博客
@Bean
@Qualifier("reactiveRedisTemplate")
public ReactiveRedisTemplate<String, Object> stringReactiveRedisTemplate(final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
RedisSerializer<String> keySerializer = new StringRedisSerializer();
RedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer(Object.class);
RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext
.<String, Object>newSerializationContext()
.key(keySerializer)
.value(valueSerializer)
.hashKey(keySerializer)
.hashValue(valueSerializer)
.build();
return new ReactiveRedisTemplate(reactiveRedisConnectionFactory, serializationContext);
}
zhenruyan
zhenruyan
彭涵钧
彭涵钧
彭涵钧
彭涵钧
Spring 5 新特性

作为Java世界首个响应式Web框架,Spring 5最大的亮点莫过于提供了完整的端到端响应式编程的支持。 image.png 左侧是传统的基于Servlet的Spring Web MVC框架,右侧是5.0版本新引入的基于React...

angeChen
2018/01/02
0
0
Spring Boot 2.0 WebFlux 上手系列课程:快速入门(一)

02:WebFlux 快速入门实践 ## Spring Boot 2.0 spring.io 官网有句醒目的话是: BUILD ANYTHING WITH SPRING BOOT Spring Boot (Boot 顾名思义,是引导的意思)框架是用于简化 Spring 应用从...

泥沙砖瓦浆木匠
2018/04/15
0
0
SpringBoot2使用WebFlux函数式编程

本文只是简单使用SpringBoot2使用WebFlux的函数式编程简单使用,后续会继续写关于Webflux相关的文章。 最近一直在研究WebFlux,后续会陆续出一些相关的文章。 首先看一下Srping官网上的一张图...

dalaoyang
2018/08/02
0
0
Spring Framework 5.0 新特性

Spring Framework 5.0 是自 2013年12月版本 4 发布之后 Spring Framework 的第一个主发行版。Spring Framework 项目的领导人 Juergen Hoeller 于 2016 年 7 月 28 日宣布了第一个 Spring Fra...

独孤环宇
2017/10/23
0
0
聊聊 Spring Boot 2.0 的 WebFlux

聊聊 Spring Boot 2.0 的 WebFlux 泥瓦匠BYSocket2017-11-0416 阅读 Spring技术 聊聊 Spring Boot 2.0 的 WebFlux## 前言 对照下 Spring Web MVC ,Spring Web MVC 是基于 Servlet API 和 Se......

泥瓦匠BYSocket
2017/11/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

局域网能互相ping通,ubuntu虚拟机不能上外网

【问题】 桥接模式老是无法上网,查看本机IP发现被分配了一个私网地址,猜测应该是虚拟DHCP服务器没有打开,于是查看Ubuntu的网络配置: /etc/network/interfaces 发现没有dhcp配置的信息,只...

tahiti_aa
46分钟前
1
0
以太坊助记词PHP开发包简介

以太坊助记词PHP开发包用来为PHP以太坊应用增加助记词和层级确定密钥支持能力。下载地址:以太坊助记词php开发包 。 1、开发包概述 以太坊助记词PHP开发包主要包括以下特性: 生成符合BIP39...

汇智网教程
昨天
2
0
系统监控-分布式调用链Skywalking

1. 为什么要使用分布式调用链技术? 随着公司业务的高速发展,公司服务之间的调用关系愈加复杂,如何理清并跟踪它们之间的调用关系就显的比较关键。线上每一个请求会经过多个业务系统,并产生...

秋日芒草
昨天
4
0
告诉自己的一些建议

摆脱学生心态 尽快发挥自己价值,让公司感知自己的存在,才是王道 选择比努力重要 自己附着的平台的经济体要是一个快速崛起的行业 转行趁早,年龄越大选择成本越高 趁早大量试错,学习新领域...

林怡丰
昨天
3
0
Windows下安装Redis

下载地址: 3.0老版已不维护更新:https://github.com/MicrosoftArchive/redis/releases 4.0版 https://github.com/tporadowski/redis/releases 中文官网:http://www.redis.net.cn/ https:......

Aeroever
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部