文档章节

ActiveMQ学习笔记(4)——通过ActiveMQ收发消息

郭寻抚
 郭寻抚
发布于 2015/02/27 14:52
字数 1716
阅读 3592
收藏 5

1.队列和主题

 1.1概念

        在MQ中,消息模型有两种,一种是队列(Queue),一种是主题(Topic)。队列是Point-To-Point的,队列中的消息,仅能被消费一次。主题是Pub/Sub模型,主题中的消息,可以由多个订阅者消费;订阅者只能消费它订阅以后的消息。这是遵循的JMS规范。

 1.2收发消息对象创建过程

      如上图所示,JMS规范中,收发消息的对象创建过程如下,下面的示例代码中也将注释这些过程:

       1. 初始化ConnetionFactory

       2. ConnetionFactory创建Connection

       3. Connection创建Session

       4. Session创建Destination(包括Queue 和 Topic两种)

       5.发: Session创建消息生产者MessageProducer(收:Session创建消息消费者MessageConsumer

       6.Seesion创建Message,(发:)MessageProducer发送到Destination,(收:)MessageConsumer从Destination接受消息。

1.3接口间的关系

        JMS规范定义了通用接口(JMS Common Interfaces)、队列接口(PTP-specific Interfaces)和 主题接口(Pub/Sub-specific Interfaces),队列接口和主题接口分别继承于通用接口,具体关系如下表所示。

       ActiveMQ对这些规范接口都有相应的实现。在实际的编程过程中,声明通用接口基本就够用了。如何区分Queue和Topic也很简单,参看下面的代码。

//Queue,队列
Destination destination = session.createQueue(subject);

//Topic,主题
Destination destination = session.createTopic(subject);



2.通过队列发送和接受消息

       运行代码的时候,可以先run起来接受消息的程序,再run发送消息的程序,来观察消息发送的过程。

       别忘记启动ActiveMQ服务器,安装部署和启动的办法,参见http://my.oschina.net/xiaoxishan/blog/380352


2.1通过Queue发送消息

package guo.examples.mq01.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 演示如何向MQ发送消息,和JDBC操作数据库的过程很像
 * 
 * 1.初始化连接工厂ConnectionFactory
 * 
 * 2.创建连接Connection
 * 
 * 3. 创建会话session
 * 
 * 4.打开队列createQueue
 * 
 * 5.获得消息生产者MessageProducer
 * 
 * 6.使用消息生产者发送消息
 * 
 * 7. 关闭会话session和连接Connection
 * 
 * 可以看出,使用JMS发送一个这么简单的消息,需要这么多的步骤,不方便。
 *
 */
public class Sender {

  public static void main(String[] args) {
    Sender sender = new Sender();
    String msg = "Hello World!";
    sender.sendMessage(msg);
    System.out.println("发送消息结束:" + msg);
  }

  /**
   * 使用JMS向MQ发送消息
   * 
   * @param msg 消息内容
   */
  public void sendMessage(String msg) {
    // defualt user & password both are null
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    // DEFAULT_BROKER_URL =failover://tcp://localhost:61616
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "TOOL.DEFAULT";
    // 1. 初始化连接工厂
    ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);
    try {
      // 2. 创建连接
      Connection connection = contectionFactory.createConnection();
      connection.start();
      // 3.创建会话
      Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      // 4. 打开队列
      Destination destination = session.createQueue(subject);
      // 5. MessageProducer负责发送消息
      MessageProducer producer = session.createProducer(destination);
      TextMessage message = session.createTextMessage();
      for (int i = 0; i < 10; i++) {
        String tmp = i + ":" + msg;
        message.setStringProperty("hello", tmp);
        // 6. 发送消息
        producer.send(message);
        System.out.println("send: " + tmp);
        Thread.sleep(3000);
        //只有commit之后,消息才会进入队列
        session.commit();
        
      }
      // 7. 关闭会话和连接
      session.close();
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}


2.2通过Queue接受消息

package guo.examples.mq01.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 演示如何从MQ接受消息,和发送差不多
 * 
 * 1.初始化连接工厂ConnectionFactory
 * 
 * 2.创建连接Connection
 * 
 * 3. 创建会话session
 * 
 * 4.打开队列createQueue
 * 
 * 5.获得消息消费者MessageConsumer
 * 
 * 6.使用MessageConsumer接受消息
 * 
 * 7. 关闭会话session和连接Connection
 * 
 */
public class Receiver {

  public static void main(String[] args) {
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "TOOL.DEFAULT";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createQueue(subject);
      // MessageConsumer负责接受消息
      MessageConsumer consumer = session.createConsumer(destination);
      consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message msg) {
          TextMessage message = (TextMessage) msg;
          try {
            String hello = message.getStringProperty("hello");
            System.out.println("收到消息:\t" + hello);
            session.commit();
          } catch (JMSException e) {
            e.printStackTrace();
          }
        }
      });
      // 为了演示接受消息,这里把关闭会话和连接注释掉了。
      // session.close();
      // connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }

}



3.通过Topic发布和订阅消息

       为了使订阅者能够订阅消息,在运行程序时,需要先运行订阅者(Subscriber),后运行发布者(Publisher)。

3.1通过Topic发布消息

package guo.examples.mq01.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 演示如何向MQ发送消息,和JDBC操作数据库的过程很像
 * 
 * 1.初始化连接工厂ConnectionFactory
 * 
 * 2.创建连接Connection
 * 
 * 3. 创建会话session
 * 
 * 4.创建topic
 * 
 * 5.获得消息生产者MessageProducer
 * 
 * 6.使用消息生产者发送消息
 * 
 * 7. 关闭会话session和连接Connection
 * 
 * 只有那些在线的订阅者可以收到消息,所以我们需要先启动Subscriber
 *
 */
public class Publisher {

  public static void main(String[] args) {
    Publisher pb = new Publisher();
    String msg = "Hello World!~~~~~";
    pb.sendMessage(msg);
    System.out.println("发送消息结束:" + msg);
  }

  /**
   * 使用JMS向MQ发送消息
   * 
   * @param msg 消息内容
   */
  public void sendMessage(String msg) {
    // defualt user & password both are null
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    // DEFAULT_BROKER_URL =failover://tcp://localhost:61616
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "MQ.TOPIC";

    // 1. 初始化连接工厂
    ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user, password, url);
    try {
      // 2. 创建连接
      Connection connection = contectionFactory.createConnection();
      connection.start();
      // 3.创建会话
      Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      // 4. 创建要发布的主题,和Queue的区别就在此
      Destination destination = session.createTopic(subject);
      // 5. MessageProducer负责发送消息
      MessageProducer producer = session.createProducer(destination);
      TextMessage message = session.createTextMessage();
      message.setStringProperty("hello", msg);
      // 6. 发送消息
      producer.send(message);
      // 7. 关闭会话和连接
      session.commit();
      session.close();
      connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


3.2通过Topic订阅消息

这里我们只有1个订阅者,想要验证多个订阅者,拷贝多份代码,改个类名即可。再次提醒,先运行订阅者。

package guo.examples.mq01.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 演示如何从MQ接受消息,和发送差不多
 * 
 * 1.初始化连接工厂ConnectionFactory
 * 
 * 2.创建连接Connection
 * 
 * 3. 创建会话session
 * 
 * 4.打开队列createQueue
 * 
 * 5.获得消息消费者MessageConsumer
 * 
 * 6.使用MessageConsumer接受消息
 * 
 * 7. 关闭会话session和连接Connection
 * 
 */
public class Subscriber {


  public static void main(String[] args) {
    String user = ActiveMQConnection.DEFAULT_USER;
    String password = ActiveMQConnection.DEFAULT_PASSWORD;
    String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    String subject = "MQ.TOPIC";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
      Topic topic = session.createTopic(subject);
      // MessageConsumer负责接受消息
      MessageConsumer consumer = session.createConsumer(topic);
      consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message msg) {
          TextMessage message = (TextMessage) msg;
          try {
            String hello = message.getStringProperty("hello");
            System.out.println("订阅者---SecondSubscriber---收到消息:\t" + hello);
            session.commit();
          } catch (JMSException e) {
            e.printStackTrace();
          }
        }
      });
      // 为了测试效果,注释掉了两行代码,使Session和connection一直处于打开状态
      //session.close();
      //connection.close();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}


4.总结和代码地址

向ActiveMQ收发消息的编码过程和JDBC操作数据库的过程很相似,也有同样的毛病,就是重复代码很多,Spring-JMS为我们提供了更为便利的解决方案,我们后续再谈。

本文中的代码见 http://pan.baidu.com/s/1c02GTDQ  。



© 著作权归作者所有

郭寻抚
粉丝 58
博文 26
码字总数 20965
作品 0
东城
程序员
私信 提问
加载中

评论(3)

余魁
余魁
应该是看懂了:不同的用户创建和监听不同的队列,内外网只能是配置两个jmsTemplate……谢谢老板的博文!
余魁
余魁
一个工作流中,第一步完成了想通知第二步的待办人,第二步完成了想通知第三步的待办人……待办人都会监听消息,怎么区分监听的是哪个消息呢?
余魁
余魁
请教大侠: “try {
String hello = message.getStringProperty("hello");……”不知道有没有用户的概念?发送消息的时候指定发送给某个用户,接收消息的时候只接收某个用户发过来的?
另外:局域网和外网都想访问AMQ服务,该怎么设置呢?……估计可以在项目中通过登录ip判断用户是内外网,然后该怎么设置AMQ访问ip呢?
只需三步:内嵌ActiveMQ到SpringBoot应用中

不知你是否有过这样的体验: 在调试带JMS组件的应用时, 需要额外启动一个JMS服务器来配合测试。 这样既操作繁琐,又不利于实现单元测试——不符合单元测试AIR原则(自动化,独立性,可重复,...

天上只北
2018/06/29
0
0
ActiveMQ安装配置和使用简例

本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/48173665 ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件...

开开心心过
2015/09/02
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
2018/04/15
0
0
Win7环境下安装ActiveMQ

参考ActiveMQ官方文档:http://activemq.apache.org/getting-started.html 安装ActiveMQ 近来要学习JMS,在网上查了些资料,发现ActiveMQ是比较流行的JMS开源框架,决定使用ActiveMQ来学习J...

纠结名字
2015/08/09
1K
0
ActiveMQ-inAction-Broker-学习笔记

Running Broker(运行 broker) ActiveMQ5.0 的二进制发布包中bin 目录中包含一个名为activemq 的脚本, 直接运行这个脚本就可以启动一个broker。 此外也可以通过一下对其进行配置: Broker...

Java搬砖工程师
2018/12/24
43
0

没有更多内容

加载失败,请刷新页面

加载更多

官方来源的 Duo Mobile App 解决了我的 Network Difficulties 问题

https://help.duo.com/s/article/2094?language=en_US 我利用百度搜索下载了一个 Duo Mobile App (由于 Google Play)在大陆不可用。 在扫描旧手机上的 Duo Mobile App 的二维码时, 显示出错...

圣洁之子
17分钟前
3
0
Zabbix监控Mysql容器(Docker容器)主从是否存活

1、在Zabbix Web端创建模板 2、为该模板创建监控项 3、创建触发器 4、在zabbix-agent端操作 在/etc/zabbix/zabbix_agentd.d新建customize.confw文件 内容如下 UserParameter=mysql.replicat...

abowu
18分钟前
2
0
基于 RocketMQ 的同城双活架构在美菜网的挑战与实践

本文整理自李样兵在北京站 RocketMQ meetup分享美菜网使用 RocketMQ 过程中的一些心得和经验,偏重于实践。 嘉宾李样兵,现就职于美菜网基础服务平台组,负责 MQ ,配置中心和任务调度等基础...

大涛学长
24分钟前
5
0
设计模式之:外观模式和桥接模式

作者:DevYK 链接:https://juejin.im/post/5d7e01f4f265da03b5747aac 外观模式 介绍 外观模式 (Facade) 在开发过程中的运用评率非常高,尤其是在现阶段,各种第三方 SDK “充斥” 在我们周边...

Java架构Monster
26分钟前
2
0
人证合一核验设备

人脸身份验证机,人证合一设备1:N如我们现在在车站或一些重要的场所如步行街、城中村等人流密集的场所应用的人脸识别布控系统,其特点是动态和非配合。所谓的动态也就是识别的不是照 片,不是...

非思丸智能
27分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部