文档章节

spring集成activemq

尚浩宇
 尚浩宇
发布于 2015/08/24 17:22
字数 1180
阅读 115
收藏 0

两个项目,一个生产者一个消费者,这里只贴出关键代码(队列模式和订阅模式),文章最后会附上项目地址,有需要的可以自行下载。项目访问地址http://localhost:8080/activemq-producer/Test

生产者

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- JMS消息服务器的IP和端口号 -->
				<property name="brokerURL">
					<value>tcp://127.0.0.1:61616</value>
				</property>
				<!-- 是否异步发送 -->  
                <property name="useAsyncSend" value="true" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
		<!-- 超时设置 -->
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>	
	<!-- 发送消息的目的地(一个主题) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  	
	<!-- 队列Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<ref local="jmsFactory" />
		</property>
		<!-- 默认队列名称 -->
		<property name="defaultDestinationName" value="sams" />
		<!-- 区别它采用的模式为false是p2p, true是订阅 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>	
	<!-- 订阅Template -->
	<bean id="topicjmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 订阅发布模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
	</bean>	
	<!-- 短信队列 -->
	<bean id="sams" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams" />
	</bean>
	<!-- 消息发送者 -->
	<bean id="QueueSender" class="com.producer.QueueSender">
		<constructor-arg ref="sams" />
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

queues.java

public class QueueSender {
	private Destination destination;
	private JmsTemplate jmsTemplate;

	public QueueSender(ActiveMQQueue queue) {
		this.destination = queue;
	}
	public void send(final String message) throws JMSException {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}

Test.java

/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	private static final Logger LOGGER=Logger.getLogger(Test.class);
    /**
     * Default constructor. 
     */
    public Test() {
        // TODO Auto-generated constructor stub
    }
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doPost(request, response);
	}
	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		ApplicationContext ac1 = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
		JmsTemplate jmsTemplate=(JmsTemplate) ac1.getBean("jmsTemplate");
		jmsTemplate.send(new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message~!");   
			}
		});
		jmsTemplate.send("callcenter",new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message to callcenter~!");   
			}
		});
		JmsTemplate topicjmsTemplate=(JmsTemplate) ac1.getBean("topicjmsTemplate");
		topicjmsTemplate.send(new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                TextMessage msg = session.createTextMessage();  
                msg.setStringProperty("phrCode", "C001");  
                msg.setText("Hello World!");  
                return msg;  
            }  
        });  
	}

}

消费者项目

<!-- 1. 配置connectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL">
					<!-- JMS消息服务器的IP和端口号 -->
					<value>tcp://127.0.0.1:61616</value>
				</property>
			</bean>
		</property>
		<property name="maxConnections" value="100"/>
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>
	<!-- 来自短信系统的消息 -->
	<bean id="names" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams,callcenter" />
	</bean>
	 <!-- 订阅消息 -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  
	<!-- 队列监听器 -->
	<bean id="QueueReceiverListener" class="com.customer.QueueReceiverListener">
	</bean>
	<!-- 队列监听器的容器 -->
	<bean id="QueueReceiverContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="concurrentConsumers" value="2" />
		<property name="connectionFactory" ref="jmsFactory" />
		<!-- 监听的队列 -->
		<property name="destination" ref="names" />
		<property name="messageListener" ref="QueueReceiverListener" />
		<property name="pubSubNoLocal" value="false"/>
	</bean>
	<!-- 生产消息配置 (自己定义)-->  
    <bean id="myTopicConsumer" class="com.customer.SimpleJMSReceiver" />  
	<!-- 消息监听器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收消息的方法名称 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不进行消息转换 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
	<!-- 订阅监听器的容器 -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 消息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10000"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="client_119" />  
        <property name="durableSubscriptionName" value="client_119"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" /> 
	</bean>

QueueReceiverListener.java

public class QueueReceiverListener implements SessionAwareMessageListener {
	private static final Logger LOGGER = Logger.getLogger(QueueReceiverListener.class);

	public void onMessage(Message message, Session session) throws JMSException {
				TextMessage msg = (TextMessage) message;			
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
	}
}

SimpleJMSReceiver.java

public class SimpleJMSReceiver {
	public void receive(TextMessage message) throws JmsException, JMSException {  
        System.out.println(message.getStringProperty("phrCode"));  
        System.out.println(message.getText());  
    } 
}

pom.xml两个项目都是一样的

<!-- 属性配置 -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.9.0</version>
		</dependency>
		<!-- 添加Spring依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.6.RELEASE</version>
		</dependency>
	</dependencies>

下面是输出信息:

17:18:39,725  INFO Test:45 - --------发送队列消息开始--------
17:18:39,725 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'jmsTemplate'
17:18:39,979 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSess
17:18:40,029  INFO QueueReceiverListener:16 - -------接收消息开始---------
17:18:40,030 DEBUG QueueReceiverListener:20 - QueueReceiverListener接收到报文:send a message~!
17:18:40,030 DEBUG QueueReceiverListener:22 - QueueReceiverListener接收到报文:send a message~!

17:18:40,046 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,047  INFO Test:60 - -------发送队列消息结束---------
17:18:40,047  INFO Test:61 - -------发送订阅消息开始---------
17:18:40,047 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'topicjmsTemplate'
17:18:40,057 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:shy-PC-55695-1440407919807-1:3:1,started=false} }
17:18:40,058 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,059  INFO Test:73 - -------发送订阅消息结束---------
17:18:40,061 DEBUG DefaultMessageListenerContainer:313 - Received message of type [class org.apache.
C001
Hello World!

项目下载地址:http://pan.baidu.com/s/1o6DvqrC

© 著作权归作者所有

共有 人打赏支持
上一篇: ActiveMQ集群部署
下一篇: ActiveMQ消息队列
尚浩宇
粉丝 61
博文 143
码字总数 115158
作品 3
朝阳
程序员
私信 提问
Spring集成ActiveMQ集群

看了很多Spring利用JMS配置方式集成ActiveMQ的例子,但是都是单节点配置地址,请问spring配置一个ActiveMQ集群要怎么配置brokerURL呢? 例如: 或者 一般brokerURL都配置一个地址,如果我做的...

Felix_Yao
2015/06/15
1K
1
ActiveMQ启动

自己在本机虚拟机搭建ActiveMQ正常也做了很多测试,但是在测试环境中搭建是发现jdk是1.6于是就把mq的jdk改成单独的jdk,启动正常: INFO: Loading '/usr/local/server/ActiveMQ/apache-activ...

跌倒了爬起来再哭
2016/10/09
324
1
关于ActiveMQ5.7突然崩溃讨论

2017-05-05 09:38:20.416 [ExceptionGuardFilter.java:22] - chain error:Uncategorized exception occured during JMS processing; n ested exception is javax.jms.JMSException: Peer (vm......

禾乘李朋友
2017/05/05
0
2
Spring整合ActiveMQ,无相关activemq日志输出

以内嵌的方式整合Spring和ActiveMQ,用的是ActiveMQ5.8.0版本,导入的jar有: activemq-broker-5.8.0.jar activemq-client-5.8.0.jar activemq-console-5.8.0.jar activemq-pool-5.8.0.jar a......

哎哎哎哎啊
2013/09/15
1K
1
ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike
05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

《碎玉投珠》的读后感想法心得范文3800字

《碎玉投珠》的读后感想法心得范文3800字: 《碎玉投珠》是晋江作者北南2018年的作品,内容主要讲述了其17年的《两小无嫌猜》中副cp师父师叔的爱情故事。 个人并没有看过北南其他的作品,这篇...

原创小博客
24分钟前
0
0
Confluence 6 文档主题合并问答

在 Confluence 官方 前期发布的消息 中,文档主题在 Confluence 6.0 及其后续版本中已经不可用。我们知道你可能对这个有很多好好奇的问题,因此我们在这里设置了一个问答用于帮助你将这个主题...

honeymose
36分钟前
2
0
java框架学习日志-2

上篇文章(java框架学习日志-1)虽然跟着写了例子,也理解为什么这么写,但是有个疑问,为什么叫控制反转?控制的是什么?反转又是什么? 控制其实就是控制对象的创建。 反转与正转对应,正转...

白话
今天
6
0
Integer使用双等号比较会发生什么

话不多说,根据以下程序运行,打印的结果为什么不同? Integer a = 100;Integer b = 100;System.out.println(a == b);//print : trueInteger a = 200;Integer b = 200;System.out.pr...

兜兜毛毛
昨天
11
0
CockroachDB

百度云上的CockroachDB 云数据库 帮助文档 > 产品文档 > CockroachDB 云数据库 > 产品描述 开源NewSQL – CockroachDB在百度内部的应用与实践 嘉宾演讲视频及PPT回顾:http://suo.im/5bnORh ...

miaojiangmin
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部