spring集成activemq
spring集成activemq
尚浩宇 发表于2年前
spring集成activemq
  • 发表于 2年前
  • 阅读 105
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: spring集成activemq

两个项目,一个生产者一个消费者,这里只贴出关键代码(队列模式和订阅模式),文章最后会附上项目地址,有需要的可以自行下载。项目访问地址http://localhost:8080/activemq-producer/Test

生产者

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<!-- JMS消息服务器的IP和端口号 -->
				<property name="brokerURL">
					<value>tcp://127.0.0.1:61616</value>
				</property>
				<!-- 是否异步发送 -->  
                <property name="useAsyncSend" value="true" />
			</bean>
		</property>
		<property name="maxConnections" value="100" />
		<!-- 超时设置 -->
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>	
	<!-- 发送消息的目的地(一个主题) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  	
	<!-- 队列Template -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory">
			<ref local="jmsFactory" />
		</property>
		<!-- 默认队列名称 -->
		<property name="defaultDestinationName" value="sams" />
		<!-- 区别它采用的模式为false是p2p, true是订阅 -->
		<!-- <property name="pubSubDomain" value="false" /> -->
	</bean>	
	<!-- 订阅Template -->
	<bean id="topicjmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 订阅发布模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
	</bean>	
	<!-- 短信队列 -->
	<bean id="sams" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams" />
	</bean>
	<!-- 消息发送者 -->
	<bean id="QueueSender" class="com.producer.QueueSender">
		<constructor-arg ref="sams" />
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

queues.java

public class QueueSender {
	private Destination destination;
	private JmsTemplate jmsTemplate;

	public QueueSender(ActiveMQQueue queue) {
		this.destination = queue;
	}
	public void send(final String message) throws JMSException {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
}

Test.java

/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	private static final Logger LOGGER=Logger.getLogger(Test.class);
    /**
     * Default constructor. 
     */
    public Test() {
        // TODO Auto-generated constructor stub
    }
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doPost(request, response);
	}
	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		ApplicationContext ac1 = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
		JmsTemplate jmsTemplate=(JmsTemplate) ac1.getBean("jmsTemplate");
		jmsTemplate.send(new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message~!");   
			}
		});
		jmsTemplate.send("callcenter",new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage("send a message to callcenter~!");   
			}
		});
		JmsTemplate topicjmsTemplate=(JmsTemplate) ac1.getBean("topicjmsTemplate");
		topicjmsTemplate.send(new MessageCreator() {  
            public Message createMessage(Session session) throws JMSException {  
                TextMessage msg = session.createTextMessage();  
                msg.setStringProperty("phrCode", "C001");  
                msg.setText("Hello World!");  
                return msg;  
            }  
        });  
	}

}

消费者项目

<!-- 1. 配置connectionFactory -->
	<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory">
			<bean class="org.apache.activemq.ActiveMQConnectionFactory">
				<property name="brokerURL">
					<!-- JMS消息服务器的IP和端口号 -->
					<value>tcp://127.0.0.1:61616</value>
				</property>
			</bean>
		</property>
		<property name="maxConnections" value="100"/>
		<!-- <property name="idleTimeout" value="60"/> -->
	</bean>
	<!-- 来自短信系统的消息 -->
	<bean id="names" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="sams,callcenter" />
	</bean>
	 <!-- 订阅消息 -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 设置消息主题的名字 -->  
        <constructor-arg index="0" value="Online.Notice.Topic" />  
    </bean>  
	<!-- 队列监听器 -->
	<bean id="QueueReceiverListener" class="com.customer.QueueReceiverListener">
	</bean>
	<!-- 队列监听器的容器 -->
	<bean id="QueueReceiverContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="concurrentConsumers" value="2" />
		<property name="connectionFactory" ref="jmsFactory" />
		<!-- 监听的队列 -->
		<property name="destination" ref="names" />
		<property name="messageListener" ref="QueueReceiverListener" />
		<property name="pubSubNoLocal" value="false"/>
	</bean>
	<!-- 生产消息配置 (自己定义)-->  
    <bean id="myTopicConsumer" class="com.customer.SimpleJMSReceiver" />  
	<!-- 消息监听器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收消息的方法名称 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不进行消息转换 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
	<!-- 订阅监听器的容器 -->
	<bean id="myListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsFactory" />  
        <!-- 发布订阅模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 消息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10000"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="client_119" />  
        <property name="durableSubscriptionName" value="client_119"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" /> 
	</bean>

QueueReceiverListener.java

public class QueueReceiverListener implements SessionAwareMessageListener {
	private static final Logger LOGGER = Logger.getLogger(QueueReceiverListener.class);

	public void onMessage(Message message, Session session) throws JMSException {
				TextMessage msg = (TextMessage) message;			
				try {
					Thread.currentThread().sleep(5000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
	}
}

SimpleJMSReceiver.java

public class SimpleJMSReceiver {
	public void receive(TextMessage message) throws JmsException, JMSException {  
        System.out.println(message.getStringProperty("phrCode"));  
        System.out.println(message.getText());  
    } 
}

pom.xml两个项目都是一样的

<!-- 属性配置 -->
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>3.1.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>javax.annotation</groupId>
			<artifactId>jsr250-api</artifactId>
			<version>1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.9.0</version>
		</dependency>
		<!-- 添加Spring依赖 -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>3.1.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-web</artifactId>
			<version>4.0.6.RELEASE</version>
		</dependency>
	</dependencies>

下面是输出信息:

17:18:39,725  INFO Test:45 - --------发送队列消息开始--------
17:18:39,725 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'jmsTemplate'
17:18:39,979 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSess
17:18:40,029  INFO QueueReceiverListener:16 - -------接收消息开始---------
17:18:40,030 DEBUG QueueReceiverListener:20 - QueueReceiverListener接收到报文:send a message~!
17:18:40,030 DEBUG QueueReceiverListener:22 - QueueReceiverListener接收到报文:send a message~!

17:18:40,046 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,047  INFO Test:60 - -------发送队列消息结束---------
17:18:40,047  INFO Test:61 - -------发送订阅消息开始---------
17:18:40,047 DEBUG DefaultListableBeanFactory:245 - Returning cached instance of singleton bean 'topicjmsTemplate'
17:18:40,057 DEBUG JmsTemplate:464 - Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:shy-PC-55695-1440407919807-1:3:1,started=false} }
17:18:40,058 DEBUG JmsTemplate:567 - Sending created message: ActiveMQTextMessage {commandId = 0, re
17:18:40,059  INFO Test:73 - -------发送订阅消息结束---------
17:18:40,061 DEBUG DefaultMessageListenerContainer:313 - Received message of type [class org.apache.
C001
Hello World!

项目下载地址:http://pan.baidu.com/s/1o6DvqrC

共有 人打赏支持
粉丝 46
博文 116
码字总数 93241
作品 1
×
尚浩宇
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: