文档章节

ActiveMQ安装和使用

xiaozhou18
 xiaozhou18
发布于 2017/06/07 18:42
字数 1669
阅读 42
收藏 0

1、安装

Windows

解压apache-activemq-5.11.1-bin.zip   文件  双击apache-activemq-5.11.1\bin\win64\activemq.bat文件

在浏览器中输入  http://localhost:8161/  出现如下图所示 表示mq启动成功

2、第一个activemq代码

生产者

public class Producer {
    public static void main(String[] args) throws Exception {
    //创建连接工厂   
    //第一个参数用户名
    //第二个参数密码
    //第三个参数mq的的地址        在activemq.xml可以找到
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
            ActiveMQConnectionFactory.DEFAULT_USER, 
            ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
            "tcp://127.0.0.1:61616");
    //创建客户端连接
    Connection connection=connectionFactory.createConnection();
    //开启连接
    connection.start();
    //创建会话
    //第一个参数表示 是否开启实物   如果false表示不开启   如果是true 在发送完数据的时候要执行session.commit();
    //第二个参数表示签收模式 Session.AUTO_ACKNOWLEDGE 表示自动签收
    //Session.CLIENT_ACKNOWLEDGE  表示手动签收  消费端每消费一条数据时都要向mq发送一个确认签收   多用这种方式
    //Session.DUPS_OK_ACKNOWLEDGE  不一定签不签收    这个容易造成重复消费
    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    //创建目的地(消息发送到哪里) 在PTP中 Destination被称作queue   在发布订阅模式中Destination被称作topic
    //第一个参数表示给要发送到哪里起个名字
    Destination queue = session.createQueue("aq");
    //创建生产者

//如果是发布订阅模式 用 session.createTopic(arg0)
    MessageProducer producer = session.createProducer(queue);
    //设置消息的持久策略
    //DeliveryMode.NON_PERSISTENT  不持久
    //DeliveryMode.PERSISTENT 持久
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    //生产消息
    TextMessage textMsg=session.createTextMessage();
    for(int i=0;i<10;i++){
        textMsg.setText("生产第 "+i+" 条数据");
        //如果上面开启事物 这里要提交事物session.commit();
        //    producer.send(arg0, arg1, arg2, arg3, arg4);  第一个参数目的地 第二个参数 要发送的消息 第三个参数 持久策略 地四个参数 是优先级   第五个参数消息在mq中的存活时间
        producer.send(textMsg);
    }
    if(connection!=null){
        connection.close();
    }
    }
}

消费者

public class Consoumer {

    public static void main(String[] args) throws Exception {
        //创建连接工厂   
        //第一个参数用户名
        //第二个参数密码
        //第三个参数mq的的地址        在activemq.xml可以找到
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER, 
                ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
                "tcp://127.0.0.1:61616");
        //创建客户端连接
        Connection connection=connectionFactory.createConnection();
        //开启连接
        connection.start();
        //创建会话
        //第一个参数表示 是否开启实物   如果false表示不开启   如果是true 在发送完数据的时候要执行session.commit();
        //第二个参数表示签收模式 Session.AUTO_ACKNOWLEDGE 表示自动签收
        //Session.CLIENT_ACKNOWLEDGE  表示手动签收  消费端每消费一条数据时都要向mq发送一个确认签收   多用这种方式
        //Session.DUPS_OK_ACKNOWLEDGE  不一定签不签收    这个容易造成重复消费
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //创建目的地(消息发送到哪里) 在PTP中 Destination被称作queue   在发布订阅模式中Destination被称作topic
        //第一个参数表示给要发送到哪里起个名字
        Destination queue = session.createQueue("aq");
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true){
            //接收消息
            //consumer.receive  可以传时间毫秒数表示 等待几毫秒
            //consumer.receiveNoWait()  表示不等待
            TextMessage textMsg=(TextMessage) consumer.receive();

// 如果生产者的签收策略是 CLIENT_ACKNOWLEDGE  这里 一定要手工签收textMsg.acknowledge(); 否则mq不认为这条数据已经被消费
            if (textMsg==null)break;
            String text = textMsg.getText();
            System.out.println("收到的消息是   "+text);
        }
        if(connection!=null){
            connection.close();
        }

    }

}

3、activemq安全机制

在activemq.xml 中的broker节点中加入以下内容 表示只要在这里配置的用户才可以生产或消费数据

<plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="aa" password="bb" groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
 </plugins>

4、 JMS Selectors  消费者可以过滤从queue中获取消息

JMS Selectors用于在订阅中,基于消息属性对消息进行过滤。JMS Selectors由SQL92语法定义。以下是个Selectors的例子:
Java代码

在生产数据的时候设置过滤条件的属性字段一定要用property如  setStringProperty("aa", "bb");

myMessage.setStringProperty("JMSType ", "car");

myMessage.setStringProperty("weight ", "5000");
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

表示只消费 JMSType 属性值是car并且weight 大于2500的数据


在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如: LIKE '12%3' ('123' true,'12993' true,'1234' false) LIKE 'l_se' ('lose' true,'loose' false) LIKE '\_%' ESCAPE '\' ('_foo' true,'foo' false) 需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换

5、数据持久的 设置在mysql数据库中

把mysql-connector-java-5.1.26.jar 放到 apache-activemq-5.11.1\lib下

在activemq.xml 中加入如下配置  在broker节点外面

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  

<!-- activemq这张表要保证数据库里已经有了-->
      <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>  
      <property name="username" value="root"/>  
      <property name="password" value="root"/>  
      <property name="maxActive" value="200"/>  
      <property name="poolPreparedStatements" value="true"/>  
    </bean>

修改  <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
              <jdbcPersistenceAdapter dataSource="#mysql-ds"/>  
        </persistenceAdapter>

6、activemq  主从集群环境搭建

一、部署方案,ActiveMQ集群环境准备:
(1)首先我们下载apache-activemq-5.11.1-bin.tar.gz
(2)Zookeeper方案
主机IP      消息端口     通信端口    
node22       2181      2888:3888 
node33       2181      2888:3888 
node44       2181      2888:3888
(3)ActiveMQ方案
主机IP     集群通信端口      消息端口       控制台端口 
node22    62621                51511             8161 
node33    62621                51511              8161
node44    62621                51511              8161
二、:首先搭建zookeeper环境  参考 https://my.oschina.net/xiaozhou18/blog/787132
三、:搭建activemq环境
(1)在node22节点下,创建/usr/local/java/activemq-
cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件

(2)修改activemq-cluster/apache-activemq-5.11.1/conf/activemq.xml配置文件

第一处修改:brokerName=”activemq-cluster”(三个节点都需要修改)

第二处修改:先注释掉适配器中的kahadb

第三处修改:添加新的leveldb配置如下(三个节点都需要修改)

<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"  数据存放目录
replicas="3"  有几台activemq 就配置几
bind="tcp://0.0.0.0:62621"     集群通信地址
zkAddress="node22:node33:node44:2181"  zk集群地址
hostname="node22"   每台机器的主机名
zkPath="/activemq/leveldb-stores"  zk的挂载节点
/>
</persistenceAdapter>

(3)把配置好的activemq-cluster文件夹复制到其他三台机器上  并修改activemq.xml对应的配置

四、:测试启动activemq集群:
第一步:启动zookeeper集群,命令:zkServer.sh start
第二步:启动三台mq集群:顺序启动mq:命令如下:
/usr/local/java/activemq-cluster/apache-activemq-5.11.1/bin/activemq  start(关闭stop)
第三步:查看三台机器日志信息:
tail -100f /usr/local/java/activemq-cluster/apache-activemq-5.11.1/data/activemq.log 
如果不报错,我们的集群启动成功,可以使用控制台查看

五:集群的brokerUrl配置进行修改即可:
failover:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)?Randomize=false

集群操作只需要改 工厂连接即可

 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
            ActiveMQConnectionFactory.DEFAULT_USER, 
            ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
            "failover:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)?Randomize=false");

六:负载均衡配置如下:
集群1链接集群2:
<networkConnectors>
<networkConnector
uri="static:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)"
duplex="false"/>
</networkConnectors>
集群2链接集群1:
<networkConnectors>
<networkConnector
uri="static:(tcp://node22:51511,tcp://node33:51511,tcp://node44:51511)"
duplex="false"/>
</networkConnectors>

 

 

© 著作权归作者所有

上一篇: Activiti5使用
下一篇: linux 命令
xiaozhou18
粉丝 4
博文 44
码字总数 89773
作品 0
哈尔滨
程序员
私信 提问
Win7环境下安装ActiveMQ

参考ActiveMQ官方文档:http://activemq.apache.org/getting-started.html 安装ActiveMQ 近来要学习JMS,在网上查了些资料,发现ActiveMQ是比较流行的JMS开源框架,决定使用ActiveMQ来学习J...

纠结名字
2015/08/09
1K
0
ActiveMQ安装配置和使用简例

本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/48173665 ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件...

开开心心过
2015/09/02
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
2018/06/26
0
0
跟我学习dubbo-ActiveMQ的安装-单节点与使用(9)

ActiveMQ 的安装与使用(单节点) 1、 安装 JDK 并配置环境变量 JAVA_HOME=/usr/local/java/jdk1.7.0_72 2、 下载 Linux 版的 ActiveMQ(当前最新版 apache-activemq-5.11.1-bin.tar.gz) $ ...

HI曲奇饼干
2016/01/19
238
0
mac 安装消息中间件---ActiveMQ

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010046908/article/details/54728375 一般在mac上安装软件大家都是比较喜欢用brew来安装,今天就用brew来安装...

请叫我东子
01/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
672
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
38
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
25
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
71
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
69
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部