文档章节

spring整合消息队列rabbitmq

Fe-Fe
 Fe-Fe
发布于 2013/06/25 14:58
字数 822
阅读 6.4W
收藏 79

3 月,跳不动了?>>>

spring大家太熟,就不多说了

rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684

本文侧重介绍如何将rabbitmq整合到项目中

ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..


 

1.首先是生产者配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
        
   <rabbit:admin connection-factory="connectionFactory"/>
   
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
   
   
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
    <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>
    
    <-- spring template声明-->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>
</beans>

2.fastjson messageconver插件实现


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import fe.json.FastJson;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
	private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

	public static final String DEFAULT_CHARSET = "UTF-8";

	private volatile String defaultCharset = DEFAULT_CHARSET;
	
	public FastJsonMessageConverter() {
		super();
		//init();
	}
	
	public void setDefaultCharset(String defaultCharset) {
		this.defaultCharset = (defaultCharset != null) ? defaultCharset
				: DEFAULT_CHARSET;
	}
	
	public Object fromMessage(Message message)
			throws MessageConversionException {
		return null;
	}
	
	public <T> T fromMessage(Message message,T t) {
		String json = "";
		try {
			json = new String(message.getBody(),defaultCharset);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		return (T) FastJson.fromJson(json, t.getClass());
	}	
	

	protected Message createMessage(Object objectToConvert,
			MessageProperties messageProperties)
			throws MessageConversionException {
		byte[] bytes = null;
		try {
			String jsonString = FastJson.toJson(objectToConvert);
			bytes = jsonString.getBytes(this.defaultCharset);
		} catch (UnsupportedEncodingException e) {
			throw new MessageConversionException(
					"Failed to convert Message content", e);
		} 
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
		messageProperties.setContentEncoding(this.defaultCharset);
		if (bytes != null) {
			messageProperties.setContentLength(bytes.length);
		}
		return new Message(bytes, messageProperties);

	}
}

3.生产者端调用


import java.util.List;

import org.springframework.amqp.core.AmqpTemplate;


public class MyMqGatway {
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	public void sendDataToCrQueue(Object obj) {
		amqpTemplate.convertAndSend("queue_one_key", obj);
	}	
}

4.消费者端配置(与生产者端大同小异)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    
   <!-- 连接服务配置  -->
   <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"  />
        
   <rabbit:admin connection-factory="connectionFactory"/>
   
   <!-- queue 队列声明-->
   <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
   
   
   <!-- exchange queue binging key 绑定 -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    
     
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>


5.消费者端调用

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueOneLitener implements  MessageListener{
	@Override
	public void onMessage(Message message) {
		System.out.println(" data :" + message.getBody());
	}
}

6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可


打个广告:创业项目www.1zheke.com,有培训诉求的人可以支持一下,或者周围有培训需求的人,帮忙扩散一下,限量1折课,奖学拿到手软,非常感谢

© 著作权归作者所有

上一篇: git常用命令
下一篇: apache ab
Fe-Fe
粉丝 5
博文 5
码字总数 1079
作品 0
杭州
后端工程师
私信 提问
加载中

评论(24)

成从豪
成从豪

引用来自“王炜sky”的评论

为啥我的监听器,接收不到消息?求帮助
listener没有交给spring管理
成从豪
成从豪
你这个listener能正常执行?监听得到?
杨延庆
杨延庆
如果对象是复杂自定义对象你这里没有考虑。
王炜sky
为啥我的监听器,接收不到消息?求帮助
学习使人上进
学习使人上进
一个业务系统存在多个消费connector,这种情况,有遇到过吗,怎么处理呢
_凤求凰_
_凤求凰_
/**
* 把json数据转换成类
* @param jsonString
* @param clazz
* @param <T>
* @return
*/
public static <T> T fromJson(String json, Class<T> clazz) {
return JSON.parseObject(json,clazz);
}


/**
* po类转换成json String
* @param po
* @return
*/
public static String toJson(Object obj) {
//String result = JSON.toJSONString(obj);
String result=JSON.toJSONString(obj, SerializerFeature.WriteMapNullValue);
return result;
}
}
logen2014
logen2014

引用来自“菘鬆”的评论

fe.json.FastJson 这个给我一份 可以吗
312451021@qq.com
zhuganlai
zhuganlai
方便给俺发一份么,113617814@qq.com
尐葮阿譽
尐葮阿譽
看过rabbitmq官网的人都知道,queue,exchange,routingKey三者的绑定只需要在消费端进行就可以了,无需在生产端进行.
你我他有个梦
你我他有个梦

引用来自“菘鬆”的评论

fe.json.FastJson 这个给我一份 可以吗
liuzha13@163.com
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
2018/09/13
651
0
RabbitMQ 实战教程 文集

RabbitMQ 实战教程 文集 此系列博客经梁总(梁桂钊的博客)授权录入本站点,下面推荐下梁总的技术公众号【服务端的思维】,公众号不定时更新技术文章,干货满满!! RabbitMQ 实战教程(一) He...

chenssy
2018/10/06
0
0
Spring Boot RabbitMQ 优先级队列

Docker With RabbitMQ 官方 Docker 镜像仓库地址 https://hub.docker.com/_/rabbitmq 本地运行 RabbitMQ 访问可视化面板 地址:http://127.0.0.1:15672/ 默认账号:guest 默认密码:guest S...

Anoyi
2019/02/23
0
0
阿里大牛带你轻松实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是...

Java架构
2019/01/23
0
0
Spring Boot与RabbitMQ结合实现延迟队列的示例

背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 场景一:在订单系统中,一个用户下单之后通常有...

xiaomin0322
2018/05/11
765
0

没有更多内容

加载失败,请刷新页面

加载更多

SpringBoot 从入门到进阶系列小册SpringForAll社区全网独家发布

经过小编的整理,将社区的SpringBoot系列从入门到进阶文章进行了小册子制作,领取方式见文末,小册大纲如下: Spring Boot 入门系列 Spring Boot 快速入门 《使用Intellij中的Spring Initia...

SpringForA
28分钟前
14
0
springboot configuration annotation processor not configured

解决办法: 在pom.xml中增加如下依赖       <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-configuratio......

ATOZ_HJ
32分钟前
22
0
2D游戏开发(4) CreateJS函数库简单案例

案例1:围住神经猫 开发工具:VsCode 游戏介绍: 蓝色圆圈表示神经猫,点击灰色圆圈围捕神经猫,不让神经猫逃走即可获胜。 代码参考: index.html <!DOCTYPE html><html> <head> ...

hhn30
36分钟前
24
0
java的问答知识区一

游戏规则,采用问答的方式进行知识的巩固; (1) 有那几种隔离级别? 1.1 有四种隔离级别,其分别是读未提交,读已提交,可重复读,串行化读写1.2 隔离,是事务中ACID中的重要一个环;数据库提供隔...

鱼君love
今天
20
0
聊聊skywalking的RemoteClientManager

序 本文主要研究一下skywalking的RemoteClientManager RemoteClientManager skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client......

go4it
今天
17
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部