文档章节

springboot整合activemq加入会签,自动重发机制,持久化

o
 osc_fmg49rzg
发布于 2019/03/20 10:40
字数 1251
阅读 22
收藏 0

精选30+云产品,助力企业轻松上云!>>>

消费者客户端成功接收一条消息的标志是:这条消息被签收。
消费者客户端成功接收一条消息一般包括三个阶段:

         1、消费者接收消息,也即从MessageConsumer的receive方法返回

         2、消费者处理消息

         3、消息被签收

        其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

        在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

        在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置
 
activemq的消息确认机制就是文档中说的ack机制有:
    AUTO_ACKNOWLEDGE = 1    自动确认
    CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
    DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
    SESSION_TRANSACTED = 0    事务提交并确认
    INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认 activemq 独有
  ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
  对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)
  与Broker之间建立一种简单的“担保”机制.
  手动确认和单条消息确认需要手动的在客户端调用message.acknowledge()
  消息重发机制RedeliveryPolicy 有几个属性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
            //是否在每次尝试重新发送失败后,增长这个等待时间
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重发次数,默认为6次   这里设置为10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重发时间间隔,默认为1秒
            redeliveryPolicy.setInitialRedeliveryDelay(1);
            //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);

那么在整合activemq时候就只需要修改配置文件和客户端就可以了,activemq就是这种机制,例如支付宝支付回调的时候,只有我们返回一个success,支付那边才不会给我重发消息

配置文件:

import javax.jms.Queue;
 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
 
@EnableJms  
@Configuration  
public class ActiveMQ4Config {  
 
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("queue1");
    }
 
    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
            RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
            //是否在每次尝试重新发送失败后,增长这个等待时间
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重发次数,默认为6次   这里设置为10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重发时间间隔,默认为1秒
            redeliveryPolicy.setInitialRedeliveryDelay(1);
            //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);
            return redeliveryPolicy;
    }
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  
        ActiveMQConnectionFactory activeMQConnectionFactory =  
                new ActiveMQConnectionFactory(
                       "admin",
                        "admin",
                        url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
        JmsTemplate jmsTemplate=new JmsTemplate();
        jmsTemplate.setDeliveryMode(2);//进行持久化配置 1表示非持久化,2表示持久化
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
        jmsTemplate.setDefaultDestination(queue); //此处可不设置默认,在发送消息时也可设置队列
        jmsTemplate.setSessionAcknowledgeMode(4);//客户端签收模式
        return jmsTemplate;
    }
    
    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //设置连接数
        factory.setConcurrency("1-10");
        //重连间隔时间
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
 
}

消费者:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer {
 
    private final static Logger logger = LoggerFactory
            .getLogger(Consumer.class);
    
    @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
    public void receiveQueue(final TextMessage text, Session session)
            throws JMSException {
        try {
            logger.debug("Consumer收到的报文为:" + text.getText());
            text.acknowledge();// 使用手动签收模式,需要手动的调用,如果不在catch中调用session.recover()消息只会在重启服务后重发
        } catch (Exception e) {    
            session.recover();// 此不可省略 重发信息使用
        }
    }
}

由此可以知道activemq的queue消息是可以保证消息不丢失,不会被重复消费的,因为会给每个消息设置一个唯一的id,当消息发送失败之后可以根据这个机制来进行消费,当然也是一种处理分布式事物的方法

消息中间件的模式是可以保证消息不会丢失的,持久化和自动重发,消息回签,都可以很好的避免那种机制。消费端代码发生异常了,可以自动重发,自动消息重发。由于之前在测试的时候足够看官方文档,所以理解说客户端发生异常了,是不可以进行重发的,但是今天了解之后,发觉还是自动重发的机制,利用回签机制进行的。

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
Spring boot 整合ActiveMQ 和持久化

pom.xml配置 parent Spring 版本 springboot整合activeMQ案例,queue、topic两种模式 参数配置含义和基本功能 spring boot集成ActiveMQ 同时监听queue和Topic Activemq 介绍消息的发送/消费消...

osc_lrhgywny
2019/12/27
43
0
消息中间件之ActiveMQ教程

前言说明 两种讲授闲聊 MQ的产品学习说明 为什么要引入MQ上 为什么要引入MQ下 MQ的作用定义 ActiveMQ官网介绍和下载 ActiveMQ在Linux下安装 ActiveMQ安装后的控制台访问 Java编码MQ标准API讲...

osc_to22dmsu
2019/06/20
29
0
SpringBoot 2.x (13):整合ActiveMQ

ActiveMQ5.x不多做介绍了,主要是SpringBoot的整合 特点: 1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议 2)支持许多高级功能,如消息组,虚拟目标,通...

osc_2btjzrrn
2019/05/12
4
0
SpringBoot2.0源码分析(二):整合ActiveMQ分析

SpringBoot具体整合ActiveMQ可参考:SpringBoot2.0应用(二):SpringBoot2.0整合ActiveMQ ActiveMQ自动注入 当项目中存在和着两个类时,SpringBoot将ActiveMQ需要使用到的对象注册为Bean,供...

osc_uo9elnxq
2019/03/28
5
0
AcitvieMQ-VirtualTopic springboot整合activemq实现虚拟topic

Queue 点对点 一条消息只能被一个消费者消费,且是持久化消息-当没有可用的消费者时,该消息保存直到被消费位置;当消息被消费者收到但不响应时,该消息会一直保留或会转到另一个消费者,这是...

osc_2xb14pj9
2019/09/04
2
0

没有更多内容

加载失败,请刷新页面

加载更多

java学习day45-Thymeleaf教程(转载)

目录 Thymeleaf 教程 1. 创建模板文件 2. 标准表达式语法 2.1 简单表达式 2.1.1 ${…} 2.1.2 *{…} 2.1.3 #{…} 2.1.4 @{…} 2.1.5 ~{…} 2.1.6 内置对象 2.1.7 工具类 2.2 字面值 2.2.1 文字...

osc_nbg2lo7i
19分钟前
15
0
记录用户登陆信息,你用PHP是如何来实现的

对于初入门的PHP新手来说,或许有一定的难度。建议大家先看看PHP中session的基础含义,需要的朋友可以选择参考。 下面我们就通过具体的代码示例,为大家详细的介绍PHP中session实现记录用户登...

php开源社区
19分钟前
13
0
语音系统源码的开发,一对一语音直播源码

对于大多数人来说,直播已经不再陌生了,所谓是家喻户晓,只要是有智能手机,对于直播肯定是有所了解,对于直播大家想到是娱乐性的互动直播,其实视频直播的话也不是只有这一种方式,还有语音...

qq3595750856
20分钟前
9
0
友链

下面是我的友链啦~~ 外校大佬 _redness 魔法少女 Kylin_Seven 宠辱不惊,闲看庭前花开花落;去留无意,任随天边云卷云舒 Areds 不忘初心,方得使终 Quaint 技术宅拯救世界 校内巨佬们 wxyww ...

osc_94gn551r
21分钟前
5
0
友链

下面是我的友链啦~~ 外校大佬 _redness 魔法少女 Kylin_Seven 宠辱不惊,闲看庭前花开花落;去留无意,任随天边云卷云舒 Areds 不忘初心,方得使终 Quaint 技术宅拯救世界 校内巨佬们 wxyww ...

osc_xih8lf91
21分钟前
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部