文档章节

activeMq消息转投rabbitMq研究

Mr_Qi
 Mr_Qi
发布于 2015/10/10 15:11
字数 1950
阅读 757
收藏 3

在研究activemq转投消息到rabbitmq的过程中还是发现了很多有趣的细节。    消息发送端分为PERSISTENT与NON_PERSISTENT,该类型表示是否持久化消息到数据库中。

1. Activemq默认使用kahaDB。我大Q9使用的也是kahaDB。当然也是支持mysql等数据库的。

具体配置在${activemq.base}/conf/activemq.xml中。

<persistenceAdapter>
   <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>

<!--
   The systemUsage controls the maximum amount of space the broker will
   use before slowing down producers. For more information, see:

   http://activemq.apache.org/producer-flow-control.html
   -->
<systemUsage>
   <systemUsage>
      <memoryUsage>
         <memoryUsage limit="4 gb"/>
      </memoryUsage>
      <storeUsage>
         <storeUsage limit="15 gb"/>
      </storeUsage>
      <tempUsage>
         <tempUsage limit="100 mb"/>
      </tempUsage>
   </systemUsage>
</systemUsage>

和大家理解不一样的地方是NON_PERSISTENT是会使用文件作为存储介质的。主要是为了防止内存挤爆。当发送者发送过快或者接受者处理过慢都会导致使用大量内存。此时将消息临时存储在临时文件中(swap)。

2.    对于PERSISTENT与NON_PERSISTENT区别在于是否在mq服务器重启后能够正常发送消息。PERSISTENT的消息在服务器重启后依然能够将message发送出去。

如果服务端的topic没有订阅者该消息将被直接丢弃。

3.       消费者的持久化则有一定区别。当为queue的时候,若客户端不在线等到某个客户端消费了该消息时则会将该消息删除。当为topic时,若客户端未设置subscriptionDurable,则该客户端必须要在线才能收到订阅。当客户端设置subscriptionDurable为true时,则服务器会保存该消息直到被所有的订阅者均消费一次(消费是指服务器收到ack回复)

        

消息发送端 消息接收端 可靠性及因素
PERSISTENT queue receiver/durable subscriber 消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。
PERSISTENT non-durable subscriber 最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。
NON_PERSISTENT queue receiver/durable subscriber 最多消费一次。这是由于服务器的宕机会造成消息丢失
NON_PERSISTENT non-durable subscriber 最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定

服务端可以根据clientId及durableSubscriptionName来辨别指定的订阅者以便将该订阅者尚未消费的消息供消费。

记得在设置subscriptionDurable时候也需要设置durableSubscriptionName如下注释。否则该名称会变成listener的名称。

/**  * Set whether to make the subscription durable. The durable subscription name  * to be used can be specified through the "durableSubscriptionName" property.  * <p>Default is "false". Set this to "true" to register a durable subscription,  * typically in combination with a "durableSubscriptionName" value (unless  * your message listener class name is good enough as subscription name).  * <p>Only makes sense when listening to a topic (pub-sub domain).  * @see #setDurableSubscriptionName  */

如下为一配置示例

<bean id="jmsbillChoiceOfAviationContainer"    class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="expressJmsFactory" />    <property name="destination" ref="expressDssScanTopicDestination" />    <property name="messageListener" ref="billChoiceOfAviationTopicListener" />    <property name="durableSubscriptionName" value="billChoiceOfAviation" />    <property name="clientId" value="billChoiceOfAviationClient" />    <property name="subscriptionDurable" value="true" /> </bean>

请注意在配置destination属性是topic。该类还有一个属性为destinationName。配置该属性时会默认为queue。

在设置destination时执行如下,可以看到还有setPubSubDomain(true);才会表示为订阅模式否则为点对点模式

/**  * Set the destination to receive messages from.  * <p>Alternatively, specify a "destinationName", to be dynamically  * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.  * <p>Note: The destination may be replaced at runtime, with the listener  * container picking up the new destination immediately (works e.g. with  * DefaultMessageListenerContainer, as long as the cache level is less than  * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!  * @see #setDestinationName(String)  */ public void setDestination(Destination destination) {    Assert.notNull(destination, "'destination' must not be null");    this.destination = destination;    if (destination instanceof Topic && !(destination instanceof Queue)) {       // Clearly a Topic: let's set the "pubSubDomain" flag accordingly.       setPubSubDomain(true);    } }

因此在设置destinationName属性时如果是topic需要增加

<property name="pubSubDomain" value="true"/>

4.      rabbitMQ和activeMq很大的不同在于rabbitMq的消息发送完全基于queue。

在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

l  消费者是无法订阅或者获取不存在的MessageQueue中信息。

l  消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的

当rabbitmq的生产者发送消息出来后该消息会发送到指定的exchange中。Exchange分为如下几种常用类型:direct, fanout,topic

1)    fanout
所有bind到此exchangequeue都可以接收消息

通常此处routingkey””

 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

 

2)    direct
通过routingKeyexchange决定的那个唯一的queue可以接收消息

 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
//需要绑定路由键  
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

 

 

3)    topic
所有符合routingKey(此时可以是一个表达式)routingKeybindqueue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.aaa.aaaa.a
*.a会匹配a.ab.ac.a
注:使用RoutingKey#Exchange Typetopic的时候相当于使用fanout

Channel channel = connection.createChannel();  
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic  
channel.queueDeclare("queueName");  
channel.queueBind("queueName", "exchangeName", "routingKey.*");  
  
byte[] messageBodyBytes = "hello world".getBytes();  
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

 

 

Rabbitmq和activemq区别很大的一点是在于当生产者发送消息给topic时,activemq是将该消息广播至该处所有的订阅者(包括离线持久订阅者),而rabbitmq的消息在发送时不一样是还需要配合routingkey。只有符合表达式的订阅者才会被转发。二订阅者依旧是关注被转发的queue,符合该表达式的消息会被转发至对应的queue中,这样客户端消费者才可以消费到。

因此想要持久化订阅topic在rabbitmq中还需要对应在exchange中增加一个名称唯一的queue来进行转发。

如下如果多个listener需要订阅该topic,则需要每个listener对应一个不同的queue,以便转发。

<rabbit:topic-exchange  id="expressDssScanTopicExchange" name="expressDssScanTopicExchange" durable="true" >       <rabbit:bindings>           <rabbit:binding queue="express.scan" pattern="express.dssScan"/>    <rabbit:binding queue="express.dss" pattern="express.dssScan"/>       </rabbit:bindings>   </rabbit:topic-exchange>

 


 

 

 

 

© 著作权归作者所有

Mr_Qi

Mr_Qi

粉丝 288
博文 12
码字总数 13288
作品 0
南京
程序员
私信 提问
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
2018/04/26
0
0
消息队列RabbitMQ和ActiveMQ的生产者流量控制

20120825 郑昀 Q:MQ 们为什么要做生产者流量控制? A:麻烦就在于:『像 Erlang 的虚拟机实现和设计上都没有阻止用户往一个进程的消息队列里扔消息,当消息的生产速度过快,超过进程的处理能...

旁观者-郑昀
2012/10/11
0
1
Linux环境下Apache ActiveMQ 基本安装

在linux上安装mq,并映射到外网。 1.Apache ActiveMQ安装基本条件请参考链接: 2.下载Apache ActiveMQ 3.解压安装 将下载好的apache-activemq-5.13.3-bin.tar解压至安装目录 命令:tar -zxvf...

zhuyuansj
2018/04/02
0
0
ActiveMQ RabbitMQ KafKa对比

前言: ActiveMQ和 RabbitMq 以及Kafka在之前的项目中都有陆续使用过,当然对于三者没有进行过具体的对比,以下摘抄了一些网上关于这三者的对比情况,我自己看过之后感觉还 是可以的,比较清...

xiaomin0322
2018/05/11
0
0
消息中间件系列第2讲:如何进行消息队列选型?

要做技术选型,那么必须对现今的各个消息中间件有个深入的理解才能做技术选型。否则别人问你,你为什么要用这个消息中间件,你说不出个所以然来,怎么做架构师呢? 截止到目前为止,现在业界...

陈树义
01/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

c 基础教程六:c 循环结构

有的时候,我们可能需要多次执行同一块代码,c 语言提供了如下几种循环,各有特色。 while 循环 for 循环 do-while 循环 while 循环 只要给定的条件为真,C 语言中的 while 循环语句会重复执...

故城以南丶思念不安
21分钟前
4
0
spark 常见操作

为spark DataFrom 添加一个为 空的新列,使用UDF函数 想产生一个IntegerType类型列为null的DataFrame该怎么做。 import org.apache.spark.sql.functions._import org.apache.spark.sql.type...

蜉先生
32分钟前
2
0
Flutter for Web 详细预研

首先感谢@栖冰 @祖建国 一起对FFW的预研做的投入! 背景 Google在最新的Google I/O上推出了Flutter for Web,旨在进一步解决一次代码,多端运行的问题。Flutter for Web还处于早期试验版,官...

阿里云云栖社区
42分钟前
1
0
mongodb自动备份脚本

mongodb自动备份脚本 2019年04月08日 13:27:28 遗失的曾经! 阅读数 73 #!/bin/bash# 要备份的数据库名'多个数据库用空格分开# 备份文件要保存的目录basepath="/data/backup/dump$(da...

linjin200
43分钟前
1
0
如何使用pagehelper分页

<c:if test="${page != null && page.getTotal() > 0 }"> <nav style="text-align: center"><ul class="pagination pagination-lg"><li><a>共 ${page.total } 条记录</a></l......

南桥北木
52分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部