RabbitMQ 发送消息时,同时接收返回数据

原创
04/30 20:49
阅读数 325

生产者代码,注意要设置用于存放返回数据的队列(replyTo);

通过这种方式可以实现同步阻塞,从而得到返回数据

package com.boot.springbootnew.component;

import com.alibaba.fastjson.JSON;
import com.boot.springbootnew.config.ReturnComponent;
import com.boot.springbootnew.pojo.MqFailLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqComponent implements RabbitTemplate.ConfirmCallback{

    private Logger logger = LoggerFactory.getLogger(RabbitMqComponent.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Autowired
    private ReturnComponent returnComponent;

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info(" 回调id:{} 发送成功", correlationData.getId());
        }else {
            logger.error(" 回调id:{} 消费失败, {} " ,correlationData.getId(),cause);
        }
    }

    public void sendMessage(MqFailLog msgLog) {
        //设置回调为当前类对象
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(returnComponent);
        //构建回调id为uuid
        CorrelationData correlationId = new CorrelationData(JSON.toJSONString(msgLog));
        //发送消息到消息队列
//        rabbitTemplate.convertAndSend(msgLog.getExchange(),msgLog.getRouteKey(),msgLog.getMessage(),correlationId);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setCorrelationId(correlationId.getId());
        messageProperties.setReplyTo("user.reg.response.queue");
        messageProperties.setConsumerQueue("user.reg.response.queue");
        byte[] bytes = JSON.toJSONString(msgLog.getMessage()).getBytes();
        Message message = rabbitTemplate.sendAndReceive(msgLog.getExchange(), msgLog.getRouteKey(), new Message(bytes, messageProperties), correlationId);
        String response = new String(message.getBody());
        System.out.println(response);

    }
}

消费者代码

package com.listener;
import com.rabbitmq.client.*;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


//将该监听器注入ioc容器
@Component
public class MyRabbitListener {

    //监听的队列名为queue.news
    @RabbitListener(queues = "user.reg.queue")
    @RabbitHandler
    public String listener(byte[] bytes,Channel channel){
        String request = new String(bytes);

        System.out.println("收到消息:"+request);
        return "我处理完成了  MyRabbitListener";
    }

}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部