文档章节

activemq发布订阅

Zero零_度
 Zero零_度
发布于 2016/03/08 17:37
字数 588
阅读 39
收藏 4
点赞 1
评论 0

关键代码,创建topic

Destination destination = session.createTopic("topic1");

发布者:

package com.sniper.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布者
 * @author audaque
 *
 */
public class Sender {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session,没有事务
            // 消息的目的地
            Destination destination = session.createTopic("topic1");
            // 创建消息生产者
            MessageProducer messageProducer = session.createProducer(destination); 
            //设置消息不做持久化
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            // 发送消息
            for(int i=0; i<2; i++){
                TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i);
                System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
                messageProducer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

订阅者1:

package com.sniper.jms.topic;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 订阅者1
 * @author audaque
 *
 */
public class Receiver1 {
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            //自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            
            Destination destination = session.createTopic("topic1");
            MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if(message != null){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.err.println("收到的消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            try {
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

订阅者2:

package com.sniper.jms.topic;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 订阅者2
 * @author audaque
 *
 */
public class Receiver2 {
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            //自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            
            Destination destination = session.createTopic("topic1");
            MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if(message != null){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.err.println("Receiver2收到的消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            try {
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}


© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 67
博文 1245
码字总数 252866
作品 0
程序员
Linux 安装ActiveMQ(使用Mac远程访问)

阅读本文需要安装JDK 一 ActiveMQ简介 activemq是用java语言编写的一款开源消息总线 activemq是apache出品 activemq消息的传递有两种类型 一种是点对点(即一个生产者和一个消费者一一对应) 另...

梦三
07/15
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的...

afreon
04/22
0
0
activeMQ入门(发布订阅消息)

发送主题(topic)类 package com.jason.testmq; import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jm......

z_jordon
2015/05/26
0
2
SpringBoot JMS(ActiveMQ) 使用实践

ActiveMQ 1. 下载windows办的activeMQ后,在以下目录可以启动: 2. 启动后会有以下提示 3. 所以我们可以通过http://localhost:8161访问管理页面,通过tcp://localhost:61616来连接消息服务器...

yawnSilence
2017/10/28
0
12
2.ActiveMQ消息队列安装使用

全程是MOM (Message Oriented Middleware) 消息中间件 消息中间件有很多,比如: 1.ActiveMQ java语言编写的和java系统结合紧密 2.RabbitMQ Erlong语言开发的,天生支持高并发,性能优于A...

小杰java
2017/10/26
0
0
ActiveMQ在C#中的应用

ActiveMQ是个好东东,不必多说。ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等。由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提...

postdep
2015/08/24
0
0
ActiveMQ集群方案(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51124749 目...

yunlielai
04/15
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
04/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

实现异步有哪些方法

有哪些方法可以实现异步呢? 方式一:java 线程池 示例: @Test public final void test_ThreadPool() throws InterruptedException { ScheduledThreadPoolExecutor scheduledThre......

黄威
今天
0
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

六库科技
今天
0
0
牛客网刷题

1. 二维数组中的查找(难度:易) 题目描述 在一个二维数组中(每个一维数组的长度相同),每一行都按照从左到右递增的顺序排序,每一列都按照从上到下递增的顺序排序。请完成一个函数,输入...

大不了敲一辈子代码
今天
0
0
linux系统的任务计划、服务管理

linux任务计划cron 在linux下,有时候要在我们不在的时候执行一项命令,或启动一个脚本,可以使用任务计划cron功能。 任务计划要用crontab命令完成 选项: -u 指定某个用户,不加-u表示当前用...

黄昏残影
昨天
0
0
设计模式:单例模式

单例模式的定义是确保某个类在任何情况下都只有一个实例,并且需要提供一个全局的访问点供调用者访问该实例的一种模式。 实现以上模式基于以下必须遵守的两点: 1.构造方法私有化 2.提供一个...

人觉非常君
昨天
0
0
《Linux Perf Master》Edition 0.4 发布

在线阅读:https://riboseyim.gitbook.io/perf 在线阅读:https://www.gitbook.com/book/riboseyim/linux-perf-master/details 百度网盘【pdf、mobi、ePub】:https://pan.baidu.com/s/1C20T......

RiboseYim
昨天
1
0
conda 换源

https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/conda config --add channels https://mir......

阿豪boy
昨天
1
0
Confluence 6 安装补丁类文件

Atlassian 支持或者 Atlassian 缺陷修复小组可能针对有一些关键问题会提供补丁来解决这些问题,但是这些问题还没有放到下一个更新版本中。这些问题将会使用 Class 类文件同时在官方 Jira bug...

honeymose
昨天
0
0
非常实用的IDEA插件之总结

1、Alibaba Java Coding Guidelines 经过247天的持续研发,阿里巴巴于10月14日在杭州云栖大会上,正式发布众所期待的《阿里巴巴Java开发规约》扫描插件!该插件由阿里巴巴P3C项目组研发。P3C...

Gibbons
昨天
1
0
Tomcat介绍,安装jdk,安装tomcat,配置Tomcat监听80端口

Tomcat介绍 Tomcat是Apache软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由Apache、Sun和其他一些公司及个人共同开发而成。 java程序写的网站用tomcat+jdk来运行...

TaoXu
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部