文档章节

消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

afreon
 afreon
发布于 04/22 01:33
字数 1028
阅读 60
收藏 8

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的消息模式,另一种是订阅\发布的消息模式。

点对点的消息模式

点对点的模式主要建立在一个队列上,当连接一个列队时,发送方不需要知道接收方是否正在接收消息,可以直接向ActiveMQ发送消息,而发送的消息将直接进入队列中,如果接收方启动着监听,则会向接收方发送消息,若接收方没有接收到消息,则会保存在ActiveMQ服务器中,直到接收方接收消息为止。点对点的消息模式可以有多个接收方和发送方,但是一条消息只会被一个接收方接收到,先连上ActiveMQ接收方,则会先接收到消息,而之后的接收方则接收不到已被接收过的消息。

Java实现ActiveMQ点对点模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

点对点的发送方逻辑代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建
            Destination destination = session.createQueue("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

点对点的接收方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的消息模式

订阅/发布模式有多个接收方和发送方,但是接收方与发送方存在时间上的依赖,如果发送方发送消息时接收方没有监听消息,那么ActiveMQ将不会保存该消息,认为消息已经发送。这个模式还有一个特点就是发送方发送的消息会被所有的接收方接收到,与点对点模式恰恰相反。

Java实现ActiveMQ订阅/发布模式,使用ActiveMQ服务器版本为5.15.3,项目使用Maven构建,其中pom.xml增加ActiveMQ依赖jar配置如下:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>

订阅/发布的发送方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MQSender {
    
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    
    public static void main(String[] args) {
        MQSender send = new MQSender();
        send.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//连接名是"textMsg"的队列,此会话将会到该队列中,若 该队列不存在,则被创建
            Destination destination = session.createTopic("textMsg");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage textMsg = session.createTextMessage("消息内容");
            for(int i = 0 ; i < 10; i ++){
                producer.send(textMsg);
            }
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

订阅/发布的接收方代码

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPReceive {
    private String userName = "root";
    private String password = "123456";
    private String url = "tcp://127.0.0.1:61616";
    public static void main(String[] args) {
        PTPReceive receive = new PTPReceive();
        receive.start();
    }
    
    public void start(){
        try {
            ConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("textMsg");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        String text = ((TextMessage)message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            consumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

本文转载自:https://blog.yoodb.com/yoodb/article/detail/1501

共有 人打赏支持
afreon
粉丝 20
博文 41
码字总数 5285
作品 0
海淀
架构师
ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息...

cookqq
2013/03/04
0
0
ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
0
1
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
04/26
0
0
.Net平台下ActiveMQ入门实例

1.ActiveMQ简介 先分析这么一个场景:当我们在网站上购物时,必须经过,下订单、发票创建、付款处理、订单履行、航运等。但是,当用户下单后,立即跳转到“感谢那您的订单” 页面。不仅如此,...

postdep
2015/08/24
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux安装MySQL的两种方法

1. 运行平台:CentOS 6.3 x86_64,基本等同于RHEL 6.3 2. 安装方法: 安装MySQL主要有两种方法:一种是通过源码自行编译安装,这种适合高级用户定制MySQL的特性,这里不做说明;另一种是通过...

onedotdot
22分钟前
4
0
phpize源码安装php扩展

4、进入源码中的ext/pcntl目录 ~# cd php-5.3.29/ext/pcntl/ 5、运行 phpize 命令 ~# phpizeConfiguring for:PHP Api Version: 20090626Zend Module Api No: 20090626......

bengozhong
31分钟前
2
0
Git 常用技巧

# Git 常用技巧 ## 暖场 - Git 怎么读 ? - Git 的作者是谁 ? - 谁没有 Github 账号 ? - 谁没有在 Github 提交过 issues,PR ? - 谁没有不会用命令操作 Git ? ## Git 简介 Git 是一种代码...

帝子兮
43分钟前
2
0
MySQL学习笔记

踩坑建议 对于时间相关字段,为插入及显示毫秒数据,建议使用datetime(6)类型,并设置数据库客户端显示毫秒相关数据

OSC_fly
43分钟前
0
0
spring配置文件中xsd引用异常

异常: org.xml.sax.SAXParseException; lineNumber: 78; columnNumber: 69; schema_reference.4: 无法读取方案文档 'http://www.springframework.org/schema/tx/spring-tx-3.2.xsd', 原因为......

zaolonglei
46分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部