文档章节

ActiveMQ与Spring

giianhui
 giianhui
发布于 2016/05/27 11:20
字数 1834
阅读 13
收藏 0

项目的后台要求在更改密码后发送邮件通知用户,为了避免发送邮件时程序对用户操作的阻塞,之前中文版中使用了线程来发送邮件,而在英文版中,我决定使用 JMS来异步发送邮件,让用户更改密码的操作和发送邮件的操作更进一步解耦,也在实际环境中试试JMS。

  我们的环境是Spring 2.5, Tomcat 5.5,使用ActiveMQ 来 实现JMS传送和接收。

  首先,我们在Spring中加入ActiveMQ Broker的配置:


     <bean id="broker"
        class="org.apache.activemq .xbean.BrokerFactoryBean">
        <property name="config"
            value="classpath:activemq .xml" />
        <property name="start"
            value="true" />
    </bean>

  我们在此处配置了BrokerFactoryBean,此Bean实现在Spring中配置嵌入式Broker,并且支持XBean方式的配 置。Broker的配置文件由config属性指定,此处定义配置文件位于classpath中的activemq .xml。


  接下来我们需要创建Broker的配置文件activemq .xml。其实我们 不需要从头配置,展开ActiveMQ 的jar包,在org.apache.activemq .xbean下,就有一个activemq .xml, 我们将其拷贝到WEB-INF/classes/目录下,并进行修改。

下面是activemq .xml的内容:


<beans>
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

    <broker useJmx="false"
        persistent="false"
        xmlns="http://activemq .apache.org/schema/core">

        <transportconnectors>
            <transportconnector uri="tcp://localhost:61636" />
        </transportconnectors>

        <networkconnectors></networkconnectors>
    </broker>
</beans>

  在broker中,我们指定了不开启JMX,并且不使用持久化(persistent=”false”)。
  如果不对消息进行持久化存储,在容器或者JVM关闭、重启,或者崩溃后,所有的消息都将丢失,在我们的业务中,对于发送密码更改通知邮件,并 非是重要的功能,所以我们选择不使用持久化存储,但对于不同业务逻辑,可能会需要进行持久化存储。ActiveMQ 提 供的持久化存储方案可以将消息存储到文件系统、数据库等。

  要在Broker中开启持久化存储,需要设置persistent为true,并且对其子节点persistenceAdapter, journaledJDBC进行配置。ActiveMQ jar包中的activemq .xml有被注释掉的示例,可以参考。

  接着我们在Spring中配置JMS Connection Factory。


     <bean id="jmsFactory"
        class="org.apache.activemq .ActiveMQConnectionFactory">
        <property name="brokerURL"
            value="tcp://localhost:61636" />
    </bean>

  注意其中的borkerURL,应该是你在activemq .xml中 transportconnector节点的uri属性,这表示JMS Server的监听地址。

  配置消息发送目的地:


     <bean id="topicDestination"
        class="org.apache.activemq .command.ActiveMQTopic">
        <constructor -arg value="MY.topic" />
    </bean>

    <bean id="queueDestination"
        class="org.apache.activemq .command.ActiveMQQueue">
        <constructor -arg value="MY.queue" />
    </bean>

  在JMS中,目的地有两种:主题(topic)和队列(queue)。两者的区别是:当一个主题目的地中被放入了一个消息后,所有的订阅者都 会收到通知;而对于队列,仅有一个“订阅者”会收到这个消息,队列中的消息一旦被处理,就不会存在于队列中。显然,对于邮件发送程序来说,使用队列才是正 确的选择,而使用主题时,可能会发送多封相同的邮件。
  Topic和Queue只是JMS内部以及其处理方式的不同,对于消息发送方和接收方来说,完全没有代码上的区别。

  配置Spring中消息发送的JMS Template:


     <bean id="producerJmsTemplate"
        class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory"
                    ref="jmsFactory" />
            </bean>
        </property>
        <property name="defaultDestination"
            ref="queueDestination" />
        <property name="messageConverter"
            ref="userMessageConverter" />
    </bean>

  注意此处的defaultDestination使用的是基于Queue的目的地。

  在实际的消息发送中,邮件内容需要用到User.username, User.password, User.email, User.fullname,显示如果我们直接发送User对象到消息队列,接收的时候也能直接取出User对象,那么在邮件发送程序中操作就会方便许 多,所以在些处,我们定义了messageConverter属性,他指定了发送消息时使用的消息转换bean,这样,在直接发送User到JMS队列 时,Spring会自动帮我们进行转换,下面是Converter的配置和代码:


     <bean id="userMessageConverter"
        class="com.tiandinet.jms.sample.UserMessageConverter" />

  此转换器同样也会使用在消息接收中,将接收到的消息转换为User对象。

package com.tiandinet.jms.sample;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.support.converter.MessageConverter;
import com.tiandinet.jms.sample.User;
/**
* Converte User message.
*
* @author Yangtze
*/
public class UserMessageConverter implements MessageConverter {
    private static transient Log logger = LogFactory.getLog(UserMessageConverter.class);
    /**
     * {@inheritDoc}
     *
     * @see org.springframework.jms.support.converter.MessageConverter
     * #fromMessage(javax.jms.Message)
     */
    public Object fromMessage(Message message) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive JMS message: " + message);
        }
        if (message instanceof ObjectMessage) {
            ObjectMessage oMsg = (ObjectMessage) message;
            if (oMsg instanceof ActiveMQObjectMessage) {
                ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) oMsg;
                try {
                    User user = (User) aMsg.getObject();
                    return user;
                } catch (Exception e) {
                    logger.error("Message:[" + message + "] is not a instance of User.");
                    throw new JMSException("Message:[" + message + "] is not a instance of User.");
                }
            } else {
                logger.error("Message:[" + message + "] is not "
    + "a instance of ActiveMQObjectMessage[User].");
                throw new JMSException("Message:[" + message + "] is not "
    + "a instance of ActiveMQObjectMessage[User].");
            }
        } else {
            logger.error("Message:[" + message + "] is not a instance of ObjectMessage.");
            throw new JMSException("Message:[" + message + "] is not a instance of ObjectMessage.");
        }
    }
    /**
     * {@inheritDoc}
     *
     * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object,
     *      javax.jms.Session)
     */
    public Message toMessage(Object obj, Session session) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Convert User object to JMS message: " + obj);
        }
        if (obj instanceof User) {
            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
            msg.setObject((User) obj);
            return msg;
        } else {
            logger.error("Object:[" + obj + "] is not a instance of User.");
            throw new JMSException("Object:[" + obj + "] is not a instance of User.");
        }
    }
}


  此程序实现了MessageConverter接口,并实现其中的fromMessage和toMessage方法,分别实现转换接收到的消 息为User对象和转换User对象到消息。
我们在程序中使用的是ActiveMQObjectMessage,它是 ActiveMQ 中 对javax.jms.ObjectMessage的一个实现。

  此时,我们已经完成了JMS Connection Factory和用于发送JMS消息的JMS Template配置,接下来,应该编写发送消息的Bean了,代码如下:
package com.tiandinet.jms.sample;
import org.springframework.jms.core.JmsTemplate;
import com.tiandinet.jms.sample.User;
/**
* Send user's login information mail via JMS.
*
* @author Yangtze
*/
public class UserMessageProducerImpl implements IUserMessageProducer {
    private JmsTemplate jmsTemplate;
    /**
     * {@inheritDoc}
     *
     * @see com.tiandinet.jms.sample.IUserMessageProducer
     * #sendUserLoginInformationMail(com.tiandinet.jms.sample.User)
     */
    public void sendUserLoginInformationMail(User user) {
        getJmsTemplate().convertAndSend(user);
    }
    /**
     * Return the jmsTemplate.
     *
     * @return the jmsTemplate
     */
    public final JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }
    /**
     * Set the jmsTemplate.
     *
     * @param jmsTemplate
     *            the jmsTemplate to set
     */
    public final void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}

  代码很简单,sendUserLoginInformationMail方法是唯一我们需要编写的,调用JMSTemplate的 convertAndSend方法,Spring会自己调用我们之前配置的converter来转换我们发送的User对象。
  将此Java在Spring中进行配置,然后在Controller中进行调用即可实现发送User对象到JMS。

  到此为止,我们已经实现了消息的发送,现在我们来实现消息的接收。

  相对于发送,消息的接收的配置要相对简短些,我们使用MDP(Message Drive POJO)来实现消息的异步接收。我们需要实现javax.jms.MessageListener接口的void onMessage(Message message)方法来接收消息。不过我们可以使用Spring中提供的MessageListenerAdapter来简化接收消息的代码。
  我们先写处理消息的接口和实现类:

package com.tiandinet.jms.sample;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import com.tiandinet.jms.sample.User;
/**
* JMS message handler.
*
* Yangtze
*/
public interface IMessageConsumer {
    /**
     * Handle the user message.
     *
     * @param user
     *            User
     * @throws JMSException
     *             exception
     */
    void handleMessage(User user) throws JMSException;
}
package com.tiandinet.jms.sample;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.tiandinet.jms.sample.User;
import com.tiandinet.jms.sample.IMailService;
/**
* JMS message handler - for User Message.
*
* Yangtze
*/
public class UserMessageConsumerImpl implements IMessageConsumer {
    private static transient Log logger = LogFactory.getLog(UserMessageConsumerImpl.class);
    private IMailService mailService;
    /**
     * {@inheritDoc}
     *
     * @see com.tiandinet.jms.sample.IMessageConsumer
* #handleMessage(com.tiandinet.jms.sample.User)
     */
    public void handleMessage(User user) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Receive a User object from ActiveMQ: " + user.toString());
        }
        mailService.sendUserLoginInforMail(user);
    }
} 


  配置message listener

     <bean id="messageListener"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor -arg>
            <bean class="com.tiandinet.jms.sample.UserMessageConsumerImpl">
                <property name="mailService"
                    ref="mailService" />
            </bean>
        </constructor>
        <property name="defaultListenerMethod"
            value="handleMessage" />
        <property name="messageConverter"
            ref="userMessageConverter" />
    </bean>

  其中的mailService即是我们的邮件发送类,其sendUserLoginInforMail方法实现了邮件发送的功能。
消息侦听适配器defaultListenerMethod属性指定Spring在收到消息后调用的方法,此处为 handleMessage,Spring会根据收到的消息User对象,调用handleMessage(User user)方法。

  配置消息侦听容器,并指定我们定义的消息侦听器。

     <bean id="listenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers"
            value="5" />
        <property name="connectionFactory"
            ref="jmsFactory" />
        <property name="destination"
            ref="queueDestination" />
        <property name="messageListener"
            ref="messageListener" />
    </bean>

本文转载自:http://blog.csdn.net/giianhui/article/details/5489412

giianhui
粉丝 6
博文 287
码字总数 0
作品 0
深圳
技术主管
私信 提问
Spring Cloud Stream Binder 实现

Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖

xiaoshuaiv5
2019/04/01
0
0
ActiveMQ启动

自己在本机虚拟机搭建ActiveMQ正常也做了很多测试,但是在测试环境中搭建是发现jdk是1.6于是就把mq的jdk改成单独的jdk,启动正常: INFO: Loading '/usr/local/server/ActiveMQ/apache-activ...

跌倒了爬起来再哭
2016/10/09
587
1
关于ActiveMQ5.7突然崩溃讨论

2017-05-05 09:38:20.416 [ExceptionGuardFilter.java:22] - chain error:Uncategorized exception occured during JMS processing; n ested exception is javax.jms.JMSException: Peer (vm......

禾乘李朋友
2017/05/05
5
2
Spring整合ActiveMQ,无相关activemq日志输出

以内嵌的方式整合Spring和ActiveMQ,用的是ActiveMQ5.8.0版本,导入的jar有: activemq-broker-5.8.0.jar activemq-client-5.8.0.jar activemq-console-5.8.0.jar activemq-pool-5.8.0.jar a......

哎哎哎哎啊
2013/09/15
2K
1
ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike
2018/05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

如何为MVC-3转换为4应用程序添加对System.Web.Optimization的引用

我正在最近从MVC 3转换为MVC 4 beta的项目中尝试使用新的捆绑功能。 它需要global.asax中的一行代码, BundleTable.Bundles.RegisterTemplateBundles(); ,这需要using System.Web.Optimiza...

技术盛宴
今天
79
0
Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法。所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用。 本篇文章有参考自:https://www...

CREATE_17
昨天
114
0
处理CSV文件中的逗号

我正在寻找有关如何处理正在创建的csv文件的建议,然后由我们的客户上传,并且该值可能带有逗号(例如公司名称)。 我们正在研究的一些想法是:带引号的标识符(值“,”值“,”等)或使用|...

javail
昨天
79
0
计算一个数的数位之和

计算一个数的数位之和 例如:128 :1+2+8 = 11 public int numSum(int num) { int sum = 0; do { sum += num % 10; } while ((num = num / 10) > 0); return sum;......

SongAlone
昨天
128
0
为什么图片反复压缩后普遍会变绿,而不是其他颜色?

作者:Lion Yang 链接:https://www.zhihu.com/question/29355920/answer/119088684 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 业余版概要:安卓的...

shzwork
昨天
85
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部