文档章节

activemq发布订阅

Zero零_度
 Zero零_度
发布于 2016/03/08 17:37
字数 588
阅读 39
收藏 4

关键代码,创建topic

Destination destination = session.createTopic("topic1");

发布者:

package com.sniper.jms.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布者
 * @author audaque
 *
 */
public class Sender {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            
            connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session,没有事务
            // 消息的目的地
            Destination destination = session.createTopic("topic1");
            // 创建消息生产者
            MessageProducer messageProducer = session.createProducer(destination); 
            //设置消息不做持久化
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            // 发送消息
            for(int i=0; i<2; i++){
                TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i);
                System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
                messageProducer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

订阅者1:

package com.sniper.jms.topic;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 订阅者1
 * @author audaque
 *
 */
public class Receiver1 {
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            //自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            
            Destination destination = session.createTopic("topic1");
            MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if(message != null){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.err.println("收到的消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            try {
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

订阅者2:

package com.sniper.jms.topic;

import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 订阅者2
 * @author audaque
 *
 */
public class Receiver2 {
    
    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 连接工厂
        Connection connection = null; // 连接
        Session session = null;
        
        try {
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory("sniper", "sniper", "tcp://sniper0:61616");
            connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
            connection.start(); // 启动连接
            
            //自动签收,就是客户端接收到消息之后,会自动给服务端发送消息表示消息已经签收
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
            
            Destination destination = session.createTopic("topic1");
            MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息消费者
            
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if(message != null){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.err.println("Receiver2收到的消息:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            try {
                TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}


© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 67
博文 1245
码字总数 252866
作品 0
程序员
Linux 安装ActiveMQ(使用Mac远程访问)

阅读本文需要安装JDK 一 ActiveMQ简介 activemq是用java语言编写的一款开源消息总线 activemq是apache出品 activemq消息的传递有两种类型 一种是点对点(即一个生产者和一个消费者一一对应) 另...

梦三
07/15
0
0
ActiveMQ专题2: 持久化

AMQ的持久化问题 前言 前面一篇AMQ专题中,我们发现对于Topic这种类型的消息,即使将deliveryMode设置为持久化,只要生产者在消费者之前启动。消息生产者发布的消息还是会丢失。这是符合JMS...

槟城码农
08/30
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的...

afreon
04/22
0
0
activeMQ入门(发布订阅消息)

发送主题(topic)类 package com.jason.testmq; import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jm......

z_jordon
2015/05/26
0
2

没有更多内容

加载失败,请刷新页面

加载更多

Bash各类扩展详解

Bash各类扩展详解 Bash中主要包括大括号扩展、波浪号扩展、变量扩展、子命令扩展、文件名扩展和算数扩展。这些扩展组合在一起为Bash带来了极大的易用性。掌握这些扩展的用法和功能,能够为B...

小陶小陶
40分钟前
1
0
EventBus原理深度解析

一、问题描述 在工作中,经常会遇见使用异步的方式来发送事件,或者触发另外一个动作:经常用到的框架是MQ(分布式方式通知)。如果是同一个jvm里面通知的话,就可以使用EventBus。由于Event...

yangjianzhou
今天
5
0
OpenCV图像处理实例:libuv+cvui显示摄像头视频

#include <iostream>#include <opencv2/opencv.hpp>#define CVUI_IMPLEMENTATION#include <cvui.h>extern "C"{#include <uv.h>}using namespace std;#define WINDOW_NAM......

IOTService
今天
1
0
openJDK之JDK9的String

1.openJDK8的String 先来看下openJDK8的String的底层,如下图1.1所示: 图1.1 底层上使用的是char[],即char数组 每个char占16个bit,Character.SIZE的值是16。 2.openJDK9中的String 图2.1...

克虏伯
今天
1
0
UEFI 模式下如何安装 Ubuntu 16.04

作者:知乎用户 链接:https://www.zhihu.com/question/52092661/answer/259583475 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 针对UEFI模式下安装U...

寻知者
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部