描述问题
最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。
相关业务
本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送
相关分析
网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。
还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧
了解rabbit的相关几个概念
了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer
我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,
但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单
下面我们就对这个问题的答案来个coding
解决办法
首先我们因为需要失败重试,使用spring的RepublishMessageRecoverer可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列
相关配置
1 @Bean
2 ObjectMapper objectMapper() {
3 ObjectMapper objectMapper = new ObjectMapper();
4 DateFormat dateFormat = objectMapper.getDateFormat();
5 JavaTimeModule javaTimeModule = new JavaTimeModule();
6
7 SimpleModule module = new SimpleModule();
8 module.addSerializer(new ToStringSerializer(Long.TYPE));
9 module.addSerializer(new ToStringSerializer(Long.class));
10 module.addSerializer(new ToStringSerializer(BigInteger.class));
11
12 javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
13 javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
14 javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
15
16 objectMapper.registerModule(module);
17 objectMapper.registerModule(javaTimeModule);
18 objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持
19 objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
20 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
21 return objectMapper;
22 }
23
24
25
26 @Bean
27 RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
28 return new RabbitAdmin(aConnectionFactory);
29 }
30
31 @Bean
32 MessageConverter jacksonAmqpMessageConverter( ) {
33 return new Jackson2JsonMessageConverter(objectMapper());
34 }
35
36
37 @Bean
38 Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
39 Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
40 rabbitAdmin.declareQueue(queue);
41 return queue;
42 }
43 @Bean
44 Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
45 Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
46 rabbitAdmin.declareQueue(queue);
47 return queue;
48 }
49 @Bean
50 Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
51 Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
52 rabbitAdmin.declareQueue(queue);
53 return queue;
54 }
55 /**
56 * 声明一个交换机
57 * @return
58 */
59 @Bean
60 TopicExchange controlExchange () {
61 return new TopicExchange(Exchanges.ExangeTOPIC);
62 }
63
64
65 /**
66 * 延时重试队列
67 */
68 @Bean
69 public Queue bcwPayControlRetryQueue() {
70 Map<String, Object> arguments = new HashMap<>();
71 arguments.put("x-message-ttl", 10 * 1000);
72 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
73 // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
74 arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
75 return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
76 }
77 /**
78 * 延时重试队列
79 */
80 @Bean
81 public Queue bcwPushControlRetryQueue() {
82 Map<String, Object> arguments = new HashMap<>();
83 arguments.put("x-message-ttl", 10 * 1000);
84 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
85 // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
86 arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
87 return new Queue("queue_bcw@push@retry", true, false, false, arguments);
88 }
89 /**
90 * 延时重试队列
91 */
92 @Bean
93 public Queue bcwPullControlRetryQueue() {
94 Map<String, Object> arguments = new HashMap<>();
95 arguments.put("x-message-ttl", 10 * 1000);
96 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
97 // 如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
98 // arguments.put("x-dead-letter-routing-key", "queue_bcw");
99 return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
100 }
101 @Bean
102 public Binding bcwPayControlRetryBinding() {
103 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104 }
105 @Bean
106 public Binding bcwPushControlRetryBinding() {
107 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108 }
109 @Bean
110 public Binding bcwPullControlRetryBinding() {
111 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112 }
113
114 /**
115 * 队列绑定并关联到RoutingKey
116 *
117 * @param queueMessages 队列名称
118 * @param exchange 交换机
119 * @return 绑定
120 */
121 @Bean
122 Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124 }
125 /**
126 * 队列绑定并关联到RoutingKey
127 *
128 * @param queueMessages 队列名称
129 * @param exchange 交换机
130 * @return 绑定
131 */
132 @Bean
133 Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135 }
136 /**
137 * 队列绑定并关联到RoutingKey
138 *
139 * @param queueMessages 队列名称
140 * @param exchange 交换机
141 * @return 绑定
142 */
143 @Bean
144 Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146 }
147
148 @Bean
149 @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151 SimpleRabbitListenerContainerFactoryConfigurer configurer,
152 ConnectionFactory connectionFactory) {
153 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154 configurer.configure(factory, connectionFactory);
155 factory.setMessageConverter(jacksonAmqpMessageConverter());
156 return factory;
157 }
下面就是我们的主题,定时任务使用的是org.springframework.scheduling
1 /**
2 * 手动确认消息的,定时获取队列消息实现
3 */
4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
5 protected final Logger logger = LoggerFactory.getLogger(getClass());
6 private List<Message> body = new LinkedList<>();
7 public long start_time;
8 private Channel channel;
9 @Autowired
10 private ObjectMapper objectMapper;
11 @Autowired
12 private RabbitTemplate rabbitTemplate;
13
14 public QuartzSimpleMessageListenerContainer() {
15 // 手动确认
16 this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
17
18 this.setMessageListener((ChannelAwareMessageListener) (message,channel) -> {
19 long current_time = System.currentTimeMillis();
20 int time = (int) ((current_time - start_time)/1000);
21 logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue());
22 Long retryCount = getRetryCount(message.getMessageProperties());
23 if (retryCount > 3) {
24 logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue());
25 try {
26 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
27 } catch (IOException ex) {
28 ex.printStackTrace();
29 }
30 return;
31 }
32
33 this.body.add(message);
34 /**
35 * 判断数组数据是否满了,判断此监听器时间是否大于执行时间
36 * 如果在最后延时时间段内没有业务消息,此监听器会一直开着
37 */
38 if(body.size()>=3 || time>60){
39 this.channel = channel;
40 callback();
41 }
42 });
43
44
45
46 }
47 private void callback(){
48 // channel = getChannel(getTransactionalResourceHolder());
49 if(body.size()>0 && channel !=null && channel.isOpen()){
50 try {
51 callbackWork();
52 }catch (Exception e){
53 logger.error("推送数据出错:{}",e.getMessage());
54
55 body.stream().forEach(message -> {
56 Long retryCount = getRetryCount(message.getMessageProperties());
57 if (retryCount <= 3) {
58 logger.info("将消息置入延时重试队列,重试次数:" + retryCount);
59 rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
60 }
61 });
62
63 } finally{
64
65 logger.info("flsher too data");
66
67 body.stream().forEach(message -> {
68 //手动acknowledge
69 try {
70 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
71 } catch (IOException e) {
72 logger.error("手动确认消息失败!");
73 e.printStackTrace();
74 }
75 });
76
77 body.clear();
78 this.stop();
79
80 }
81 }
82
83 }
84 abstract void callbackWork() throws Exception;
85 /**
86 * 获取消息失败次数
87 * @param properties
88 * @return
89 */
90 private long getRetryCount(MessageProperties properties){
91 long retryCount = 0L;
92 Map<String,Object> header = properties.getHeaders();
93 if(header != null && header.containsKey("x-death")){
94 List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
95 if(deaths.size()>0){
96 Map<String,Object> death = deaths.get(0);
97 retryCount = (Long)death.get("count");
98 }
99 }
100 return retryCount;
101 }
102
103 @Override
104 @Scheduled(cron = "0 0/2 * * * ? ")
105 public void start() {
106 logger.info("start push data scheduled!");
107 //初始化数据,将未处理的调用stop方法,返还至rabbit
108 body.clear();
109 super.stop();
110 start_time = System.currentTimeMillis();
111 super.start();
112
113 logger.info("end push data scheduled!");
114 }
115
116 public List<WDNJPullOrder> getBody() {
117
118 List<WDNJPullOrder> collect = body.stream().map(data -> {
119 byte[] body = data.getBody();
120 WDNJPullOrder readValue = null;
121 try {
122 readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
123 });
124 } catch (IOException e) {
125 logger.error("处理数据出错{}",e.getMessage());
126 }
127 return readValue;
128 }
129 ).collect(Collectors.toList());
130
131 return collect;
132
133
134 }
135
136 }
后续
当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现
@Scheduled(cron = "0 0/2 * * * ? ")
public void start()