文档章节

RabbitMQ 消息确认

chaun
 chaun
发布于 2016/11/17 14:05
字数 692
阅读 203
收藏 0

RabbitMQ的消息确认类型

  • RabbitMQ给生产者返回消息确认
  • 消费端给RabbitMQ返回消息确认
  • 消费端给RabbitMQ返回消息拒绝

RabbitMQ返回消息确认

  • 生产者端将消息发送出去,消息到达RabbitMQ之后,会返回一个到达确认。
  • 这个确认实际上就是官方常说的ConfirmCallback,我们通过在生产者端使用一个回调类来监听RabbiMQ返回的消息确认。
  • Spring AMQP中我们通过设置RabbitTemplate的ConfirmCallback属性来实现消息确认回调,通过一个实现了ConfirmCallback的类来实现回调逻辑。

举例说明

@Bean(name = "smsRabbitTemplate")
public RabbitTemplate smsRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.setRoutingKey(smsRoutingKey);
    rabbitTemplate.setQueue(QUEUE_SMS);
    rabbitTemplate.setExchange("smsExchange");
    rabbitTemplate.setConfirmCallback(smsConfirmCallBack());
    rabbitTemplate.setRetryTemplate(retryTemplate());
    return rabbitTemplate;
}
/**
 * Email队列回调
 * @author Xiaoyang.Li
 *
 */
public class EmailConfirmCallBack implements ConfirmCallback{

    private Logger logger = LoggerFactory.getLogger(EmailConfirmCallBack.class);

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info(correlationData.getId() + "--邮件已发送到RabbitMQ队列中.");
        }else{
            logger.info(correlationData.getId() + "--邮件发送到RabbitMQ队列失败。{}", cause);
        }
    }
}

消费端消息确认

  • 默认的rabbitmq消费端是开启了自动确认的。
  • 实际项目中往往我们会对消息进行一系列的处理,然后再给出消费确认,也就是我们需要关闭自动确认,使用手动确认,通过设置AcknowledgeMode为MANUAL可以开启手动确认。
  • Spring AMQP中我们可以通过设置SimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL)来进行开启手动确认

举例说明

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
       SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());
       container.setMessageConverter(new Jackson2JsonMessageConverter());
       //如果设置手动消费,那么需要使用Channel.basicAck()进行数据返回,
       //所以MessageListener需要实现ChannelAwareMessageListener接口
       container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置手动确认消息被消费
       container.setQueues(mailQueue());//添加监听队列
       container.setMessageListener(mailListenerUseChannel());//如果设置AcknowledgeMode为手动,那么需要使用这个

       return container;
}

消费端消息拒绝

  • 消费端如果多次消费失败,我们可以将这条消息拒绝,通过死信设置,rabbitmq会将拒绝的消息存放到死信队列中去。
  • 消费端通过使用basic.reject来进行拒绝操作。
  • Spring AMQP中我们通过调用channel.basicReject(deliveryTag, false)来进行消费拒绝

举例说明

public class MailListenerUseChannel implements ChannelAwareMessageListener {

    private Logger logger = LoggerFactory.getLogger(MailListenerUseChannel.class);

    @Autowired
    private MimeMailSender mimeMailSender;

    private ByteArrayToObject byteArrayToObj;

    @PostConstruct
    public void init(){
        byteArrayToObj = new ByteArrayToObject();
    }

    /**
     * 从mq取出数据然后发送
     */
    public void onMessage(Message message, Channel channel) {
        boolean isComplete = true;
        if(message == null){
            logger.error("接收到的消息为空");
            isComplete = false;
        }
        logger.info("消息格式"+message.getMessageProperties().getContentType());
        EmailMessage emailMessage = byteArrayToObj.parseMessage(message, EmailMessage.class);
        if(emailMessage.getHeader() == null){
            logger.error("Email header is null. Please check message.");
            isComplete = false;
        }

        if(isComplete){
            logger.info("邮件类型>>>>>"+emailMessage.getHeader().getMsgType());

            try{
                //发送邮件
                mimeMailSender.sendEmail(emailMessage);
                //发送成功后返回响应给mq
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch(Exception e){
                logger.error("sendEmail error.", e);
                e.printStackTrace();
            }
        }else{
            try {
                //如果消息不合法,直接丢弃
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                logger.error("Discarded a message!");
            } catch (IOException e) {
                logger.error("Reject/Nack message error.");
                e.printStackTrace();
            }
        }
    }
}

本文转载自:https://my.oschina.net/huaxian8812/blog/779474

共有 人打赏支持
chaun
粉丝 91
博文 269
码字总数 91059
作品 0
深圳
高级程序员
【转载】关于RabbitMQ的消息确认

RabbitMQ 将消息投递到客户端后,客户端如果没处理完这个消息就死掉了,这个消息还会不会存在?这取决于 RabbitMQ 的消息确认机制(Message acknowledgment)是否打开。 为了确保消息不会丢失...

摩云飞
2012/11/27
0
0
RabbitMQ中的工作队列(work queues)

工作队列的设计思想:避免立即执行资源密集型任务。 我们将任务封装为消息并将其发送到队列,消费者从队列中取出任务并执行任务。当我们开启了多个消费者的时候,任务将在他们之间共享。 循环...

m0_37884977
05/17
0
0
RabbitMQ与Redis做队列比较

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

Edwyn王
2015/05/20
0
0
RabbitMQ与Redis队列对比

本文仅针对RabbitMQ与Redis做队列应用时的情况进行对比 具体采用什么方式实现,还需要取决于系统的实际需求 简要介绍 RabbitMQ RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,...

凯文加内特
2015/05/20
0
0
python使用rabbitMQ介绍二(工作队列模式)

一模式介绍 第一章节的生产-消费者模式,是非常简单的模式,一发一收。在实际的应用中,消费者有的时候需要工作较长的时间,则需要增加消费者。 这时mq实现了一下几个功能: rabbitmq循环调度...

MyStitch
07/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

WinDbg

参考来自:http://www.cnit.net.cn/?id=225 SRV*C:\Symbols*http://msdl.microsoft.com/download/symbols ctrl + d to open dump_file Microsoft (R) Windows Debugger Version 6.12.0002.633......

xueyuse0012
41分钟前
2
0
OSChina 周五乱弹 —— 想不想把92年的萝莉退货

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @罗马的王:分享松澤由美的单曲《地球ぎ》 很久没看圣斗士星矢了 《地球ぎ》- 松澤由美 手机党少年们想听歌,请使劲儿戳(这里) @开源中国首...

小小编辑
今天
11
1
springBoot条件配置

本篇介绍下,如何通过springboot的条件配置,控制Bean的创建 介绍下开发环境 JDK版本1.8 springboot版本是1.5.2 开发工具为 intellij idea(2018.2) 开发环境为 15款MacBook Pro 前言 很多时候,...

贺小五
今天
1
0
javascript source map 的使用

之前发现VS.NET会为压缩的js文添加一个与文件名同名的.map文件,一直没有搞懂他是用来做什么的,直接删除掉运行时浏览器又会报错,后来google了一直才真正搞懂了这个小小的map文件背后的巨大...

粒子数反转
昨天
1
0
谈谈如何学Linux和它在如今社会的影响

昨天,还在农耕脑力社会,今天已经人工智能技术、大数据、信息技术的科技社会了,高速开展并迅速浸透到当今科技社会的各个方面,Linux日益成为人们信息时代的到来,更加考验我们对信息的处理程...

linux-tao
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部