文档章节

JMS消息中间件之ActiveMQ学习

c
 caiyezi
发布于 2016/11/08 20:24
字数 1643
阅读 2
收藏 0
点赞 0
评论 0

1、下载

下载二进制bin文件:http://activemq.apache.org/activemq-5132-release.html

下载源码:

 

2、启动:

解压任意路径:

启动后:

3、访问:

访问http://localhost:8161/admin/  用户名&密码:admin

4、主要应用:

 

5、点对点消息发送&接收

首先是producer方:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageProducer = session.createProducer(destination); // 创建消息生产者
            
            sendMessage(session, messageProducer);
            
            session.commit();            //commit提交
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {    
            TextMessage textMessage = session.createTextMessage("Active MQ消息"+i);        //文本消息
            System.out.println("发送消息: Active MQ消息"+i);
            messageProducer.send(textMessage);
        }
    }

}

然后是消费方实现,主要有两种,一种是直接receive方法接收消息,一种是通过监听实现:

receive:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            while (true) {
                TextMessage textMessage = (TextMessage)messageConsumer.receive(100000);        //接收消息(文本消息)
                if(textMessage != null){
                    System.out.println("接收到的消息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

监听方式:

package com.activemq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}
package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

几轮测试下来,消费生产记录:

 

6、发布订阅模式

新建订阅1:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者1
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者1接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

 

订阅2:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者2
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener2());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者2接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

 

消息一定要先订阅,然后producer再发布,否则先发布再订阅的话后边才订阅的一方是收不到之前发布的消息的!

然后是发布方:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接
            
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createTopic("SecondTopic"); // 创建发布主题
                
            messageProducer = session.createProducer(destination); // 创建消息发布者
            
            sendMessage(session, messageProducer);

            session.commit(); // commit提交
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {
            TextMessage textMessage = session.createTextMessage("Active MQ发布消息" + i); // 文本消息
            System.out.println("发送消息: Active MQ 发布的消息" + i);
            messageProducer.send(textMessage);
        }
    }
}

运行效果查看:

 

本文转载自:http://www.cnblogs.com/vipzhou/p/5379866.html

共有 人打赏支持
c
粉丝 1
博文 108
码字总数 0
作品 0
西安
程序员
activeMQ5官方文档翻译-初始化配置

首先你需要把jar包加到classpath 所需的jar包 为了使ActiveMQ更容易使用,默认的activemq-all.jar包包含了所有需要用到的库文件。如果你喜欢以明确的控制jar包的方式来使用ActiveMQ,那下面是...

z_jordon
2015/05/29
0
0
spring整合jms系列之----点对点(一)

JMS作为一个支持点对点(PTP)和订阅式(pub/sub)式的消息中间件,为很多项目开发者所使用。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务,由于JMS对Spring的支持...

码上中国博客
2015/11/12
0
0
Win7环境下安装ActiveMQ

参考ActiveMQ官方文档:http://activemq.apache.org/getting-started.html 安装ActiveMQ 近来要学习JMS,在网上查了些资料,发现ActiveMQ是比较流行的JMS开源框架,决定使用ActiveMQ来学习J...

纠结名字
2015/08/09
0
0
2.ActiveMQ消息队列安装使用

全程是MOM (Message Oriented Middleware) 消息中间件 消息中间件有很多,比如: 1.ActiveMQ java语言编写的和java系统结合紧密 2.RabbitMQ Erlong语言开发的,天生支持高并发,性能优于A...

小杰java
2017/10/26
0
0
activeMQ5官方文档翻译-运行消息中间件

把activeMQ中间件运行起来 注:如果你想以内嵌的方式来使用消息中间件的话你可以参照怎么在一个连接中间嵌入消息中间件 以二进制文件的方式分发的ActiveMQ自带了一个'activemq'的脚本文件来运...

z_jordon
2015/05/31
0
0
Java消息中间件之ActiveMQ

一、消息中间件 1.消息中间件概述 中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。 消息中间件:关注与数据的发送和接...

aibinxiao
2017/11/01
0
0
ActiveMQ 消息服务(一)

1、百度百科对ActiveMQ的解释: ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的...

Andy市民
2015/11/06
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
06/26
0
0
一步一步Spring整合JMS

1.1 JMS简介 JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话...

摆渡者
2015/08/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

fiddle 4 初始化

下载 配置fiddle 4 如果证书导出失败,执行下面脚本 D:\programs\Fiddler>makecert.exe -r -ss my -n "CN=DO_NOT_TRUST_FiddlerRoot, O=DO_NOT_TRUST, OU=Created by http://www.fiddler2.c......

柯里昂
2分钟前
0
0
rabbitmq学习记录(六)交换机Exchange-direct

实现功能:一条消息发送给多个消费者 交换机模式:direct 相比于之前的fanout模式,可以进一步的筛选获取消息的消费者。 fanout模式下,只要消费者监听的队列,已经与接收生产者消息的交换机...

人觉非常君
19分钟前
0
0
Java 之 枚举

Java 中声明的枚举类,均是 java.lang.Enum 类的子类,Enun 类中的常用方法有: name() 返回枚举对象名称 ordinal() 返回枚举对象下标 valueOf(Class enumType, String name) 转换枚举对象 ...

绝世武神
27分钟前
0
0
使用爬虫实现代理IP池之放弃篇

啥叫代理IP以及代理IP池 概念上的东西网上搜索一下就好了,这里简单科普一下(大部分会读这篇文章的人,基本是不需要我来科普的),白话说就是能联网并提供代理访问互联网的服务器,它提供的...

一别丶经年
43分钟前
0
0
sqoop导入数据到Base并同步hive与impala

使用Sqoop从MySQL导入数据到Hive和HBase 及近期感悟 基础环境 Sqool和Hive、HBase简介 Sqoop Hive HBase 测试Sqoop 使用Sqoop从MySQL导入数据到Hive 使用复杂SQL 调整Hive数据类型 不断更新 ...

hblt-j
今天
0
0
Dart 服务端开发 文件上传

clent端使用angular组件 upload_component.html form id="myForm" method="POST" enctype="multipart/form-data"> <input type="file" name="fileData"> <!-- file field --></form>......

scooplol
今天
0
0
apache和tomcat同时开启,乱码问题

tomcat和apache同时开启,会走apache的转发,执行的是AJP/1.3协议。所以在tomcat的配置文件server中, <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" useBodyEncodingForU......

Kefy
今天
0
0
使用ssh-keygen和ssh-copy-id三步实现SSH无密码登录 和ssh常用命令

ssh-keygen 产生公钥与私钥对. ssh-copy-id 将本机的公钥复制到远程机器的authorized_keys文件中,ssh-copy-id也能让你有到远程机器的home, ~./ssh , 和 ~/.ssh/authorized_keys的权利 第一步...

xtof
今天
0
0
orcale 查询表结构

SELECT t.table_name, t.colUMN_NAME, t.DATA_TYPE || '(' || t.DATA_LENGTH || ')', t1.COMMENTS FROM User_Tab_Cols t, User_Col_Comments t1WHERE t.table_name......

wertwang
今天
0
0
华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大

华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大!华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大! 在华为最新发布的nova 3手机上,抖音通过华为himedia SDK集成了60fps、超级...

华为终端开放实验室
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部