文档章节

JMS消息中间件之ActiveMQ学习

c
 caiyezi
发布于 2016/11/08 20:24
字数 1643
阅读 6
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

1、下载

下载二进制bin文件:http://activemq.apache.org/activemq-5132-release.html

下载源码:

 

2、启动:

解压任意路径:

启动后:

3、访问:

访问http://localhost:8161/admin/  用户名&密码:admin

4、主要应用:

 

5、点对点消息发送&接收

首先是producer方:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageProducer = session.createProducer(destination); // 创建消息生产者
            
            sendMessage(session, messageProducer);
            
            session.commit();            //commit提交
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {    
            TextMessage textMessage = session.createTextMessage("Active MQ消息"+i);        //文本消息
            System.out.println("发送消息: Active MQ消息"+i);
            messageProducer.send(textMessage);
        }
    }

}

然后是消费方实现,主要有两种,一种是直接receive方法接收消息,一种是通过监听实现:

receive:

package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            while (true) {
                TextMessage textMessage = (TextMessage)messageConsumer.receive(100000);        //接收消息(文本消息)
                if(textMessage != null){
                    System.out.println("接收到的消息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

监听方式:

package com.activemq.test;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}
package com.activemq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createQueue("FirstQueue"); // 创建消息队列

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}

几轮测试下来,消费生产记录:

 

6、发布订阅模式

新建订阅1:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者1
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者1接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

 

订阅2:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者    -- 消息订阅者2
 * 普通receive方式接收消息
 * @author Administrator
 *
 */
public class JMSConsumer2 {
    
    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地
        
        MessageConsumer messageConsumer = null;        //消息消费者
        
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
                JMSConsumer2.BROKERURL);
        
        try {
            connection = connectionFactory.createConnection();
            
            connection.start(); // 启动连接
            
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建session,false表示不添加事务
            
            destination = session.createTopic("SecondTopic"); // 创建消息订阅

            messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new Listener2());        //注册监听
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
    
}
package com.activemq.test2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息监听-消息订阅者1的监听
 * @author Administrator
 *
 */
public class Listener2 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        
        try {
            System.out.println("订阅者2接收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }
}

 

消息一定要先订阅,然后producer再发布,否则先发布再订阅的话后边才订阅的一方是收不到之前发布的消息的!

然后是发布方:

package com.activemq.test2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * 
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER; // 默认连接用户
    private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; // 默认连接密码
    private static final String BROKERURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; // 默认消息总线

    private static final int SENDNUM = 10; // 发送消息总量

    public static void main(String[] args) {

        ConnectionFactory connectionFactory = null; // 连接工厂
        Connection connection = null; // 连接对象

        Session session = null; // 会话级session,接收或发送消息的线程
        Destination destination = null; // 消息发送的目的地

        MessageProducer messageProducer = null; // 消息生产者

        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
                JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();

            connection.start(); // 启动连接
            
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建session,true表示添加事务

            destination = session.createTopic("SecondTopic"); // 创建发布主题
                
            messageProducer = session.createProducer(destination); // 创建消息发布者
            
            sendMessage(session, messageProducer);

            session.commit(); // commit提交
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送消息
     * 
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < SENDNUM; i++) {
            TextMessage textMessage = session.createTextMessage("Active MQ发布消息" + i); // 文本消息
            System.out.println("发送消息: Active MQ 发布的消息" + i);
            messageProducer.send(textMessage);
        }
    }
}

运行效果查看:

 

本文转载自:http://www.cnblogs.com/vipzhou/p/5379866.html

上一篇: nodejs初印象
下一篇: mysql随笔
c
粉丝 1
博文 108
码字总数 0
作品 0
西安
程序员
私信 提问
加载中

评论(0)

activeMQ5官方文档翻译-初始化配置

首先你需要把jar包加到classpath 所需的jar包 为了使ActiveMQ更容易使用,默认的activemq-all.jar包包含了所有需要用到的库文件。如果你喜欢以明确的控制jar包的方式来使用ActiveMQ,那下面是...

z_jordon
2015/05/29
342
0
linux下activemq安装与配置

什么是消息中间件(MQ)? 1.1 为什么会需要消息队列(MQ)?   主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达M...

osc_44jaxl0s
2019/02/13
6
0
品优购商城项目(五)消息中间件 ActiveMQ

消息中间件用于降低各个项目模块的耦合,适用于不需要等待返回消息才能进入下一个业务环节的模块,以及实时要求性不高的业务模块。 一、JMS   JMS(Java Messaging Service)是Java平台上...

osc_uwpj27el
2019/09/09
1
0
消息中间件--ActiveMQ&JMS消息服务

消息中间件 ### ---------- 消息中间件 1. 消息中间件的概述 2. 消息中间件的应用场景   * 异步处理   * 应用解耦   * 流量削峰   * 消息通信 ---------- ### JMS消息服务 ### ----...

osc_9ajwkns1
2018/04/22
4
0
分布式消息通信(ActiveMQ)

分布式消息通信(ActiveMQ) 应用场景 异步通信 应用解耦 流量削峰 # ActiveMQ安装 下载 http://activemq.apache.org/ 压缩包上传到Linux系统 apache-activemq-5.15.9-bin.tar.gz 解压缩 ta...

osc_hlq50xpf
2019/06/05
17
0

没有更多内容

加载失败,请刷新页面

加载更多

总结:Spring boot熔断

一、介绍 1、熔断的目的:是为了保证服务高可用,不能因为系统中的一个小服务不可用,从而导致整个系统崩溃。 2、熔断的原理:对于使用相关注解的类或者方法,系统会监控其错误,如果多次出现...

浮躁的码农
9分钟前
10
0
抽象工厂

1. 介绍 提供一个创建一系列相关或相互依赖对象的接口,而无须指定它们具体的类。抽象工厂模式又称为Kit模式,属于对象创建型模式,是工厂方法模式的升级版,在有多个业务品种、业务分类时,...

steven-黄笑笑
14分钟前
22
0
Autoruns显示windows系统所有启动项

如题,可以显示所有启动项,然后把不需要的启动项都取消勾选即可。而对于“服务”项,建议不要在里面去掉勾选,因为这样会禁止服务,容易出错。我通常都会打开电脑的”服务“,按启动类型排序...

ethanleellj
15分钟前
11
0
多线程基础学习

线程和进程有区别? 进程=加载上下文+执行程序+保存上下文 进程属于资源分配的最小单位,线程属于执行任务的最小单位。 线程6种状态: 1、New 尚未启动的线程的线程状态。 2、Runnable 可运行...

javazyw
43分钟前
28
0
英文对“ Big O”符号的解释是什么? - What is a plain English explanation of “Big O” notation?

问题: 我希望尽可能少用正式的定义和简单的数学方法。 解决方案: 参考一: https://stackoom.com/question/22l0/英文对-Big-O-符号的解释是什么 参考二: https://oldbug.net/q/22l0/What...

javail
45分钟前
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部