文档章节

JMS(十一): ActiveMQ + Spring 之两种监听器的实现方案。

berg-dm
 berg-dm
发布于 2016/03/04 21:50
字数 1300
阅读 714
收藏 16


监听器两种实现方案:

    1.    采用原生的JMS的MessageListener

    2.    采用Spring的SessionAwareMessageListener


    1.1 定义接口PersonService  :

public interface PersonService {
	
	public void sendMessage( String message );
	
}

    

    1.2 JMS方案,定义ConsumerMessageListener1,实现的是JMS的接口 MessageListener:

/**
 * 监听器有两种实现方案:
 * 	一种是采用原生的jms的MessageListener
 * 	另一种是采用spring的方案 SessionAwareMessageListener
 * */

public class ConsumerMessageListener1 implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if( message instanceof TextMessage ){
			TextMessage tm = (TextMessage) message;
			try {
				System.out.println( "接收到的是个文本消息: "  + tm.getText() +"\n" );
			} catch (JMSException e) {
				e.printStackTrace();
			}
		} 
	}

}


    1.3 定义类PersonServiceImpl 实现接口PersonService:

@Service("personServiceImpl")
public class PersonServiceImpl implements PersonService {

	private Destination destination;
	private JmsTemplate jmsTemplate;
	
	@Override
	public void sendMessage(final String message) {
		
		System.out.println( "生产者发送消息: " + message );
		jmsTemplate.send(destination,new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				Message msg = session.createTextMessage(message);
				msg.setJMSDeliveryMode( DeliveryMode.NON_PERSISTENT );
				return msg;
			}
		});
	}

	@Resource(name="queueDestination")
	public void setDestination(Destination destination) {
		this.destination = destination;
	}
	
	@Resource(name="jmsTemplate")
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	
}


    1.4 测试类(针对原生方案)Test1:

public class Test1 {

	public static void main(String[] args) {
		ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
		PersonService ps = (PersonService) ac.getBean("personServiceImpl");
		for( int i=0;i<5;i++){
			ps.sendMessage("Hello World " + i );
		}
	}

}


    1.5 Test1的测试结果:

 // Test测试:
     
     生产者发送消息: Hello World 0
    接收到的是个文本消息: Hello World 0
    
    生产者发送消息: Hello World 1
    接收到的是个文本消息: Hello World 1
    
    生产者发送消息: Hello World 2
    接收到的是个文本消息: Hello World 2
    
    生产者发送消息: Hello World 3
    接收到的是个文本消息: Hello World 3
    
    生产者发送消息: Hello World 4
    接收到的是个文本消息: Hello World 4


    2.1 定义ConsumerMessageListener2 ,并且实现由Spring提供的SessionAwareMessageListener ,它可以在回调方法中传入session 以此回送消息到生产者。

/**
 * SessionAwareMessageListener :
 * 				由Spring提供  它可以在回调方法中传入session 以此回送消息到生产者。
 * */
@Component("consumerMessageListener2")
public class ConsumerMessageListener2 implements SessionAwareMessageListener<TextMessage> {
	
	private Destination destination;
	
	@Override
	public void onMessage(TextMessage message, Session session) throws JMSException {
		System.out.println( "接收到消息是个文本消息: " + message.getText()  );
		//通过session创建producer对象 再回送消息。
		//从message中取出消息回送的目的地 以便创建生产者。
		MessageProducer producer = session.createProducer( message.getJMSReplyTo() );
		
		//创建一条消息:
		Message msg = session.createTextMessage( "生产者发过来的消息已经处理完毕..." );
		//发送
		producer.send(msg);
		
	}
	
	@Resource( name="sendQueueDestination")
	public void setDestination(Destination destination) {
		this.destination = destination;
	}

}

    

    2.2 定义PersonServiceImpl2:

@Service("personServiceImpl2")
public class PersonServiceImpl2 implements PersonService {

	private Destination destination; //用于存发送消息的队列
	private JmsTemplate jmsTemplate; //jms操作模板
	private Destination replyDestination; //用于存回复消息的队列
	
	@Override
	public void sendMessage(final String message) {
		System.out.println( "生产者2发送消息:  " + message  );
		jmsTemplate.send(destination,new MessageCreator() {
			
			@Override
			public Message createMessage(Session session) throws JMSException {
				Message msg = session.createTextMessage(message);
				
				//设置回复消息的目的地队列:
				msg.setJMSReplyTo(replyDestination);
				//设置发送的消息类型为非持久化消息:
				msg.setJMSDeliveryMode( DeliveryMode.NON_PERSISTENT );
				
				//创建一个消费者 用于接收对方回复的消息  注意这个消费者监听 replyDestination
				MessageConsumer consumer = session.createConsumer(replyDestination);
				consumer.setMessageListener( new MessageListener() {
					
					@Override
					public void onMessage(Message m) {
						try {
							System.out.println( "接收道的回复消息为:  " + ((TextMessage)m).getText() );
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				});
				return msg;
			}
		});
	}

	@Resource(name="sendQueueDestination")
	public void setDestination(Destination destination) {
		this.destination = destination;
	}

	@Resource(name="jmsTemplate")
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	@Resource(name="replyQueueDestination")
	public void setReplyDestination(Destination replyDestination) {
		this.replyDestination = replyDestination;
	}

	
	
}


    2.3 Test2测试类:

public class Test2 {

	public static void main(String[] args) {
		ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
		PersonService ps = (PersonService) ac.getBean("personServiceImpl2");
		ps.sendMessage("我是生产者2生产的消息 , personServiceImpl2  "  );
	}

}


    2.4 Test2测试结果:

//Test2结果: 
    生产者2发送消息:  我是生产者2生产的消息 , personServiceImpl2  
    接收道的回复消息为:  生产者发过来的消息已经处理完毕...



applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx.xsd">

	<!-- 启用注解解析器 -->
	<context:annotation-config />
	
	<!-- 因为采用了混合解析方式( 有一部分配置在xml中,有一部分在java类中,所以要让spring的注解解析器去扫描包 -->
	<context:component-scan base-package="com.xj.jms4" />
	
	<!-- 启用aspectj的注解方式的代理 -->
	<aop:aspectj-autoproxy />


	<!-- 创建一个真正的基于 jms提供者的联接工厂 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
	</bean>

	<!-- ActiveMQ联接池的方案 -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
		<property name="connectionFactory" ref="targetConnectionFactory" />
		<property name="maxConnections" value="100" />
	</bean>

	<!-- 创建spring联接工厂 -->
	<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
		<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
	</bean>

	<!-- <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" 
		<property name="targetConnectionFactory" ref="pooledConnectionFactory" /> 
		</bean> -->

	<!-- 配置jmsTemplate -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="singleConnectionFactory" />
	</bean>

	<!-- 配置目的地: 这有两种:一种是 Queue对应是 P2P模式,另一种是 Topic 对应的是 发布/订阅模式, -->
	<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>myqueue</value>
		</constructor-arg>
	</bean>

	<!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> 
		<constructor-arg> <value>mytopic</value> </constructor-arg> </bean> -->


	<!-- 配置临听器 -->
	<bean id="consumerMessageListener1" class="com.xj.jms4.ConsumerMessageListener1">
	
	</bean>

	<!-- 配置临听器运行时所在的容器 -->
	<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="singleConnectionFactory" />
		<property name="destination" ref="queueDestination" />
		<property name="messageListener" ref="consumerMessageListener1" />
	</bean>



	<!-- 以下用于存放生产者发送的信息 -->
	<bean id="sendQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>sendQueue</value>
		</constructor-arg>
	</bean>
	
	<!--  以下用于存放消费者回复的信息  -->
	<bean id="replyQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg>
			<value>replyQueue</value>
		</constructor-arg>
	</bean>
	
	<!-- 配置   sessionAware的临听器 -->
	<bean id="consumerMessageListener2" class="com.xj.jms4.ConsumerMessageListener2" >
		<property name="destination" ref="sendQueueDestination" />
		<!--  <property name="replyDestination" ref="replyQueueDestination" /> -->
	</bean>	
	  
	<!-- 配置  consumerMessageListener2的容器 -->
	<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
			<property name="connectionFactory" ref="singleConnectionFactory" />
			<property name="destination" ref="sendQueueDestination" />
			<property name="messageListener" ref="consumerMessageListener2" />
	</bean>

</beans>


最后是导入的包(....):



© 著作权归作者所有

berg-dm
粉丝 26
博文 98
码字总数 88970
作品 0
深圳
程序员
私信 提问
一步一步Spring整合JMS

1.1 JMS简介 JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话...

摆渡者
2015/08/31
424
0
ActiveMQ结合Spring收发消息

原文出处:zylebron ActiveMQ 结合 Spring 收发消息 直接使用 ActiveMQ 的方式需要重复写很多代码,且不利于管理,Spring 提供了一种更加简便的方式————Spring JMS ,通过它可以更加方便...

zy_lebron
2018/10/09
0
0
spring整合jms系列之----点对点(一)

JMS作为一个支持点对点(PTP)和订阅式(pub/sub)式的消息中间件,为很多项目开发者所使用。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务,由于JMS对Spring的支持...

码上中国博客
2015/11/12
510
0
消息中间件系列三、JMS和activeMQ的简单使用

一、JMS 1、什么是JMS   JMS(JAVA Message Service,java消息服务)本质是API,Java平台消息中间件的规范,java应用程序之间进行消息交换。并且通过提供标准的产生、发送、接收消息的接口简...

我巴巴
2018/10/06
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
2018/06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

好程序员大数据教程Scala系列之样例类_Option_偏函数

  好程序员大数据教程Scala系列之样例类_Option_偏函数,在Scala中Option类型样例类用来表示可能存在或也可能不存在的值(Option的子类有Some和None)。Some包装了某个值,None表示没有值。 ...

好程序员官网
13分钟前
2
0
zk中ServerCnxnFactory连接管理工厂

作为ServerCnxn的工厂抽象类 属性 ZOOKEEPER_SERVER_CNXN_FACTORY zookeeper.serverCnxnFactory secure 在ServerCnxnFactory中SSL是否启用 sessionMap session管理配置中信息(sessionId,Ser......

writeademo
14分钟前
2
0
【代码审计01】几种常见的漏洞种类以及代码审计工具

前言 代码审计是在经过黑盒测试完毕,也就是检查应用的基本功能是否符合产品业务需求下进行的。需要有一定的编码基础以及对漏洞形成原理的基本认知,通过工具或者经验检测中代码中可能出现的...

北桥苏
15分钟前
3
0
重磅发布 | 全球首个云原生应用标准定义与架构模型 OAM 正式开源

作者: OAM 项目负责人 导读:2019 年 10 月 17 日,阿里巴巴合伙人、阿里云智能基础产品事业部总经理蒋江伟(花名:小邪)在 Qcon 上海重磅宣布,阿里云与微软联合推出开放应用模型 Open A...

阿里巴巴云原生
17分钟前
3
0
【进阶之定义函数】一个查询树结构数据的集合

1、基本定义 delimiter 自定义符号  -- 如果函数体只有一条语句, begin和end可以省略, 同时delimiter也可以省略create function 函数名(形参列表) returns 返回类型  -- 注意是retru...

卯金刀GG
23分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部