文档章节

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

afreon
 afreon
发布于 04/22 01:33
字数 1028
阅读 53
收藏 8
点赞 0
评论 0

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另一种是订阅\发布的消息模式。

点对点的消息模式

点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。

Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

点对点的发送方逻辑代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建
            Destination destination = session.createQueue("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

点对点的接收方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的消息模式

订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。

Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

订阅/发布的发送方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建
            Destination destination = session.createTopic("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的接收方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

本文转载自:https://blog.yoodb.com/yoodb/article/detail/1501

共有 人打赏支持
afreon
粉丝 20
博文 41
码字总数 5285
作品 0
海淀
架构师
ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息...

cookqq
2013/03/04
0
0
ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
0
1
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
04/26
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0
.Net平台下ActiveMQ入门实例

1.ActiveMQ简介 先分析这么一个场景:当我们在网站上购物时,必须经过,下订单、发票创建、付款处理、订单履行、航运等。但是,当用户下单后,立即跳转到“感谢那您的订单” 页面。不仅如此,...

postdep
2015/08/24
0
0
ActiveMQ队列消息积压问题调研

摘要 公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。 调研提出了三种方案。这里是相关记录和说明。 问题 运维同事对生产环境使用的ActiveM...

winters1224
06/26
0
0
#研发中间件介绍#异步消息可靠推送Notify

电商系统为什么需要 NotifyServer? 如子柳所说,电商系统『需要两种中间件系统,一种是实时调用的中间件(淘宝的HSF,高性能服务框架)、一种是异步消息通知的中间件(淘宝的Notify)』。那...

旁观者-郑昀
2014/12/16
0
3
ActiveMQ发消息和收消息

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE ...

cookqq
2013/03/05
0
0
ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现

初次发博文,勿喷~~ 最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化; 真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,...

JingHaiChao
2012/05/14
0
7
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

垃圾回收算法

一 如何判断对象可以回收 1 引用计数法 思路大概为:给对象添加一个引用计数器,每当有一个地方引用它时,计数器值加1;当引用失效时,计数器减1;任何时刻计算器为0的对象就是不可能再被使用...

sen_ye
7分钟前
0
0
Activiti简介(学习总结一)

一、介绍 activiti是使用命令模式设计基于bpmn2.0的一款开源工作流引擎。 工作流简单举例:提交请假申请->经理审批->结束。这就是一个简单流程。activiti支持用户自定义流程。配置各个流程对...

沙shasha
7分钟前
0
0
VCL界面控件DevExpress VCL Controls发布v18.1.3|附下载

DevExpress VCL Controls是 Devexpress公司旗下最老牌的用户界面套包。所包含的控件有:数据录入,图表,数据分析,导航,布局,网格,日程管理,样式,打印和工作流等,让您快速开发出完美、...

Miss_Hello_World
8分钟前
0
0
加米谷大数据培训:云计算、大数据和人工智能之间的关系

一般谈云计算的时候会提到大数据、谈人工智能的时候会提大数据、谈人工智能的时候会提云计算……感觉三者之间相辅相成又不可分割。 一、云计算最初的目标 云计算最初的目标是对资源的管理,管...

加米谷大数据
13分钟前
1
0
java集合元素的默认大小

当底层实现涉及到扩容时,容器或重新分配一段更大的连续内存(如果是离散分配则不需要重新分配,离散分配都是插入新元素时动态分配内存),要将容器原来的数据全部复制到新的内存上,这无疑使...

竹叶青出于蓝
15分钟前
1
0
Java快速开发平台,JEECG 3.7.7闪电版本发布,增加多套主流UI代码生成器模板

JEECG 3.7.7 闪电版本发布,提供5套主流UI代码生成器模板 导读 ⊙平台性能优化,速度闪电般提升 ⊙提供5套新的主流UI代码生成器模板(Bootstrap表单+BootstrapTable列表\ ElementUI列表表单)...

Jeecg
19分钟前
0
0
export 和 module.export 的区别

在浏览器端 js 里面,为了解决各模块变量冲突等问题,往往借助于 js 的闭包把左右模块相关的代码都包装在一个匿名函数里。而 Nodejs 编写模块相当的自由,开发者只需要关注 require,exports,...

孟飞阳
21分钟前
1
0
技术教育的兴起

技术教育的兴起 作者: 阮一峰 1、 有一年,我在台湾环岛旅行。 花莲的海边,我遇到一对台湾青年夫妻,带着女儿在海滩上玩。我们聊了起来。 当时,我还在高校当老师。他们问我,是否觉得台湾...

吕伯文
21分钟前
0
0
Linux服务器下的HTTP抓包分析

说到抓包分析,最简单的办法莫过于在客户端直接安装一个Wireshark或者Fiddler了,但是有时候由于客户端开发人员(可能是第三方)知识欠缺或者其它一些原因,无法顺利的在客户端进行抓包分析,...

mylxsw
26分钟前
0
0
mybatis3-javaapi

sqlSessionFactoryBuilder->sqlSessionFactory->sqlSession<-rowbound<-resultHandler myBatis uses a Java enumeration wrapper for transaction isolation levels, called TransactionIsol......

writeademo
29分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部