文档章节

ActiveMQ与Spring整合

嘻哈开发者
 嘻哈开发者
发布于 2016/01/02 00:24
字数 1777
阅读 81
收藏 1

「深度学习福利」大神带你进阶工程师,立即查看>>>

ActiveMQ 是Apache出品, 是最流行和最强大的开源消息总线。 同时完全支持 JMS 1.1和J2EE 1.4规范。

ActiveMQ 特性

  1. 支持多种编程语言和协议编写客户端。
  2. 在JMS客户端和消息代理完全支持企业集成模式
  3. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。
  4. 对Spring的支持, ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
  5. 测试通过常见的J2EE服务器。如:Geronimo, JBoss 4, GlassFish, WebLogic。
  6. 支持多种传送协议:in-VM, TCP, SSL, NIO, UDP, JGroups,JXTA。
  7. 支持通过JDBC和journal提供高速的消息持久化。
  8. 从设计上保证了高性能的集群,客户端-服务器,点对点。
  9. 支持Ajax
  10. 支持与Axis的整合
  11. 可以很容易得调用内嵌JMS provider,进行测试。

通过网络搜索及个人理解,整理出ActiveMQ与Spring整合的文章。

相关jar包

activemq-core-5.5.1.jar   
geronimo-j2ee-management_1.1_spec-1.0.1.jar   
geronimo-jms_1.1_spec-1.1.1.jar   
geronimo-jta_1.0.1B_spec-1.0.1.jar   
kahadb-5.5.1.jar   
xbean-spring-3.7.jar   
commons-beanutils.jar   
commons-codec.jar   
commons-collections.jar   
commons-fileupload.jar   
commons-httpclient.jar   
commons-io.jar   
commons-lang.jar   
commons-logging.jar   
commons-validator.jar   
dom4j-1.6.1.jar   
javaee.jar   
jsf-api.jar   
jsf-impl.jar   
jstl.jar   
log4j-1.2.15.jar   
slf4j-api-1.5.8.jar   
slf4j-log4j12-1.5.8.jar   
aspectjrt.jar   
aspectjweaver.jar   
spring.jar

spring.xml

<?xml version="1.0" encoding="UTF-8"?>   
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
       xmlns:p="http://www.springframework.org/schema/p"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"   
       xmlns:util="http://www.springframework.org/schema/util"   
       xmlns:jee="http://www.springframework.org/schema/jee"   
       xmlns:amq="http://activemq.apache.org/schema/core"   
       xsi:schemaLocation="http://www.springframework.org/schema/beans   
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd   
       http://www.springframework.org/schema/tx   
       http://www.springframework.org/schema/tx/spring-tx-2.5.xsd   
       http://www.springframework.org/schema/aop   
       http://www.springframework.org/schema/aop/spring-aop-2.5.xsd   
       http://www.springframework.org/schema/context   
       http://www.springframework.org/schema/context/spring-context-2.5.xsd   
       http://www.springframework.org/schema/util   
       http://www.springframework.org/schema/util/spring-util-2.5.xsd   
       http://www.springframework.org/schema/jee   
       http://www.springframework.org/schema/jee/spring-jee-2.5.xsd   
       http://activemq.apache.org/schema/core   
       http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">   
   
  <!-- Annotation Config -->   
  <context:annotation-config/>   
   
  <!-- Compoent Scan -->   
  <context:component-scan base-package="com.uu.web.*"/>   
   
  <!-- Property Placeholder -->   
  <context:property-placeholder location="classpath:config.properties"/>   
       
  <!-- Aop Config -->   
  <aop:aspectj-autoproxy/>   
  <!--    
  <bean id="mysql-ds"  destroy-method="close">   
    <property name="driverClassName" value="${dirver}"/>   
    <property name="url" value="${url}"/>   
    <property name="username" value="${username}"/>   
    <property name="password" value="${password}"/>   
  </bean>   
  -->   
  <!--    
    使用spring的listenerContainer 消息用持久化保存,服务器重启不会丢失   
    也可以配置在${ACTIVEMQ_HOME}/conf/activemq.xml内   
    消息的保存方式文件持久化和数据库持久化   
    此配置是文件持久化   
  -->   
  <!-- Embedded ActiveMQ Broker -->   
  <amq:broker useJmx="false" persistent="true">   
    <amq:persistenceAdapter>   
      <amq:amqPersistenceAdapter directory="G:/amq"/>   
      <!-- 使用数据库持久化 -->   
      <!--<amq:jdbcPersistenceAdapter dataSource="#mysql-ds" />-->   
    </amq:persistenceAdapter>   
    <amq:transportConnectors>   
      <amq:transportConnector uri="tcp://localhost:61616" />   
    </amq:transportConnectors>   
  </amq:broker>   
       
  <!--ActiveMQ connectionFactory  -->   
  <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />   
       
  <!-- ActiveMQ destinations  -->   
  <!--    
    TOPIC:发布订阅消息   
        无状态,不保证每条消息被消费   
        只有监听该TOPIC地址才能收到消息并消费,否则该消息将会丢失   
        一对多的发布接受策略,可以同时消费多个消息   
  -->   
  <amq:topic name="TOPIC" physicalName="JMS-TOPIC" />   
   
    <!--   
    QUEUE: 点对点   
         消息数据被持久化,每条消息都能被消费   
         没有监听QUEUE地址也能被消费,数据不会丢失   
         一对一的发布接受策略,保证数据完整   
    -->   
    <amq:queue name="QUEUE" physicalName="JMS-QUEUE" />   
       
  <!-- ConnectionFactory -->   
  <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">   
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>   
  </bean>   
       
  <!-- 添加事务 -->   
  <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager" >   
    <property name="connectionFactory" ref="singleConnectionFactory"/>   
  </bean>   
       
  <!--  Spring JmsTemplate config -->   
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">   
    <!--  lets wrap in a pool to avoid creating a connection per send -->   
    <property name="connectionFactory" ref="singleConnectionFactory"/>   
    <!-- custom MessageConverter -->   
    <property name="messageConverter" ref="defaultMessageConverter" />   
    <property name="sessionTransacted" value="true"/>   
  </bean>   
   
  <!-- converter  -->   
  <bean id="defaultMessageConverter" class="com.uu.activemq.DefaultMessageConverter" />   
       
  <!-- 生产消息配置 -->   
  <!-- POJO which send Message uses  Spring JmsTemplate -->   
  <bean id="topicMessageProducer" class="com.uu.activemq.TopicMessageProducer">   
    <property name="template" ref="jmsTemplate" />   
    <property name="destination" ref="TOPIC" />   
  </bean>   
  <bean id="queueMessageProducer" class="com.uu.activemq.QueueMessageProducer">   
    <property name="template" ref="jmsTemplate" />   
    <property name="destination" ref="QUEUE" />   
  </bean>   
       
  <!-- 消费消息 配置 -->   
  <!--  Message Driven POJO (MDP) -->   
  <!-- consumer1 for topic -->   
  <bean id="topicConsumer" class="com.uu.activemq.TopicConsumer" />   
   
  <!-- consumer for queue -->   
  <bean id="queueConsumer" class="com.uu.activemq.QueueConsumer" />   
       
  <!-- Message Listener for  -->   
  <bean id="topicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">   
    <constructor-arg ref="topicConsumer" />   
    <!--  指定消费消息的方法 -->   
    <property name="defaultListenerMethod" value="receive" />   
    <!-- custom MessageConverter define -->   
    <property name="messageConverter" ref="defaultMessageConverter" />   
  </bean>   
  <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">   
    <constructor-arg ref="queueConsumer" />   
    <!--  指定消费消息的方法 -->   
    <property name="defaultListenerMethod" value="receive" />   
    <!-- custom MessageConverter define -->   
    <property name="messageConverter" ref="defaultMessageConverter" />   
  </bean>   
       
  <!--  listener container,MDP无需实现接口 -->   
  <bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">   
    <property name="connectionFactory" ref="jmsConnectionFactory" />   
    <property name="destination" ref="TOPIC" />   
    <property name="messageListener" ref="topicListener" />   
  </bean>   
  <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">   
    <property name="connectionFactory" ref="jmsConnectionFactory" />   
    <property name="destination" ref="QUEUE" />   
    <property name="messageListener" ref="queueListener" />   
    <property name="transactionManager" ref="jmsTransactionManager"/>   
    <property name="sessionTransacted" value="true"/>   
    <property name="concurrentConsumers" value="5"/>   
  </bean>   
</beans>

消息转换类

DefaultMessageConverter.java

1 package com.uu.activemq;   
 2    
 3 import java.io.ByteArrayInputStream;   
 4 import java.io.ByteArrayOutputStream;   
 5 import java.io.IOException;   
 6 import java.io.ObjectInputStream;   
 7 import java.io.ObjectOutputStream;   
 8 import java.util.HashMap;   
 9    
10 import javax.jms.JMSException;   
11 import javax.jms.Message;   
12 import javax.jms.ObjectMessage;   
13 import javax.jms.Session;   
14    
15 import org.apache.activemq.command.ActiveMQObjectMessage;   
16 import org.apache.commons.logging.Log;   
17 import org.apache.commons.logging.LogFactory;   
18 import org.springframework.jms.support.converter.MessageConverter;   
19    
20 /**   
21  * 消息转换类   
22  */   
23 @SuppressWarnings("unchecked")   
24 public class DefaultMessageConverter implements MessageConverter {   
25        
26     private static final Log log = LogFactory.getLog(DefaultMessageConverter.class);   
27    
28     public Message toMessage(Object obj, Session session) throws JMSException {   
29         if (log.isDebugEnabled()) {   
30             log.debug("toMessage(Object, Session) - start");   
31         }   
32    
33         // check Type   
34         ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();   
35         HashMap<String, byte[]> map = new HashMap<String, byte[]>();   
36         try {   
37             // POJO must implements Seralizable   
38             ByteArrayOutputStream bos = new ByteArrayOutputStream();   
39             ObjectOutputStream oos = new ObjectOutputStream(bos);   
40             oos.writeObject(obj);   
41             map.put("POJO", bos.toByteArray());   
42             objMsg.setObjectProperty("Map", map);   
43    
44         } catch (IOException e) {   
45             log.error("toMessage(Object, Session)", e);   
46         }   
47         return objMsg;   
48     }   
49    
50        
51     public Object fromMessage(Message msg) throws JMSException {   
52         if (log.isDebugEnabled()) {   
53             log.debug("fromMessage(Message) - start");   
54         }   
55    
56         if (msg instanceof ObjectMessage) {   
57             HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");   
58             try {   
59                 // POJO must implements Seralizable   
60                 ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));   
61                 ObjectInputStream ois = new ObjectInputStream(bis);   
62                 Object returnObject = ois.readObject();   
63                 return returnObject;   
64             } catch (IOException e) {   
65                 log.error("fromMessage(Message)", e);   
66    
67             } catch (ClassNotFoundException e) {   
68                 log.error("fromMessage(Message)", e);   
69             }   
70    
71             return null;   
72         } else {   
73             throw new JMSException("Msg:[" + msg + "] is not Map");   
74         }   
75     }   
76 }

接受Queue方式消费

QueueConsumer.java

1 package com.uu.activemq;   
 2    
 3 import java.util.Map;   
 4 import java.util.Set;   
 5    
 6 public class QueueConsumer   
 7 {   
 8     public void receive(Map<String, Object> message)   
 9     {   
10         Set<String> set = message.keySet();   
11         String str = "";   
12         for(String key : set)   
13         {   
14             str += key + "_" + message.get(key) + ":Queue";   
15         }   
16         System.out.println(str);   
17     }   
18 }

接受Topic方式消费

TopicConsumer.java

1 package com.uu.activemq;   
 2    
 3 import java.util.Map;   
 4 import java.util.Set;   
 5 import java.util.concurrent.ExecutorService;   
 6 import java.util.concurrent.Executors;   
 7    
 8 /**   
 9  * 创建10个线程池   
10  * 1.使用线程异步消息处理   
11  * 2.不使用线程,那么消息等待上一个消息处理完成后才继续   
12  * 注:如果担心据同步问题,那么使用第2种方法   
13  */   
14 public class TopicConsumer   
15 {   
16     protected static ExecutorService exec = Executors.newFixedThreadPool(10);   
17        
18     public void receive(Map<String, Object> message)   
19     {   
20         /*exec.submit(new Runnable(){   
21                
22             public void run()   
23             {   
24                    
25             }   
26         });*/   
27         Set<String> set = message.keySet();   
28         String str = "";   
29         for(String key : set)   
30         {   
31             str += key + "_" + message.get(key) + ":Topic";   
32         }   
33         System.out.println(str);   
34     }   
35 }

生产Queue方式的消息

QueueMessageProducer.java

1 package com.uu.activemq;   
 2    
 3 import java.util.Map;   
 4    
 5 import javax.jms.Queue;   
 6    
 7 import org.springframework.jms.core.JmsTemplate;   
 8    
 9 public class QueueMessageProducer {   
10    
11     private JmsTemplate template;   
12    
13     private Queue destination;   
14    
15     public void setTemplate(JmsTemplate template) {   
16         this.template = template;   
17     }   
18    
19     public void setDestination(Queue destination) {   
20         this.destination = destination;   
21     }   
22    
23     public void send(Map<String,Object> message) {   
24         template.convertAndSend(this.destination, message);   
25     }   
26    
27 }

生产Topic方式的消息

TopicMessageProducer.java

1 package com.uu.activemq;   
 2    
 3 import java.util.Map;   
 4    
 5 import javax.jms.Topic;   
 6    
 7 import org.springframework.jms.core.JmsTemplate;   
 8    
 9 public class TopicMessageProducer {   
10        
11     private JmsTemplate template;   
12    
13     private Topic destination;   
14    
15     public void setTemplate(JmsTemplate template) {   
16         this.template = template;   
17     }   
18    
19     public void setDestination(Topic destination) {   
20         this.destination = destination;   
21     }   
22    
23     public void send(Map<String,Object> message) {   
24         template.convertAndSend(this.destination, message);   
25     }   
26 }

写一个测试类

1 package com.uu.activemq;   
 2    
 3 import java.util.LinkedHashMap;   
 4 import java.util.Map;   
 5    
 6 import org.springframework.context.ApplicationContext;   
 7 import org.springframework.context.support.FileSystemXmlApplicationContext;   
 8    
 9 /**   
10  * 解决耗时的数据操作   
11  * 发送消息不等待返回,继续执行   
12  */   
13 public class MainTest   
14 {   
15     public static void main(String[] args)   
16     {   
17         ApplicationContext wac = new FileSystemXmlApplicationContext("classpath:spring.xml");   
18            
19         TopicMessageProducer topicMessageProducer  = (TopicMessageProducer) wac.getBean("topicMessageProducer");   
20            
21         QueueMessageProducer queueMessageProducer  = (QueueMessageProducer) wac.getBean("queueMessageProducer");   
22            
23         Map<String, Object> message = new LinkedHashMap<String, Object>();   
24         message.put("test", "ActiveMQ");   
25            
26         queueMessageProducer.send(message);   
27         topicMessageProducer.send(message);   
28         System.out.println("完成");   
29     }   
30 }

结果先输出完成然后输出传递的参数

完成   
test_ActiveMQ:Queue   
test_ActiveMQ:Topic
嘻哈开发者
粉丝 57
博文 125
码字总数 1563
作品 0
广州
程序员
私信 提问
加载中
请先登录后再评论。
高效 Java Web 开发框架--JessMA

JessMA 是功能完备的高性能 Full-Stack Web 应用开发框架,内置可扩展的 MVC Web 基础架构和 DAO 数据库访问组件(内部已提供了 Hibernate、MyBatis 与 JDBC DAO 组件),集成了 Action 拦截...

伤神小怪兽
2012/11/13
9.2K
3
eternal mvc 框架--eternal

spring在设计上真博大精深啊,无论从哪点来说,都非常不错,当然啦大家一直说他的配置麻烦,这应该是人家的设计思想吧,把每个类都看成一个bean,以xml的方式配置关系,就像java的面向对象,...

青青小树
2013/01/09
1.3K
0
疯狂Spring Cloud连载(29)微服务跟踪概述

本文节选自《疯狂Spring Cloud微服务架构实战》 京东购买地址:https://item.jd.com/12256011.html 当当网购买地址:http://product.dangdang.com/25201393.html Spring Cloud教学视频:htt...

杨大仙的程序空间
2018/01/09
542
0
Spring Framework 4.2 中的新功能和增强功能

本文同不至 至今为止,Spring Framework 的最新版本为 4.2.1.RELEASE。那么 Spring Framework 4.2 中的又有哪些新功能和增强功能呢? 核心容器改进 如 @bean 注释,就如同得到发现和处理 Java...

waylau
2015/10/07
2.4K
3
J360-cloud SpringCloud系列一:分布式配置服务器ConfigServer

j360开源博客之 ---------------------------------------------------------- J360-Cloud系列 spring-cloud快速入门工程之j360-cloud-all:(欢迎star、fork) https://github.com/xuminwl......

Hi徐敏
2015/10/08
4K
14

没有更多内容

加载失败,请刷新页面

加载更多

SwaggerUI看烦了,IGeekFan.AspNetCore.Knife4jUI 帮你换个新皮肤

背景 好像是上周四,看到微信群有人说java有轮子swagger-bootstrap-ui,而c#,就是找不到。 于是我一看,就说大话:“这个只是一套UI,他这个有开源地址么” 被@at说:你试试... 当天晚上就把...

张善友
今天
6
0
字符串驻留是个什么鬼!

先看下面这段代码 >>> a = 'apple' >>> b = 'apple' >>> a is b True 返回结果看起来没毛病 再来看这段: >>> a = 'apple!' >>> b = 'apple!' >>> a is b False 这个结果是不是有点诡异 前后......

谭庆波
昨天
14
0
golang包的日常(2)——log包

普通使用 log包定义了Logger类型,该类型提供了一些格式化输出的方法。 log包中预定义了一个标准logger对象std,使我们可以直接调用函数来打印日志,默认是标准错误输出(Stderr),打印到终...

osc_dg21zk4i
22分钟前
0
0
近10年全国高校双青人才排名TOP50,苏州大学表现抢眼

转眼又到了每年的杰青和优青项目评选的日子,作为高校人才水平的重要评判标准,每年高校的自然科学基金项目数都是一个非常重要的指标。当然,由于杰青每年只有200个名额,而国内有3000多所高...

科研菌
昨天
0
0
意派Epub360丨从制作到推广,H5页面如何刷屏?

不是所有H5最终都可以引爆朋友圈,也不是所有未能刷屏的H5都不够优秀。如何找到爆点、如何制作实现、如何传播转化……每一个要点都可能是影响刷屏的关键。即使是再资深的营销专家,也很难在H...

牙牙乐牙牙乐
22分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部