文档章节

activemq高级客户端选项

泡海椒
 泡海椒
发布于 2016/02/15 18:10
字数 3226
阅读 278
收藏 1
点赞 1
评论 0

我们经常希望维持队列中的消息,按一定次序转发给消息者。然而当有多个JMS Session和消息消费者实例的从同一个队列中获取消息的时候,就不能保证消息顺序处理。因为消息被多个不同线程并发处理着。
在ActiveMQ4.x中可以采用Exclusive Consumer或者Exclusive Queues,避免这种情况,Broker会从消息队列中,一次发送消息给一个消息消费者来保证顺序。
A.         当在接收信息的时候有一个或者多个备份接收消息者和一个独占消息者的同时接收时候,无论两者创建先后,在接收的时候,均为独占消息者接收。
B.         当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。
C.         当有多个备份消息者和多个独占消费者的时候,当所有的独占消费者均close的时候,只有一个备份消费者接到到消息。
备注:备份消费者为不带任何参数的消费者。
12.1.1选择一个独占的message consumer
对于应用来说,那些重要的order ,或者,你需要确保这里仅仅只有一个message consumer对于queen,activemq提供了一个客户端选项来确保只有一个active message consumer来处理message
activemq meaasge broker也会在queen上选择一个consumer来处理消息,这样的好处就是允许broker来选择,即使consumer失败或者停止了,然后另外一个message consumer能够被选择成为
active的
如果你混合了标准consumer和exclusive consumer在同一个queen上 ,the activemq将会仅仅选择exclusive的其中一个consumer,,如果所有的exclusive consumer都变为inactive那么就会选择
标准的consumer,然后queen的消费将会变为正常的传输模式,
        queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
        consumer = session.createConsumer(queue);
12.1.2利用exclusive consumer来提供分布式锁的功能
通常你用message从外部资源来广播数据,如果你想构建一个冗余的, 即使你有一个实例阅读和广播changedate失败了【改变数据库记录,在文件里面的内容用逗号分隔】,另一个实例都将要接管,通常你依靠锁住资源【行锁或者文件锁】来确保
仅仅只有一个程序能够acess data并且广播over topic ,但是当你不想利用数据库,或者想要运行一个程序跨越一个机器(不能用分布式锁),然后你就只能用独占consumer来创建一个分布式锁
为了能够使用独占consumer来创建分布式锁,我们需要我没得producer订阅独占的queen, 如果message producer接收到queen,他就便激活了, 并且能够 订阅实时的feed和把实时数据变为jms message
   this.connection = this.factory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = this.session.createQueue(this.queueName + "?consumer.exclusive=true");
        Message message = this.session.createMessage();
        MessageProducer producer = this.session.createProducer(destination);
        producer.send(message);
        MessageConsumer consumer = this.session.createConsumer(destination);
        consumer.setMessageListener(this);
在这个代码片中,我们总是send a message到queen这一步总是被外部的管理程序执行的,注意到Session.CLIENT_ACKNOWLEDGE模式来消费这个消息,尽管我们想要被通知我们是独占的consumer, 因此我们有锁,我们不想要remove,我们
不想要remove这一条消息吗, 如果我们失败了, 我们的另一个独占producer将会active
正在这个列子中我们实现了MessageListener,如果我们没有active, 我们将要call一个功能性方法start producing ,如果我们是实时应用, 这个方法将要订阅一个实时的并且转换实时的data 进入jms message
    public void onMessage(Message message) {
        if (message != null && this.active==false) {
            this.active=true;
            startProducing();
        }
    }
    
12.2 message groups
全部的message 都将要转向单一的message consumer,message也能够分组来给予单一的consumer, 一个message producer也能指定一个group,通过指定message header JMSXGroupId,
ActiveMQ将要确保全部相同的JmsxGroupID的message发送给相同的consumer
如果Activemq broker制定了consumer接受消息通过JmsxGroupID,那么他就应该close掉,然后activemq broker将要选择一个不同的message consumer来dispatch给不同的message
为了创建一个group,你需要设置JmsGroupID string property在消息上
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("group.queue");
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createTextMessage("<foo>test</foo>");
        message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
        producer.send(message);
这个列子显示了message producer已经被创建了, 并且设置好textmessage 属于message group TEST_GROUP_A
message group利用正常的message consumer,因此没有额外的工作需要group来消费message, 全部的工作都被message producer来定义一个group的消息属于什么, activemqbroker选择一个
message consumer来处理全部的分组消息
activemqbroker对于group里面的每一条消息都会添加一条sequeence no,[通过JMSXGroupSeq,从1开始]
    但是从consumer视角来说,你不能假定你从一个新的group里收到的第一条JMSXGroupSeq设置1, 如果一个存在的group close掉或者死掉之后, 任何消息route到这个group里的都会分配给一个新的consumer
为了帮助识别一个消息的consumer 从一个新的group里收到消息,或者一个新的group从来没有被看见过, 一个boolean 参数叫做JMSXGroupFirstForConsumer被设置了对于第一个message, 你也能够核对是否
他是为第一条message设置的【对于新组】, 你也能够核对消息是否被
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("group.queue");
        MessageConsumer consumer = session.createConsumer(queue);
        Message message = consumer.receive();
        String groupId = message.getStringProperty("JMSXGroupId");
        if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
            // do processing for new group
        }
The Activemq message代理允许 分配各种各样的消息groups跨越多个consumer,但是如果这里早已经有message等着dispatch, the message group典型的分配给第一个consumer,为了确保一个基数de
的分布式负载均衡,他可能考虑message broker等着开启更多的messgae consumer , 为了这样做, 你不得不设置destination policy在active  broker 配置里面,设置好consumersBeforeDispatchStarts参数

        
<destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="5000">
        </policyEntry>
      </policyEntries>
    </policyMap>
</destinationPolicy>
这个配置告诉ActiveMq broker, 都应该等着两个consumer在dispatch之前, 另外我们也可以看到timeBeforeDispatchStarts参数5000ms来通知activemq broker如果两个message consumer在5s
内没有砸queen上得到消息,利用messgae group添加最小化的active broker 就每个消息group存储routing 信息而言。这是明晰的关掉message group通过发送message从activemq broker 的JMSXGroupID
 设置为-1
 Connection connection = new ActiveMQConnectionFactory().createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("group.queue");
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createTextMessage("<foo>close</foo>");
        message.setStringProperty("JMSXGroupID", "TEST_GROUP_A");
        message.setIntProperty("JMSXGroupSeq", -1);
        producer.send(message);
12.3activemqstream
Activemq stream是一个高级的特色,他允许使用activemq来作为Jave Io stream,activeMQ将要break 一个outputstream对于不同的data chunk并且send每一个chunk通过activemq作为jms message
一个相应的activemq jms inputstream应该用在consumer边重新结合data chunk
如果你用queen 作为streamd的destination,使用不止一个consumer 在queen上(或者一个独占的consumer)是很好的, 由于group的这个特色【用同样的groupid指向一个单一的consumer】,使用超过一个的producer可能会造成message排序order
的问题
利用jms的好处就是activemq 把breank stream 分为了管理的块【chunk】, 并且允许你在consumer端给合并, 因此这是允许你传输大文件用这个功能
为了证明这个用stream
 //source of our large data
        FileInputStream in = new FileInputStream("largetextfile.txt");
        
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(QUEUE_NAME);
        OutputStream out = connection.createOutputStream(destination);
        
        //now write the file on to ActiveMQ
        byte[] buffer = new byte[1024];
        while(true){
            int bytesRead = in.read(buffer);
            if (bytesRead==-1){
                break;
            }
            out.write(buffer,0,bytesRead);
        }
        out.close();
    
    
在下面的这个例子中我们创建了一个ActiveMQConnection并且创建了一个inputstream利用一个queen, 注意到我们利用一个独占的consumer通过apend"?consumer.exclusive=true";
我们确保仅仅一个consumer 能够阅读到一个queen,我们read InputStream并且通过FileOutputStream来重组file在硬盘上
你也能够使用topic, 尽管这个
        //destination of our large data
        FileOutputStream out = new FileOutputStream("copied.txt");
        
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //we want be be an exclusive consumer
        String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";
        Queue destination = session.createQueue(exclusiveQueueName);
        
        InputStream in = connection.createInputStream(destination);
        
        //now write the file from ActiveMQ
        byte[] buffer = new byte[1024];
        while(true){
            int bytesRead = in.read(buffer);
            if (bytesRead==-1){
                break;
            }
            out.write(buffer,0,bytesRead);
        }
        out.close();
    }
12.4 Blob消息
activemq引进了blob来处理large message
自己处理中转
如果自己处理文件的话,一个简单方式是使用共享或ftp、dfs等方式,先把文件发送到一个大家都可以拿到的地方,然后发送message,payload或properties中包含文件的路径信息。这样,consumer拿到文件路径后去指定的地方,按照给定的方式去获取文件数据即可。
优势:这种方式可以用来处理大数据,并且不需要client或broker在内存中持有文件数据本身,非常的节省资源。而且文件是通过额外的方式处理,跟ActiveMQ本身无关,所以符合jms协议、处理的效率也相对比较高。
劣势:需要自己处理很多文件相关的操作。
BlobMessage对文件中转的封装
幸运的是,ActiveMQ把上面繁复的文件处理工作进行了封装,屏蔽掉文件中转的整个处理过程,使得我们可以使用类似jms规范的API来简单操作文件传输。
    String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(destination);
        BlobMessage message = session.createBlobMessage(new URL("http://some.shared.site.com"));
        producer.send(message);

consumer for blob::

      FileOutputStream out = new FileOutputStream("blob.txt");
        String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
        Connection connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(destination);
        BlobMessage blobMessage = (BlobMessage) consumer.receive();
        InputStream in = blobMessage.getInputStream();
        // now write the file from ActiveMQ
        byte[] buffer = new byte[1024];
        while (true) {
            int bytesRead = in.read(buffer);
            if (bytesRead == -1) {
                break;
            }
            out.write(buffer, 0, bytesRead);
        }
        out.close();
    }
12.5网络存活 或者代理失败后的失效转移协议
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)
activemq将会随机的选择list中的其中一个用失效转移协议,如果仅仅只有一个uri那么客户端将会隔断时间查看是否broker available,你可以利用TransportListener来监听activemq的连接
public class ClientTransportListener implements TransportListener {  
 
  protected final Logger logger = LoggerFactory.getLogger(ClientTransportListener.class);  
 
  public void onCommand(Object o) {  
    logger.debug("onCommand检测到服务端命令:{}", o);  
  }  
 
  public void onException(IOException error) {  
    logger.error("onException,与服务器连接发生错误......");  
  }  
 
  public void transportInterupted() {  
    logger.error("transportInterupted,与服务器连接发生中断......");  
    IConnector connector = new Connector();  
    connector.reConnect();  
  }  
 
  public void transportResumed() {  
    logger.info("transportResumed,恢复与服务器连接....");  
  }  
 
}  
当你想要按照顺序来启动
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?random=false
如果个了段时间还是连不上,the failover protocol将会增加一段总量来连接activemq broker,这个叫做指数退避算法Exponential Backoff默认的useExponentialBackoff是enable
参数                                     默认值                    含义
initialReconnectDelay                   10ms,               重连之前等待的时间(ms)
backOffMultiplier                       1.5                   增大等待时间的系数
maxReconnectDelay                       30000               重连之前等待的最大时间(ms)
failover:(tcp://host1:61616,tcp://host2:61616,ssl://host3:61616)?backOffMultiplier=2,initialReconnectDelay=1000
在maxInactivityDuration时间里没有连接上话就是invalidate
failover:(tcp://host1:61616?wireformat.maxInactivityDuration=0,tcp://host2:61616,ssl://host3:61616?wireformat.maxInactivityDuration=0)
默认的话activemq传输是持久化的,如果你使用非持久化的方式传输的话,为了防止丢失你就要使用trackMessages=true
maxCachesize
backup=true,backupPoolSize=2
updateClusterClients
rebalanceClusterClients
updateClusterClientOnRemove
updateClusterFilter           
12.6在future传输message
Property name                  type                            description

AMQ_SCHEDULED_DELAY            long                              The time in milliseconds that a message will wait before being scheduled to
                                                                 be delivered by the broker

AMQ_SCHEDULED_PERIOD           long                              The time in milliseconds to wait after the start time to wait before scheduling
                                                                 the message again

AMQ_SCHEDULED_REPEAT           int                               The number of times to repeat scheduling a message for delivery

AMQ_SCHEDULED_CRON             String                            Use a Cron entry to set the schedule

例如,有一个消息,原定在60秒-交付你需要设置amq_scheduled_delay
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);


你可以设置一个消息,等待一个初始延迟,并重复传送10次,等待10秒之间的每一个重新交付

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);


你也可以使用cron调度信息,例如,如果你想要一个消息如期交付的每一个小时,你就需要设置cron入口是0 * -例如

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);


cron调度优先使用消息延迟,然而,如果一个重复周期设置一个cron入门,ActiveMQ调度器将安排每次cron进入火灾的消息传递。用一个例子来解释更容易。
假设你想要一个消息,10次,一一秒的延迟之间的每一个消息-你希望这个发生每小时-你会这样做:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message)





    

© 著作权归作者所有

共有 人打赏支持
泡海椒
粉丝 10
博文 256
码字总数 291526
作品 0
成都
程序员
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai ⋅ 04/15 ⋅ 0

ActiveMQ初探(1)——介绍与基本使用

一、ActiveMQ 1.1 什么是ActiveMQ 是Apache出品,最流行的,能力强劲的。ActiveMQ是一个完全支持和规范的 实现,尽管规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊...

yuanlaijike ⋅ 04/15 ⋅ 0

ActiveMQ集群方案(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51124749 目...

yunlielai ⋅ 04/15 ⋅ 0

ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike ⋅ 05/08 ⋅ 0

爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场 ⋅ 04/26 ⋅ 0

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的...

afreon ⋅ 04/22 ⋅ 0

activeMQ发送邮件小例子

先安装apache-activemq-5.11.1 ,然后启动activemq.bat 启动activeMQ的控制台:http://localhost:8161/admin/,展示图如下: 解释:Number Of Pending Messages :等待消费的消息,这个是当前...

zhouyuzhoujing ⋅ 05/07 ⋅ 0

ActiveMQ RabbitMQ KafKa对比

前言: ActiveMQ和 RabbitMq 以及Kafka在之前的项目中都有陆续使用过,当然对于三者没有进行过具体的对比,以下摘抄了一些网上关于这三者的对比情况,我自己看过之后感觉还 是可以的,比较清...

xiaomin0322 ⋅ 05/11 ⋅ 0

ActiveMQ初探(3)——与SpringBoot整合

在上一章:ActiveMQ(2)——与Spring整合中,我们已经完成了ActiveMQ与Spring进行整合,本章将进行与SpringBoot整合。 发送即时消息 Step1:添加依赖包: 这里我们不使用网上大多数的,因为...

yuanlaijike ⋅ 05/08 ⋅ 0

ActiveMq笔记2-消息持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。 ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB, 无论使用哪种持久化方式,消息...

狂小白 ⋅ 02/25 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

MySQL 数据库设计总结

规则1:一般情况可以选择MyISAM存储引擎,如果需要事务支持必须使用InnoDB存储引擎。 注意:MyISAM存储引擎 B-tree索引有一个很大的限制:参与一个索引的所有字段的长度之和不能超过1000字节...

OSC_cnhwTY ⋅ 41分钟前 ⋅ 0

多线程(四)

线程池和Exector框架 什么是线程池? 降低资源的消耗 提高响应速度,任务:T1创建线程时间,T2任务执行时间,T3线程销毁时间,线程池没有或者减少T1和T3 提高线程的可管理性。 线程池要做些什...

这很耳东先生 ⋅ 48分钟前 ⋅ 0

使用SpringMVC的@Validated注解验证

1、SpringMVC验证@Validated的使用 第一步:编写国际化消息资源文件 编写国际化消息资源ValidatedMessage.properties文件主要是用来显示错误的消息定制 [java] view plain copy edit.userna...

瑟青豆 ⋅ 今天 ⋅ 0

19.压缩工具gzip bzip2 xz

6月22日任务 6.1 压缩打包介绍 6.2 gzip压缩工具 6.3 bzip2压缩工具 6.4 xz压缩工具 6.1 压缩打包介绍: linux中常见的一些压缩文件 .zip .gz .bz2 .xz .tar .gz .tar .bz2 .tar.xz 建立一些文...

王鑫linux ⋅ 今天 ⋅ 0

6. Shell 函数 和 定向输出

Shell 常用函数 简洁:目前没怎么在Shell 脚本中使用过函数,哈哈,不过,以后可能会用。就像java8的函数式编程,以后获取会用吧,行吧,那咱们简单的看一下具体的使用 Shell函数格式 linux ...

AHUSKY ⋅ 今天 ⋅ 0

单片机软件定时器

之前写了一个软件定时器,发现不够优化,和友好,现在重写了 soft_timer.h #ifndef _SOFT_TIMER_H_#define _SOFT_TIMER_H_#include "sys.h"typedef void (*timer_callback_function)(vo...

猎人嘻嘻哈哈的 ⋅ 今天 ⋅ 0

好的资料搜说引擎

鸠摩搜书 简介:鸠摩搜书是一个电子书搜索引擎。它汇集了多个网盘和电子书平台的资源,真所谓大而全。而且它还支持筛选txt,pdf,mobi,epub、azw3格式文件。还显示来自不同网站的资源。对了,...

乔三爷 ⋅ 今天 ⋅ 0

Debian下安装PostgreSQL的表分区插件pg_pathman

先安装基础的编译环境 apt-get install build-essential libssl1.0-dev libkrb5-dev 将pg的bin目录加入环境变量,主要是要使用 pg_config export PATH=$PATH:/usr/lib/postgresql/10/bin 进......

玛雅牛 ⋅ 今天 ⋅ 0

inno安装

#define MyAppName "HoldChipEngin" #define MyAppVersion "1.0" #define MyAppPublisher "Hold Chip, Inc." #define MyAppURL "http://www.holdchip.com/" #define MyAppExeName "HoldChipE......

backtrackx ⋅ 今天 ⋅ 0

Linux(CentOS)下配置php运行环境及nginx解析php

【part1:搭建php环境】 1.选在自己需要安装的安装包版本,wget命令下载到服务器响应目录 http://php.net/releases/ 2.解压安装包 tar zxf php-x.x.x 3.cd到解压目录执行如下操作 cd ../php-...

硅谷课堂 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部