文档章节

spring集成activemq

尚浩宇
 尚浩宇
发布于 2015/08/24 17:22
字数 1180
阅读 112
收藏 0
点赞 0
评论 0

两个项目,一个生产者一个消费者,这里只贴出关键代码(队列模式和订阅模式),文章最后会附上项目地址,有需要的可以自行下载。项目访问地址http://localhost:8080/activemq-producer/Test

生产者

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- JMS消息服务器的IP和端口号 -->
				<property name="brokerURL">
					<value>tcp://127.0.0.1:61616</value>
				</property>
				<!-- 是否异步发送 -->  
                <property name="useAsyncSend" value="true" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
		<!-- 超时设置 -->
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>	
	<!-- 发送消息的目的地(一个主题) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  	
	<!-- 队列Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<ref local="jmsFactory" />
		</property>
		<!-- 默认队列名称 -->
		<property name="defaultDestinationName" value="sams" />
		<!-- 区别它采用的模式为false是p2p, true是订阅 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>	
	<!-- 订阅Template -->
	<bean id="topicjmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 订阅发布模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
	</bean>	
	<!-- 短信队列 -->
	<bean id="sams" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams" />
	</bean>
	<!-- 消息发送者 -->
	<bean id="QueueSender" class="com.producer.QueueSender">
		<constructor-arg ref="sams" />
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

queues.java

public class QueueSender {
	private Destination destination;
	private JmsTemplate jmsTemplate;

	public QueueSender(ActiveMQQueue queue) {
		this.destination = queue;
	}
	public void send(final String message) throws JMSException {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}

Test.java

/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	private static final Logger LOGGER=Logger.getLogger(Test.class);
    /**
     * Default constructor. 
     */
    public Test() {
        // TODO Auto-generated constructor stub
    }
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doPost(request, response);
	}
	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		ApplicationContext ac1 = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
		JmsTemplate jmsTemplate=(JmsTemplate) ac1.getBean("jmsTemplate");
		jmsTemplate.send(new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message~!");   
			}
		});
		jmsTemplate.send("callcenter",new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message to callcenter~!");   
			}
		});
		JmsTemplate topicjmsTemplate=(JmsTemplate) ac1.getBean("topicjmsTemplate");
		topicjmsTemplate.send(new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                TextMessage msg = session.createTextMessage();  
                msg.setStringProperty("phrCode", "C001");  
                msg.setText("Hello World!");  
                return msg;  
            }  
        });  
	}

}

消费者项目

<!-- 1. 配置connectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL">
					<!-- JMS消息服务器的IP和端口号 -->
					<value>tcp://127.0.0.1:61616</value>
				</property>
			</bean>
		</property>
		<property name="maxConnections" value="100"/>
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>
	<!-- 来自短信系统的消息 -->
	<bean id="names" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams,callcenter" />
	</bean>
	 <!-- 订阅消息 -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  
	<!-- 队列监听器 -->
	<bean id="QueueReceiverListener" class="com.customer.QueueReceiverListener">
	</bean>
	<!-- 队列监听器的容器 -->
	<bean id="QueueReceiverContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="concurrentConsumers" value="2" />
		<property name="connectionFactory" ref="jmsFactory" />
		<!-- 监听的队列 -->
		<property name="destination" ref="names" />
		<property name="messageListener" ref="QueueReceiverListener" />
		<property name="pubSubNoLocal" value="false"/>
	</bean>
	<!-- 生产消息配置 (自己定义)-->  
    <bean id="myTopicConsumer" class="com.customer.SimpleJMSReceiver" />  
	<!-- 消息监听器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收消息的方法名称 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不进行消息转换 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
	<!-- 订阅监听器的容器 -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 消息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10000"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="client_119" />  
        <property name="durableSubscriptionName" value="client_119"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" /> 
	</bean>

QueueReceiverListener.java

public class QueueReceiverListener implements SessionAwareMessageListener {
	private static final Logger LOGGER = Logger.getLogger(QueueReceiverListener.class);

	public void onMessage(Message message, Session session) throws JMSException {
				TextMessage msg = (TextMessage) message;			
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
	}
}

SimpleJMSReceiver.java

public class SimpleJMSReceiver {
	public void receive(TextMessage message) throws JmsException, JMSException {  
        System.out.println(message.getStringProperty("phrCode"));  
        System.out.println(message.getText());  
    } 
}

pom.xml两个项目都是一样的

<!-- 属性配置 -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.9.0</version>
		</dependency>
		<!-- 添加Spring依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.6.RELEASE</version>
		</dependency>
	</dependencies>

下面是输出信息:

17:18:39,725  INFO Test:45 - --------发送队列消息开始--------
17:18:39,725 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'jmsTemplate'
17:18:39,979 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSess
17:18:40,029  INFO QueueReceiverListener:16 - -------接收消息开始---------
17:18:40,030 DEBUG QueueReceiverListener:20 - QueueReceiverListener接收到报文:send a message~!
17:18:40,030 DEBUG QueueReceiverListener:22 - QueueReceiverListener接收到报文:send a message~!

17:18:40,046 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,047  INFO Test:60 - -------发送队列消息结束---------
17:18:40,047  INFO Test:61 - -------发送订阅消息开始---------
17:18:40,047 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'topicjmsTemplate'
17:18:40,057 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:shy-PC-55695-1440407919807-1:3:1,started=false} }
17:18:40,058 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,059  INFO Test:73 - -------发送订阅消息结束---------
17:18:40,061 DEBUG DefaultMessageListenerContainer:313 - Received message of type [class org.apache.
C001
Hello World!

项目下载地址:http://pan.baidu.com/s/1o6DvqrC

© 著作权归作者所有

共有 人打赏支持
尚浩宇
粉丝 50
博文 137
码字总数 109686
作品 3
朝阳
程序员
ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike ⋅ 05/08 ⋅ 0

activeMQ发送邮件小例子

先安装apache-activemq-5.11.1 ,然后启动activemq.bat 启动activeMQ的控制台:http://localhost:8161/admin/,展示图如下: 解释:Number Of Pending Messages :等待消费的消息,这个是当前...

zhouyuzhoujing ⋅ 05/07 ⋅ 0

ActiveMQ初探(3)——与SpringBoot整合

在上一章:ActiveMQ(2)——与Spring整合中,我们已经完成了ActiveMQ与Spring进行整合,本章将进行与SpringBoot整合。 发送即时消息 Step1:添加依赖包: 这里我们不使用网上大多数的,因为...

yuanlaijike ⋅ 05/08 ⋅ 0

漏网de鱼/fish java分布式商城B2C

系统介绍 Fish 是J2EE分布式开发,技术栈:(dubbo、zookeeper、activemq、Spring、SpringMVC、MyBatis、Shiro、redis、quartz、activiti、MYCAT) 包括核心模块如:用户管理、系统设置、权限...

漏网de鱼 ⋅ 04/23 ⋅ 0

ActiveMQ初探(1)——介绍与基本使用

一、ActiveMQ 1.1 什么是ActiveMQ 是Apache出品,最流行的,能力强劲的。ActiveMQ是一个完全支持和规范的 实现,尽管规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊...

yuanlaijike ⋅ 04/15 ⋅ 0

ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai ⋅ 04/15 ⋅ 0

爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场 ⋅ 04/26 ⋅ 0

ActiveMQ RabbitMQ KafKa对比

前言: ActiveMQ和 RabbitMq 以及Kafka在之前的项目中都有陆续使用过,当然对于三者没有进行过具体的对比,以下摘抄了一些网上关于这三者的对比情况,我自己看过之后感觉还 是可以的,比较清...

xiaomin0322 ⋅ 05/11 ⋅ 0

ActiveMQ集群方案(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51124749 目...

yunlielai ⋅ 04/15 ⋅ 0

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的...

afreon ⋅ 04/22 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

vuex学习

1、getters基本用法: 在store.js里面用const声明我们的getters属性。 const getters={ count:function (state) { return state.count +=100; }} export default new Vuex.S......

大美琴 ⋅ 35分钟前 ⋅ 0

292. Nim Game - LeetCode

Question 292. Nim Game Solution 思路:试着列举一下,就能发现一个n只要不是4的倍数,就能赢。 n 是否能赢1 true2 true3 true4 false 不论删除几,对方都能一把赢5 t...

yysue ⋅ 47分钟前 ⋅ 0

G6 关系数据可视化图形库 简单使用

官网 https://antv.alipay.com/zh-cn/g6/1.x/index.html 效果 首先生成给定数目的小球,并设置随机的颜色 按照顺序,设置小球的角度以及坐标 设置定时器,每隔一定的时间修改小球的角度和坐标...

阿豪boy ⋅ 50分钟前 ⋅ 0

6.5 zip压缩工具 6.6 tar打包 6.7 打包并压缩

zip压缩工具 zip命令可以压缩目录和文件,-r 压缩目录。 zip使用方法 zip 1.txt.zip 1.txt //压缩文件 zip -r 123.zip 123/ //压缩目录 unzip 1.txt.zip //解压 unzip 123.zip -d /root/456...

Linux_老吴 ⋅ 58分钟前 ⋅ 0

react-loadable使用跳坑

官方给react-loadable的定义是: A higher order component for loading components with dynamic imports. 动态路由示例 withLoadable.js import React from 'react'import Loadable fro......

pengqinmm ⋅ 今天 ⋅ 0

记录工作中遇到的坑

1、ios safari浏览器向下滚动会触发window resize事件

端木遗风 ⋅ 今天 ⋅ 0

桥接设计模式

1、概述: 将抽象部分与他的实现部分分离,这样抽象化与实现化解耦,使他们可以独立的变化 如何实现解耦的呢,就是通过提供抽象化和实现化之间的桥接结构 桥接模式将继承模式转化成关联关系,他降...

职业搬砖20年 ⋅ 今天 ⋅ 0

20.zip压缩 tar打包 打包并压缩

6月25日任务 6.5 zip压缩工具 6.6 tar打包 6.7 打包并压缩 6.5 zip压缩工具: zip支持压缩目录 zip压缩完之后原来的文件不删除 不同的文件内容其实压缩的效果不一样 文件内有很多重复的用xz压...

王鑫linux ⋅ 今天 ⋅ 0

double类型数据保留四位小数的另一种思路

来源:透析公式处理,有时候数据有很长的小数位,有的时候由在四位以内,如果用一般的处理方法,那么不足四位的小树会补充0到第四位,这样子有点画蛇添足的感觉,不太好看。所以要根据小数的...

young_chen ⋅ 今天 ⋅ 0

Django配置163邮箱出现 authentication failed(535)错误解决方法

最近用Django写某网站,当配置163邮箱设置完成后,出现535错误即:smtplib.SMTPAuthenticationError: (535, b'Error: authentication failed') Django初始配置邮箱设置 EMAIL_HOST = "smtp.1...

陈墨轩_CJX ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部