文档章节

spring集成activemq

尚浩宇
 尚浩宇
发布于 2015/08/24 17:22
字数 1180
阅读 114
收藏 0

两个项目,一个生产者一个消费者,这里只贴出关键代码(队列模式和订阅模式),文章最后会附上项目地址,有需要的可以自行下载。项目访问地址http://localhost:8080/activemq-producer/Test

生产者

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- JMS消息服务器的IP和端口号 -->
				<property name="brokerURL">
					<value>tcp://127.0.0.1:61616</value>
				</property>
				<!-- 是否异步发送 -->  
                <property name="useAsyncSend" value="true" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
		<!-- 超时设置 -->
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>	
	<!-- 发送消息的目的地(一个主题) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  	
	<!-- 队列Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<ref local="jmsFactory" />
		</property>
		<!-- 默认队列名称 -->
		<property name="defaultDestinationName" value="sams" />
		<!-- 区别它采用的模式为false是p2p, true是订阅 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>	
	<!-- 订阅Template -->
	<bean id="topicjmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 订阅发布模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
	</bean>	
	<!-- 短信队列 -->
	<bean id="sams" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams" />
	</bean>
	<!-- 消息发送者 -->
	<bean id="QueueSender" class="com.producer.QueueSender">
		<constructor-arg ref="sams" />
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

queues.java

public class QueueSender {
	private Destination destination;
	private JmsTemplate jmsTemplate;

	public QueueSender(ActiveMQQueue queue) {
		this.destination = queue;
	}
	public void send(final String message) throws JMSException {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}

Test.java

/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	private static final Logger LOGGER=Logger.getLogger(Test.class);
    /**
     * Default constructor. 
     */
    public Test() {
        // TODO Auto-generated constructor stub
    }
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doPost(request, response);
	}
	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		ApplicationContext ac1 = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
		JmsTemplate jmsTemplate=(JmsTemplate) ac1.getBean("jmsTemplate");
		jmsTemplate.send(new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message~!");   
			}
		});
		jmsTemplate.send("callcenter",new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message to callcenter~!");   
			}
		});
		JmsTemplate topicjmsTemplate=(JmsTemplate) ac1.getBean("topicjmsTemplate");
		topicjmsTemplate.send(new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                TextMessage msg = session.createTextMessage();  
                msg.setStringProperty("phrCode", "C001");  
                msg.setText("Hello World!");  
                return msg;  
            }  
        });  
	}

}

消费者项目

<!-- 1. 配置connectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL">
					<!-- JMS消息服务器的IP和端口号 -->
					<value>tcp://127.0.0.1:61616</value>
				</property>
			</bean>
		</property>
		<property name="maxConnections" value="100"/>
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>
	<!-- 来自短信系统的消息 -->
	<bean id="names" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams,callcenter" />
	</bean>
	 <!-- 订阅消息 -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  
	<!-- 队列监听器 -->
	<bean id="QueueReceiverListener" class="com.customer.QueueReceiverListener">
	</bean>
	<!-- 队列监听器的容器 -->
	<bean id="QueueReceiverContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="concurrentConsumers" value="2" />
		<property name="connectionFactory" ref="jmsFactory" />
		<!-- 监听的队列 -->
		<property name="destination" ref="names" />
		<property name="messageListener" ref="QueueReceiverListener" />
		<property name="pubSubNoLocal" value="false"/>
	</bean>
	<!-- 生产消息配置 (自己定义)-->  
    <bean id="myTopicConsumer" class="com.customer.SimpleJMSReceiver" />  
	<!-- 消息监听器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收消息的方法名称 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不进行消息转换 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
	<!-- 订阅监听器的容器 -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 消息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10000"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="client_119" />  
        <property name="durableSubscriptionName" value="client_119"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" /> 
	</bean>

QueueReceiverListener.java

public class QueueReceiverListener implements SessionAwareMessageListener {
	private static final Logger LOGGER = Logger.getLogger(QueueReceiverListener.class);

	public void onMessage(Message message, Session session) throws JMSException {
				TextMessage msg = (TextMessage) message;			
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
	}
}

SimpleJMSReceiver.java

public class SimpleJMSReceiver {
	public void receive(TextMessage message) throws JmsException, JMSException {  
        System.out.println(message.getStringProperty("phrCode"));  
        System.out.println(message.getText());  
    } 
}

pom.xml两个项目都是一样的

<!-- 属性配置 -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.9.0</version>
		</dependency>
		<!-- 添加Spring依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.6.RELEASE</version>
		</dependency>
	</dependencies>

下面是输出信息:

17:18:39,725  INFO Test:45 - --------发送队列消息开始--------
17:18:39,725 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'jmsTemplate'
17:18:39,979 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSess
17:18:40,029  INFO QueueReceiverListener:16 - -------接收消息开始---------
17:18:40,030 DEBUG QueueReceiverListener:20 - QueueReceiverListener接收到报文:send a message~!
17:18:40,030 DEBUG QueueReceiverListener:22 - QueueReceiverListener接收到报文:send a message~!

17:18:40,046 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,047  INFO Test:60 - -------发送队列消息结束---------
17:18:40,047  INFO Test:61 - -------发送订阅消息开始---------
17:18:40,047 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'topicjmsTemplate'
17:18:40,057 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:shy-PC-55695-1440407919807-1:3:1,started=false} }
17:18:40,058 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,059  INFO Test:73 - -------发送订阅消息结束---------
17:18:40,061 DEBUG DefaultMessageListenerContainer:313 - Received message of type [class org.apache.
C001
Hello World!

项目下载地址:http://pan.baidu.com/s/1o6DvqrC

© 著作权归作者所有

共有 人打赏支持
尚浩宇
粉丝 53
博文 140
码字总数 112241
作品 3
朝阳
程序员
ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike
05/08
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
SpringBoot集成ActiveMQ抛出java.lang.NoClassDefFoundError异常

版权声明:本文首发 http://asing1elife.com ,转载请注明出处。 https://blog.csdn.net/asing1elife/article/details/82732127 SpringBoot集成ActiveMQ抛出java.lang.NoClassDefFoundError异......

asing1elife
09/17
0
0
springJMS+activeMQ实践

运行环境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3. 一定要注意activeMQ的版本与jdk的兼容性,最新的activeMQ版本估计要在jdk1.7以上才能运行。 先说一下activeMQ的安装: 1、下载:h...

wangrikui
2015/06/28
0
2
只需三步:内嵌ActiveMQ到SpringBoot应用中

不知你是否有过这样的体验: 在调试带JMS组件的应用时, 需要额外启动一个JMS服务器来配合测试。 这样既操作繁琐,又不利于实现单元测试——不符合单元测试AIR原则(自动化,独立性,可重复,...

天上只北
06/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

awk命令用法介绍

10月18日任务 9.6/9.7 awk 1.awk(上)(下) 1.awk 分段操作功能 指定分隔符,并把第一段打印出来,不会改动文件内容 将所有内容打印出来 awk 没有指定分隔符号,则会默认用空格或者空白字符...

hhpuppy
51分钟前
2
0
Spring Cloud Eureka Server高可用之:在线扩容

本文共 1591字,阅读大约需要 6分钟 ! 概述 业务微服务化以后,我们要求服务高可用,于是我们可以部署多个相同的服务实例,并引入负载均衡机制。而微服务注册中心作为微服务化系统的重要单元...

CodeSheep
今天
2
0
内网esxi主机上安装CoreOS虚拟机

CoreOS是一个为专门运行容器而设计的轻量级linux发行版,旨在通过轻量的系统架构和灵活的应用程序部署能力简化数据中心的维护成本和复杂度。它没有包管理工具,运行容器化应用以提供服务;默...

hiwill
今天
1
0
20181018 上课截图

![](https://oscimg.oschina.net/oscnet/49f66c08ab8c59a21a3b98889d961672f30.jpg) ![](https://oscimg.oschina.net/oscnet/a61bc2d618b403650dbd4bf68a671fabecb.jpg)......

小丑鱼00
今天
3
0
WinDbg

参考来自:http://www.cnit.net.cn/?id=225 SRV*C:\Symbols*http://msdl.microsoft.com/download/symbols ctrl + d to open dump_file Microsoft (R) Windows Debugger Version 6.12.0002.633......

xueyuse0012
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部