文档章节

JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式

济远
 济远
发布于 2017/02/12 09:05
字数 2403
阅读 3
收藏 0
点赞 0
评论 0

  JMS规范定义了2种消息传输模式:持久传送模式和非持久传输模式。发送者可以通过如下类似的代码进行设置

TopicPublisher publihser = session.createPublisher(topic);

// 设置持久化传输
publihser.setDeliveryMode(DeliveryMode.PERSISTENT);

这种方式对publisher发送的所有消息都有效,相当于是一个全局的效果。如果只是想设置某一个消息的传输模式,可以通过以下代码设置消息头的属性来实现

TextMessage message = session.createTextMessage(text);
    
message.setJMSDeliveryMode(DeliveryMode.PERSISTENT);

     使用传输模式是一件很容易的事,直接调用API就可以了。那什么是传输模式呢?传输模式是用来控制消息属性的,DeliveryMode.PERSISTENT代表这是持久消息,DeliveryMode.NON_PERSISTENT代表是非持久消息。个人觉得传输模式和消息持久化是同一个概念,只不过是不同的叫法而已。

1.NON_PERSISTENT模式和 PERSISTENT模式

   对于非持久的消息,JMS provider不会将它存到文件/数据库等稳定的存储介质中。也就是说非持久消息驻留在内存中,如果jms provider宕机,那么内存中的非持久消息会丢失。A JMS provider must diliver a NON_PERSISTENT message at-most-once 。对于持久消息,消息提供者会使用存储-转发机制,先将消息存储到稳定介质中,等消息发送成功后再删除。如果jms provider挂掉了,那么这些未送达的消息不会丢失;jms provider恢复正常后,会重新读取这些消息,并传送给对应的消费者。 A JMS provider must diliver a PERSISTENT message once -and-only-once

2.消息是否持久和是否送达

    消息的持久特性就是为了在异常发生的时候保证消息的送达 。如果网络、jms provider、消息生产者、消息消费者都不会出现任何故障,那么持久消息和非持久消息就没有差别了。因为一旦消息成功传送给它的所有消费者,那么jms provider会从内存/硬盘上删除这些无用的消息。显然一切正常的情况下,使用PERSISTENT消息非常浪费, 因为持久传送消 息前,需要先将消息保存到硬盘;消息发送成功后,还需要将消息从硬盘上删除。 但现实情况是,网络可能出现断连、provider和消费者都有可能宕机。因此对于一些非常重要,不容许任何丢失的消息,一定要采用 PERSISTENT模式。

3.持久消息和持久订阅者

   我的另一篇博客   理解JMS规范中的持久订阅和非持久订阅    介绍了持久订阅者和非持久订阅者的差别。持久订阅者和持久消息有什么区别和联系吗?持久消息发送给持久订阅者和非持久订阅有什么差别?非持久消息能够发送给持久订阅者吗?下面通过一些测试代码,来阐述持久消息和持久订阅者的关系。测试代码是基于ActiveMQ5.8.0版本。

 

3.1生产者发送持久消息和非持久消息,但是消息没有消费者,即这是一条无用消息

package mq.aty.persistentmsg;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import mq.aty.JmsUtils;

/**
 * 直接运行该程序和activeMQ,不运行任何的消费者,然后观察持久化介质(我们使用了数据库)
 *
 */
public class NoReceiverTest
{
  private static TopicConnection connection = null;

  private static Topic topic = null;

  public static void main(String[] args) throws Exception
  {
    connection = JmsUtils.getConnection();
    topic = JmsUtils.getTopic();
    
    sentPersistent();
    sentNonPersistent();
    
    connection.close();
  }

  public static void sentPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "I am persistent message.order=" + i;

      TextMessage message = session.createTextMessage(text);
      
      message.setJMSPriority(i);

      publihser.publish(message);
    }

  }
  
  public static void sentNonPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "non-persistent message.id=" + i;

      TextMessage message = session.createTextMessage(text);

      publihser.publish(message);
    }

  }

}

我使用了mysql数据库,并配置了activeMQ将消息持久化到数据库。运行上面的程序,发现mysql数据库中activemq_msgs表没有任何数据。可以证明:持久消息和非持久消息都被MQ消息服务器丢弃了。 无论是持久消息,还是非持久消息,如果消息没有对应的消费者,那么activeMQ会认为这些消息无用,直接删除。

 

3.2生产者发送持久消息和非持久消息,只有非持久订阅者

之前的博客已经介绍了:非持久订阅者只有在活动状态,并且和jms provider的保持连接情况下,才能收到消息。如果非持久订阅者挂掉了,那么不能再接收任何消息(无论是持久消息,还是非持久消息)。如果订阅者挂掉了,后续jms provider再收到消息,就变成了3.1的情况。也就是说:消息是否持久化,和非持久订阅者没有关系。

 

3.3持久消息和非持久消息,发送给离线的持久订阅者

消息的发送者源码:

package mq.aty.persistentmsg;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import mq.aty.JmsUtils;

/**
 * 直接运行该程序和activeMQ,没有任何的消费者,然后观察持久化介质(我们使用了数据库)
 *
 */
public class NoReceiverTest
{
  private static TopicConnection connection = null;

  private static Topic topic = null;

  public static void main(String[] args) throws Exception
  {
    connection = JmsUtils.getConnection();
    topic = JmsUtils.getTopic();
    
    sentPersistent();
    sentNonPersistent();
    
    connection.close();
  }

  public static void sentPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "I am persistent message.order=" + i;

      TextMessage message = session.createTextMessage(text);
      
      message.setJMSPriority(i);

      publihser.publish(message);
    }

  }
  
  public static void sentNonPersistent() throws Exception
  {
    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    TopicPublisher publihser = session.createPublisher(topic);

    publihser.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    for (int i = 0; i < 3; i++)
    {
      String text = "non-persistent message.id=" + i;

      TextMessage message = session.createTextMessage(text);

      publihser.publish(message);
    }

  }

}

持久订阅者源码如下:

package mq.aty.persistentmsg;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import mq.aty.JmsUtils;

/**
 * <pre>
 *  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码
 *  
 *  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,
 *     也就是说activeMQ识别和接受了我们的持久订阅者
 *    
 *  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息。发现activemq_msgs中多持久消息
 *  
 *  4、运行持久订阅者。发现持久消息和非持久消息都能接受到
 * </pre>
 * 
 */
public class DurableSubscriberTest
{

  public static void main(String[] args) throws Exception
  {
    TopicConnection connection = JmsUtils.getConnection();
    Topic topic = JmsUtils.getTopic();

    // 创建持久订阅的时候,必须要设置client,否则会报错:
    // javax.jms.JMSException: You cannot create a durable subscriber
    // without specifying a unique clientID on a Connection

    // 如果clientID重复(已经存在相同id的活动连接),会报错
    // javax.jms.InvalidClientIDException: Broker: localhost - Client: 1
    // already connected from tcp://127.0.0.1:2758
    connection.setClientID("1");

    TopicSession session = connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);

    // 在同一个连接的ClientID下,持久订阅者的名称必须唯一
    // javax.jms.JMSException: Durable consumer is in use for client: 1 and
    // subscriptionName: 11

    // TopicSubscriber subscriber = session.createSubscriber(topic);
    TopicSubscriber subscriber = session.createDurableSubscriber(topic,
        "11");

    subscriber.setMessageListener(new MessageListener() {

      @Override
      public void onMessage(Message msg)
      {
        try
        {
          TextMessage textMsg = (TextMessage) msg;
          System.out.println("DurableSubscriber get:"
              + textMsg.getText());
        } catch (JMSException e)
        {
          e.printStackTrace();
        }
      }
    });

    connection.start();// 一定要start
  }
}

 

在第二步操作的时候,查看mysql数据库可以发现,数据库表activemq_acks中多了一条记录,记录我们的持久订阅者

在第三步操作的时候,查看数据库表activemq_msgs中多了3条持久消息。可以发现activeMQ会将持久消息保存到硬盘。

最后当我们重新启动持久订阅者的时候,可以发现,持久消息和非持久消息都能够接收到。这个时候 activemq_msgs中的消息被删除。

 

通过这种情况测试,只能看出持久订阅者和非持久订阅者存在差别: 持久订阅者能够接收离线消息,不管该消息是不是持久消息

我们好像还看出持久消息和非持久消息的区别,这是因为我们进行上述测试的时候, 没有关闭activeMQ服务器,所以无论是硬盘上的持久消息,还是内存中的非持久消息,都不会丢

 

接下来我们还是使用上面的发送者和接收者源码,但是改变下操作顺序。按照如下顺序进行操作:

* <pre>
 *  1、先运行监听者,向jms server注册,让jms server知道有这个持久订阅者。类似于你向腾讯申请个QQ号码
 *
 *  2、启动jms server和持久订阅者(运行该类)。查看数据库可以发现activemq_acks中多了一条记录,
 *     也就是说activeMQ识别和接受了我们的持久订阅者
 *  
 *  3、停止持久订阅者,启动生产者向MQ服务器发送持久消息和非持久消息
 *  
 *  4、消息发送成功后,停止activemq服务器、
 *  
 *  5、重新启动mq服务器和订阅者。发现只能接收到持久消息
 * 
 * </pre>

我们发现当activeMQ服务器挂掉再重启的时候,持久订阅者只能收到持久消息,不能收到非持久消息。

4.总结

   通过上述测试代码和执行结果,我们得出以下结论:

   持久订阅者/非持久订阅者,只影响离线的时候消息(包括持久消息和非持久消息)是否能接收到,和消息是否持久无关;

   持久消息/非持久消息,只是影响jms provider宕机后。消息是否会丢失,如果永远不会宕机,那么持久消息和非持久消息没有区别。

© 著作权归作者所有

共有 人打赏支持
济远
粉丝 0
博文 13
码字总数 13858
作品 0
南京
程序员
Java消息中间件的概述与JMS规范

为什么需要使用消息中间件 在介绍消息中间件之前,我们先来看一个故事: 老王的睡前故事: 在很久很久以前,小明隔壁有个姓王的邻居,姑且就叫隔壁老王吧。隔壁老王有个大女儿,名叫王兰花秀...

ZeroOne01 ⋅ 05/25 ⋅ 0

(四)SpringBoot——JPA

一、JPA JPA(Java Persistence API),通过JDK5注解或者xml描述对象-关系表的映射关系,并将运行期的实体对象持久化到数据库中。 例如,开源的Hibernate就是遵循JPA规范的,我们经常用到的M...

solidwang ⋅ 04/23 ⋅ 0

提高ActiveMQ工作性能(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/50955502 目...

yunlielai ⋅ 04/15 ⋅ 0

消息队列ActiveMQ的使用详解

通过上一篇文章 《消息队列深入解析》,我们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。 这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。 学习消息队...

snailclimb ⋅ 04/24 ⋅ 0

web项目中web.xml的作用

每个javaEE工程中都有web.xml文件,那么它的作用是什么呢?它是每个web.xml工程都必须的吗? 一个web中可以没有web.xml文件,也就是说,web.xml文件并不是web工程必须的。 web.xml文件是用来...

ChinaHYF ⋅ 04/27 ⋅ 0

ActiveMQ初探(1)——介绍与基本使用

一、ActiveMQ 1.1 什么是ActiveMQ 是Apache出品,最流行的,能力强劲的。ActiveMQ是一个完全支持和规范的 实现,尽管规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊...

yuanlaijike ⋅ 04/15 ⋅ 0

JavaWeb开发比较重要的面试题

JavaWeb开发比较重要的面试题 1. 编码转换:怎样将GB2312编码的字符串转换为ISO-8859-1编码的字符串? 答:示例代码如下: String s1=”你好”; String s2=new String(s1.getBytes(“GB2312”...

xj_9264 ⋅ 05/06 ⋅ 0

JSP 学习总结---学习笔记

什么是JSP 1)为什么说,Servlet是一个动态Web开发技术呢? Servlet是基于服务端的一种动态交互技术, HttpServletRequest表示客户端到服务端的对象 HttpServletResponse表示服务端到客户端的...

知止内明 ⋅ 04/18 ⋅ 0

Rocketmq支持用JmsTemplate发送消息吗

在Spring框架中使用JMS的JMS template同步发送消息,非常简单和强大,Activemq是标准的消息队列,其支持JMS template,现在我们用Rocketmq,据说很多没有遵循JMS规范,不知道能不能用JMS tem...

PMP4561705 ⋅ 05/14 ⋅ 0

web服务器,应用程序服务器,http服务器的区别

WEB服务器、应用程序服务器、HTTP服务器有何区别?IIS、Apache、Tomcat、Weblogic、WebSphere都各属于哪种服务器? 这个概念很重要。 Web服务器的基本功能就是提供Web信息浏览服务。它只需支持...

水墨如丹青 ⋅ 04/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

LVM

LVM: 硬盘划分分区成物理卷->物理卷组成卷组->卷组划分逻辑分区。 1.磁盘分区: fdisk /dev/sdb 划分几个主分区 输入t更改每个分区类型为8e(LVM) 使用partprobe生成分区的文件:如/dev/sd...

ZHENG-JY ⋅ 30分钟前 ⋅ 0

彻底删除Microsoft Office的方法

参照此链接彻底删除Office https://support.office.com/zh-cn/article/%e4%bb%8e-pc-%e5%8d%b8%e8%bd%bd-office-9dd49b83-264a-477a-8fcc-2fdf5dbf61d8?ui=zh-CN&rs=zh-CN&ad=CN......

Kampfer ⋅ 45分钟前 ⋅ 0

大盘与个股之间关系

大盘走多:积极出手 顺势加码 大盘走空: 少量出手 退场观望 大盘做头:逆势减码 少量操作 大盘做底 : 小量建仓 小量试单

guozenhua ⋅ 47分钟前 ⋅ 0

Day16 LVM(逻辑卷管理)与磁盘故障小案例

lvm详解 简述 LVM的产生是因为传统的分区一旦分区好后就无法在线扩充空间,也存在一些工具能实现在线扩充空间但是还是会面临数据损坏的风险;传统的分区当分区空间不足时,一般的解决办法是再...

杉下 ⋅ 53分钟前 ⋅ 0

rsync实现多台linux服务器的文件同步

一、首先安装rsync,怎样安装都行,rpm,yum,还是你用源码安装都可以。因为我用的是阿里云的ESC,yum install rsync就ok了。 二、配置rsync服务 1.先建立个同步数据的帐号 123 groupadd r...

在下头真的很硬 ⋅ 今天 ⋅ 0

前端基础(三):函数

字数:1685 阅读时间:5分钟 函数定义 在最新的ES规范中,声明函数有4中方法: -函数声明 -函数表达式 -构造函数Function -生成器函数 1.函数声明 语法: function name([param[, param2 [....

老司机带你撸代码 ⋅ 今天 ⋅ 0

Java虚拟机的Heap监狱

在Java虚拟机中,我是一个位高权重的大管家,他们都很怕我,尤其是那些Java 对象,我把他们圈到一个叫做Heap的“监狱”里,严格管理,生杀大权尽在掌握。 中国人把Stack翻译成“栈”,把Hea...

java高级架构牛人 ⋅ 今天 ⋅ 0

Spring MVC基本概念

只写Controller

颖伙虫 ⋅ 今天 ⋅ 0

微软重金收购GitHub的背后逻辑原来是这样的

全球最大的开发者社区GitHub网站花落谁家的问题已经敲定,微软最终以75亿美元迎娶了这位在外界看来无比“神秘”的小家碧玉。尽管此事已过去一些时日,但整个开发者世界,包括全球各地的开源社...

linux-tao ⋅ 今天 ⋅ 0

磁盘管理—逻辑卷lvm

4.10-4.12 lvm 操作流程: 磁盘分区-->创建物理卷-->划分为卷组-->划分成逻辑卷-->格式化、挂载-->扩容。 磁盘分区 注: 创建分区时需要更改其文件类型为lvm(代码8e) 分区 3 已设置为 Linu...

弓正 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部