文档章节

ActiveMQ的简单使用

乐在克里特
 乐在克里特
发布于 2017/02/24 13:45
字数 664
阅读 131
收藏 6

项目中使用的介绍

一、运行ActiveMQ

在文件路径下...\apache-activemq-5.13.3\bin\win64

运行activemq.bat 

这是系统中的使用,运行后还可以访问相应的页面。

二、项目中的实践

1.依赖的jar包

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>artemis-jms-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>artemis-jms-server</artifactId>
        </dependency>

2.配置ActiveMQ的配置,要和运行的保持一致,在commom.properties文件中。

activemq.broker.url=tcp://127.0.0.1:61616
activemq.user=admin
activemq.password=admin
activemq.name=datacenter.test
activemq.name.access=datacenter.access

3.applicationContext-mq.xml 中配置(推消息端)

    <!-- 1.配置ActiveMQ 连接工厂 -->
    <amq:connectionFactory id="creditMQConnectionFactory"
                           brokerURL="${activemq.broker.url}" 
                           userName="${activemq.user}" 
                           password="${activemq.password}"  />

    <!-- 2.配置缓存 ConnectionFactory -->
    <bean id="creditMQconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="creditMQConnectionFactory"/>
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="creditJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <constructor-arg ref="creditMQConnectionFactory" />
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 定义授权队列消费者
    <bean id="creditConsumer" class="com.itom.mq.consumer.CreditConsumer"/>-->

    <!-- 定义Queue监听器 -->
    <jms:listener-container destination-type="queue" container-type="default" 
                            connection-factory="creditMQconnectionFactory" 
                            acknowledge="auto">    
    </jms:listener-container>
    <!--运营相关配置结束-->

4.编写生产者和消费者

  1) 生产者

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class CreditProducer {

    private Map<String ,Queue> cacheQueue = new ConcurrentHashMap<String, Queue>();

    @Resource
    private JmsTemplate creditJmsTemplate;

    public void pushCreditData(String queueName ,Object message){

        Queue queue = cacheQueue.get(queueName);
        if(queue == null){
            queue = new ActiveMQQueue(queueName);
            cacheQueue.put(queueName,queue);
        }
        this.creditJmsTemplate.convertAndSend(queue, message);
    }
}

  2) 消费者:

消费者xml配置信息:

    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="${jms.broker.user}"/>
        <constructor-arg index="1" value="${jms.broker.password}"/>
        <constructor-arg index="2" value="${jms.broker.url}"/>
    </bean>

    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"/>
    </bean>

    <!--  Default Destination Queue Definition-->
    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${jms.queue.name}"/>
    </bean>

    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="defaultDestination"/>
    </bean>

    <!-- JmsTemplate Receiver-->
    <bean id="messageReceiver" class="com.itom.rating.listener.MessageReceiver"/>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.name}"/>
        <property name="messageListener" ref="messageReceiver"/>
    </bean>

    <!--the same ActiveMQ but different receiver -->
    <bean id="accessMessageReceiver" class="com.itom.rating.listener.AccessMessageReceiver"/>
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.access.name}"/>
        <property name="accessMessageListener" ref="accessMessageReceiver"/>
    </bean>

线程池的配置:

@Service("messageComputePool")
public class MessageComputePool {

    public static final Logger logger = LoggerFactory.getLogger(MessageComputePool.class);

    private ExecutorService tPool = null;

    public MessageComputePool() {
        this.tPool = Executors.newFixedThreadPool(50);
    }

    public void submit(Callable task) {
        this.debug("messageComputePool 开始工作");
        this.tPool.submit(task);
    }

    public void debug(String message) {
        if (logger.isDebugEnabled()) {
            logger.debug(message);
        }
    }

    public void info(String message) {
        if (logger.isInfoEnabled()) {
            logger.info(message);
        }
    }
}

代码中使用:

public class MessageReceiver implements MessageListener {

    //自己定义的类,使用线程池来处理消息
    @Resource(name = "messageComputePool")
    MessageComputePool pool;

    public void onMessage(Message message) {
        if(message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println(String.format("Received: %s",text));
                JSONObject jsonObj = (JSONObject)JSONObject.parse(text);
                ...
                //消费者接收到消息就可以进行处理了
                ComputeTask task = new ComputeTask();//...implements Callable<Result>...
                pool.submit(task);//...
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

  3)使用生产者推消息

@Service("collectService ")
public class CollectServiceImpl implements CollectService {
      public static final Logger LOG  = LoggerFactory.getLogger(CollectServiceImpl.class);

      @Resource
      private CreditProducer creditProducer;

      @Override
      public void collect(CollectParameter parameter) {
            Map dataMap = new HashMap();
            dataMap.put("sourceCode", parameter.getValue());
            dataMap.put("userId",parameter.getUserid());           
            creditProducer.pushCreditData("itom.activemq.access", JSON.toJSONString(dataMap));
        }
    }
}

 

推荐个ActiveMQ入门教程:

http://blog.csdn.net/jolingogo/article/category/1658165

© 著作权归作者所有

共有 人打赏支持
下一篇: Java中的static
乐在克里特
粉丝 16
博文 268
码字总数 394729
作品 0
杭州
程序员
私信 提问
ActiveMQ安装配置和使用简例

本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/48173665 ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件...

开开心心过
2015/09/02
0
0
Linux环境下Apache ActiveMQ 基本安装

在linux上安装mq,并映射到外网。 1.Apache ActiveMQ安装基本条件请参考链接: 2.下载Apache ActiveMQ 3.解压安装 将下载好的apache-activemq-5.13.3-bin.tar解压至安装目录 命令:tar -zxvf...

zhuyuansj
2018/04/02
0
0
跟我学习dubbo-ActiveMQ的安装-单节点与使用(9)

ActiveMQ 的安装与使用(单节点) 1、 安装 JDK 并配置环境变量 JAVA_HOME=/usr/local/java/jdk1.7.0_72 2、 下载 Linux 版的 ActiveMQ(当前最新版 apache-activemq-5.11.1-bin.tar.gz) $ ...

HI曲奇饼干
2016/01/19
238
0
SpringBoot整合ActiveMQ消息队列

首先要讲什么是ActiveMQ:AciveMQ是Apache出品的目前最流行,能力强劲的开源消息总线 主要功能: 1、 解决服务之间代码耦合 2、 使用消息队列,增加系统并发处理量 主要应用场景: 1、 当系统...

逆风局局长熊
2018/12/06
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
2018/04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

发布xxl-job executor dotnet core 执行器的实现

DotXxlJob [github][https://github.com/xuanye/DotXxlJob] xxl-job的dotnet core 执行器实现,支持XXL-JOB 2.0+ 1 XXL-JOB概述 [XXL-JOB][1]是一个轻量级分布式任务调度平台,其核心设计目标...

假正经哥哥
今天
2
0
mysql 查询当天、本周,本月,上一个月的数据

今天 select * from 表名 where to_days(时间字段名) = to_days(now()); 昨天 SELECT * FROM 表名 WHERE TO_DAYS( NOW( ) ) - TO_DAYS( 时间字段名) <= 1 近7天 SELECT * FROM 表名 wher......

BraveLN
今天
3
0
Spring 事务初始化源码分析

相关文章 Spring 事务使用详解 Spring AOP 创建代理的源码解析 Spring AOP 注解方式源码解析 Spring AOP 功能使用详解 Spring 的 getBean 方法源码解析 Spring bean 创建过程源码解析 Spring...

TSMYK
今天
1
0
Android Multimedia框架总结(六)C++中MediaPlayer的C/S架构

前面几节中,都是通过java层调用到jni中,jni向下到c++层并未介绍 看下Java层一个方法在c++层 MediaPlayer后续过程 frameworks/av/media/libmedia/MediaPlayer.cpp 找一个我们之前熟悉的setDa...

天王盖地虎626
今天
3
0
【Linux】【MySQL】CentOS7安装最新版MySQL8.0.13(最新版MySQL从安装到运行)

1、前言   框框博客在线报时:2018-11-07 19:31:06   当前MySQL最新版本:8.0.13 (听说比5.7快2倍)   官方之前表示:MySQL 8.0 正式版 8.0.11 已发布,MySQL 8 要比 MySQL 5.7 快 2 ...

Code辉
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部