文档章节

RabbitMQ spring 使用总结

bill2candy
 bill2candy
发布于 2016/09/08 09:42
字数 826
阅读 3294
收藏 107

rabbitMQ相关概念不在本文介绍范围,rabbitMQ官网和其他博客都有大量介绍。

本文重点内容是spring和rabbit环境搭建以及使用中注意事项总结。

1.1   rabbitMQ服务器搭建

下载安装官网最新版本服务器

1.2   rabbitMQ开启服务管理

rabbitMQ start 启动

1.3   spring pom配置

<spring-rabbit.version>1.3.9.RELEASE</spring-rabbit.version>
<!-- 消息队列 rabbitmq -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>${rabbitmq-client.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>${spring-rabbit.version}</version>
</dependency>

1.4   spring config配置

在D:\workspace\sps\src\main\resources\spring-rabbitmq.xml

配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
      http://www.springframework.org/schema/mvc
     http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <mvc:annotation-driven />


    <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.master.ip}" port="${rabbitmq.master.port}" username="${rabbitmq.master.username}" password="${rabbitmq.master.password}" />

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     exchange="order_topic_exchange" message-converter="gsonConverter" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="orderQueue" durable="true"  />
    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:queue name="orderPayDelayQueryQueue" durable="true"/>

    <rabbit:topic-exchange name="pay_delay_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderPayDelayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:topic-exchange name="order_topic_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderQueue" pattern="sps.#"/>
            <rabbit:binding queue="orderPayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"  concurrency="10">
        <rabbit:listener queues="orderQueue" ref="orderQueueListener"/>

    </rabbit:listener-container>
   
    <bean id="orderQueueListener" class="com.supuy.sps.services.queue.OrderQueueListener" />

    <bean id="gsonConverter" class="com.supuy.core.mq.Gson2JsonMessageConverter"/>


</beans>

1.5  延迟消息队列

有时候,因为各种原因,我们想实现延迟消费的目的,但是rabbitMQ并没有提供这个功能,这时候,可以通过x-message-ttlx-dead-letter-exchange实现

    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <value  type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

1.6   生产者

@Override
public void orderBuilder(int type,String orderCode) {
    String key = "tps."+orderCode;
    orderCode = type+"."+orderCode;
    amqpMaster.convertAndSend(key, orderCode);
    logger.info("订单加入消息队列,订单编码:{}", key);
}

1.7   消费者

package com.supuy.tps.service.queue;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.supuy.tps.common.mq.Gson2JsonMessageConverter;
import com.supuy.tps.dto.bean.WmsOrderParam;
import com.supuy.tps.service.IOrderShopService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Created by bill on 2016/5/31.
 */
public class OrderSendQueueListener implements ChannelAwareMessageListener {
    private static Logger logger = LoggerFactory.getLogger(OrderSendQueueListener.class);
    @Autowired
    private Gson2JsonMessageConverter messageConverter;
    @Autowired
    private IOrderShopService orderShopService;
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        channel.basicQos(100);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        String data=(String)messageConverter.fromMessage(message);
        if (data!=null){
            WmsOrderParam wmsOrderParam= JSON.parseObject(data,WmsOrderParam.class);
            if (wmsOrderParam != null){
                wmsOrderParam.setOrderCode(wmsOrderParam.getOrderCode().substring(1));
                orderShopService.pushOrderLogInfo(wmsOrderParam);
            }
        }
    }
}

附加类Gson2JsonMessageConverter实现如下,

package com.supuy.tps.common.mq;

import com.google.gson.Gson;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
      
    private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);
      
    private static ClassMapper classMapper =  new DefaultClassMapper();
  
    private static Gson gson = new Gson();
  
    public Gson2JsonMessageConverter() {  
        super();  
    }  


    @Override  
    protected Message createMessage(Object object,
            MessageProperties messageProperties) {
        byte[] bytes = null;  
        try {  
            String jsonString = gson.toJson(object);  
            bytes = jsonString.getBytes(getDefaultCharset());  
        }  
        catch (IOException e) {  
            throw new MessageConversionException(
                    "Failed to convert Message content", e);  
        }  
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());  
        if (bytes != null) {  
            messageProperties.setContentLength(bytes.length);  
        }  
        classMapper.fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }  
  
    @Override  
    public Object fromMessage(Message message)
            throws MessageConversionException {
        Object content = null;  
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {  
            String contentType = properties.getContentType();  
            if (contentType != null && contentType.contains("json")) {  
                String encoding = properties.getContentEncoding();  
                if (encoding == null) {  
                    encoding = getDefaultCharset();  
                }  
                try {  
                        Class<?> targetClass = getClassMapper().toClass(
                                message.getMessageProperties());
                        content = convertBytesToObject(message.getBody(),  
                                encoding, targetClass);  
                }  
                catch (IOException e) {  
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);  
                }  
            }  
            else {  
                log.warn("Could not convert incoming message with content-type ["  
                        + contentType + "]");  
            }  
        }  
        if (content == null) {  
            content = message.getBody();  
        }  
        return content;  
    }  
  
    private Object convertBytesToObject(byte[] body, String encoding,  
            Class<?> clazz) throws UnsupportedEncodingException {  
        String contentAsString = new String(body, encoding);  
        return gson.fromJson(contentAsString, clazz);  
    }

    @Override
    public ClassMapper getClassMapper() {
        return new DefaultClassMapper();

    }
}  

1.8   Q&A

1 ttl设置之后,下次修改时间,会报错,这时候,需要先删除该队列,重启项目。

2 接受消息之后,出现错误,该消息就会被持续占有,无法消费。所以,要活用消息的ack,nack,reject。

© 著作权归作者所有

bill2candy

bill2candy

粉丝 24
博文 14
码字总数 16723
作品 1
青岛
后端工程师
私信 提问
加载中

评论(7)

Kevin_Zhan
Kevin_Zhan
没用过mq,但一直能听到,假如真要使用可能还不知道在什么场景用才好😮
bill2candy
bill2candy 博主

引用来自“一起开源”的评论

推荐一个系列的Rabbitmq教程 http://www.rm5u.com/mq/rabbitmq/
谢谢分享,本文只是做简单的使用介绍。
一起开源
推荐一个系列的Rabbitmq教程 http://www.rm5u.com/mq/rabbitmq/
bill2candy
bill2candy 博主

引用来自“yytf”的评论

博主知道一条消息消费失败,想放回队列,使用spring amqp的哪个方法吗?
同上,可以试试nack方法
bill2candy
bill2candy 博主

引用来自“lindent”的评论

博主知道一条消息消费失败,想放回队列,使用spring amqp的哪个方法吗?
需要捕获异常,并回复nack
yytf
yytf
博主知道一条消息消费失败,想放回队列,使用spring amqp的哪个方法吗?
lindent
lindent
博主知道一条消息消费失败,想放回队列,使用spring amqp的哪个方法吗?
阿里云Kubernetes SpringCloud 实践进行时(2): 分布式配置管理

简介 为了更好地支撑日益增长的庞大业务量,我们常常需要把服务进行整合、拆分,使我们的服务不仅能通过集群部署抵挡流量的冲击,又能根据业务在其上进行灵活的扩展。随着分布式的普及、服务...

osswangxining
2018/05/25
0
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
2018/07/25
833
0
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
2018/09/13
445
0
我的RabbitMQ的学习成果

背景 在研发分布式事务的最终一致性事务模式时,使用了RabbitMQ。 在这之前也接触过RabbitMQ,但没有特别深入的去了解它的特性与原理。这次决定系统的学习一次,所以业余时间阅读大神们的书籍...

XuePeng77
04/15
266
0
Spring Boot RabbitMQ 优先级队列

Docker With RabbitMQ 官方 Docker 镜像仓库地址 https://hub.docker.com/_/rabbitmq 本地运行 RabbitMQ 访问可视化面板 地址:http://127.0.0.1:15672/ 默认账号:guest 默认密码:guest S...

Anoyi
02/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

代理模式之JDK动态代理 — “JDK Dynamic Proxy“

动态代理的原理是什么? 所谓的动态代理,他是一个代理机制,代理机制可以看作是对调用目标的一个包装,这样我们对目标代码的调用不是直接发生的,而是通过代理完成,通过代理可以有效的让调...

code-ortaerc
今天
5
0
学习记录(day05-标签操作、属性绑定、语句控制、数据绑定、事件绑定、案例用户登录)

[TOC] 1.1.1标签操作v-text&v-html v-text:会把data中绑定的数据值原样输出。 v-html:会把data中值输出,且会自动解析html代码 <!--可以将指定的内容显示到标签体中--><标签 v-text=""></......

庭前云落
今天
8
0
VMware vSphere的两种RDM磁盘

在VMware vSphere vCenter中创建虚拟机时,可以添加一种叫RDM的磁盘。 RDM - Raw Device Mapping,原始设备映射,那么,RDM磁盘是不是就可以称作为“原始设备映射磁盘”呢?这也是一种可以热...

大别阿郎
今天
12
0
【AngularJS学习笔记】02 小杂烩及学习总结

本文转载于:专业的前端网站☞【AngularJS学习笔记】02 小杂烩及学习总结 表格示例 <div ng-app="myApp" ng-controller="customersCtrl"> <table> <tr ng-repeat="x in names | orderBy ......

前端老手
昨天
16
0
Linux 内核的五大创新

在科技行业,创新这个词几乎和革命一样到处泛滥,所以很难将那些夸张的东西与真正令人振奋的东西区分开来。Linux内核被称为创新,但它又被称为现代计算中最大的奇迹,一个微观世界中的庞然大...

阮鹏
昨天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部