文档章节

【java-activemq】 activemq发布订阅模式

_______-
 _______-
发布于 2017/09/06 16:14
字数 638
阅读 11
收藏 0
点赞 0
评论 0

Destination destination = session.createTopic("xxxx");

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicPublisher {
	private static String user = ActiveMQConnection.DEFAULT_USER;  
	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
	private static String url =  "tcp://localhost:61616";  

	public static void main(String[] args)throws Exception {  
		// ConnectionFactory :连接工厂,JMS 用它创建连接  
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
		// Connection :JMS 客户端到JMS Provider 的连接  
		Connection connection = connectionFactory.createConnection();  
		// Connection 启动  
		connection.start();  
		System.out.println("Connection is start...");  
		// Session: 一个发送或接收消息的线程  
		Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
		// Topicr :消息的目的地;消息发送给谁.  
		Topic  topic = session.createTopic("example.A");  
		// MessageProducer:消息发送者  
		MessageProducer producer = session.createProducer(topic);  
		// 设置不持久化,此处学习,实际根据项目决定  
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
		// 构造消息,此处写死,项目就是参数,或者方法获取  
		sendMessage(session, producer);  
		session.commit();  

		connection.close();  
		System.out.println("send text ok.");  
	}  

	public static void sendMessage(Session session, MessageProducer producer)  throws Exception {  
		for (int i = 1; i <= 100; i++) {//有限制,达到1000就不行  
			TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);  
			// 发送消息到目的地方  
			System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  
			producer.send(message);  
		}  
	}  
}


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber{
	private static String user = ActiveMQConnection.DEFAULT_USER;  
	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
	private static String url = "tcp://localhost:61616";  
	private static boolean flag=true;

	public static void main(String[] args) throws Exception{  
		// ConnectionFactory :连接工厂,JMS 用它创建连接  
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
		// Connection :JMS 客户端到JMS Provider 的连接  
		Connection connection = connectionFactory.createConnection();  
		connection.start();  
		// Session: 一个发送或接收消息的线程  
		final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
		// Destination :消息的目的地;消息发送给谁.  
		Topic topic=session.createTopic("example.B");  
		// 消费者,消息接收者  
		MessageConsumer consumer = session.createConsumer(topic);  
		consumer.setMessageListener(new MessageListener(){//有事务限制  
			@Override  
			public void onMessage(Message message) {  
				try {  
					TextMessage textMessage=(TextMessage)message;  
					System.out.println(textMessage.getText());  
				} catch (JMSException e1) {  
					e1.printStackTrace();  
				}  
				try {  
					session.commit();  
				} catch (JMSException e) {  
					e.printStackTrace();  
				}  
			}  
		});  

		//		  另外一种接受方式 
		//		while(flag){  
		//			TextMessage message = (TextMessage)consumer.receive(1);  
		//			if(message != null){  
		//				if("stop".equals( message.getText())){  
		//					flag = false;  
		//				}  
		//			}  
		//		}  
	}
}


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicSubscriber2{
	private static String user = ActiveMQConnection.DEFAULT_USER;  
	private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
	private static String url = "tcp://localhost:61616";  
	private static boolean flag=true;

	public static void main(String[] args) throws Exception{  
		// ConnectionFactory :连接工厂,JMS 用它创建连接  
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
		// Connection :JMS 客户端到JMS Provider 的连接  
		Connection connection = connectionFactory.createConnection();  
		connection.start();  
		// Session: 一个发送或接收消息的线程  
		final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
		// Destination :消息的目的地;消息发送给谁.  
		Topic topic=session.createTopic("example.A");  
		// 消费者,消息接收者  
		MessageConsumer consumer = session.createConsumer(topic);  
		consumer.setMessageListener(new MessageListener(){//有事务限制  
			@Override  
			public void onMessage(Message message) {  
				try {  
					TextMessage textMessage=(TextMessage)message;  
					System.out.println(textMessage.getText());  
				} catch (JMSException e1) {  
					e1.printStackTrace();  
				}  
				try {  
					session.commit();  
				} catch (JMSException e) {  
					e.printStackTrace();  
				}  
			}  
		});  

		//		  另外一种接受方式 
		//		while(flag){  
		//			TextMessage message = (TextMessage)consumer.receive(1);  
		//			if(message != null){  
		//				if("stop".equals( message.getText())){  
		//					flag = false;  
		//				}  
		//			}  
		//		}  
	}
}

© 著作权归作者所有

共有 人打赏支持
_______-
粉丝 3
博文 92
码字总数 36164
作品 0
浦东
程序员
Linux 安装ActiveMQ(使用Mac远程访问)

阅读本文需要安装JDK 一 ActiveMQ简介 activemq是用java语言编写的一款开源消息总线 activemq是apache出品 activemq消息的传递有两种类型 一种是点对点(即一个生产者和一个消费者一一对应) 另...

梦三
前天
0
0
2.ActiveMQ消息队列安装使用

全程是MOM (Message Oriented Middleware) 消息中间件 消息中间件有很多,比如: 1.ActiveMQ java语言编写的和java系统结合紧密 2.RabbitMQ Erlong语言开发的,天生支持高并发,性能优于A...

小杰java
2017/10/26
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
消息队列篇—详谈ActiveMQ消息队列模式的分析及使用

消息队列(Message Queue)是分布式系统中重要的组件,通用使用场景可以简单地描述为当不需要立即获得结果,但是并发量需控制时就需要使用消息队列。消息列队有两种消息模式,一种是点对点的...

afreon
04/22
0
0
springJMS+activeMQ实践

运行环境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3. 一定要注意activeMQ的版本与jdk的兼容性,最新的activeMQ版本估计要在jdk1.7以上才能运行。 先说一下activeMQ的安装: 1、下载:h...

wangrikui
2015/06/28
0
2
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
06/26
0
0
ActiveMQ 5.15.x Release安装和配置--Linux篇

阅读目录: 1. 关闭防火墙和Selinux 2. 下载并安装ActiveMQ 5.15.x Release版本 3. 启动并验证 4.配置ActiveMQ 5.15.x Release自启动 5.注意事项以及说明 1. 关闭防火墙和Selinux Linux的防火...

loubobooo
2017/11/26
0
0
ActiveMQ学习记录 之 消息持久化

1:前言 这一段给公司开发消息总线有机会研究ActiveMQ,今天撰文给大家介绍一下他的持久化消息。本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle。下面逐一介绍。 A:持久化为文件 这...

李格尔楞
2017/11/01
0
0
activeMQ5官方文档翻译-初始化配置

首先你需要把jar包加到classpath 所需的jar包 为了使ActiveMQ更容易使用,默认的activemq-all.jar包包含了所有需要用到的库文件。如果你喜欢以明确的控制jar包的方式来使用ActiveMQ,那下面是...

z_jordon
2015/05/29
0
0
ActiveMQ初探(1)——介绍与基本使用

一、ActiveMQ 1.1 什么是ActiveMQ 是Apache出品,最流行的,能力强劲的。ActiveMQ是一个完全支持和规范的 实现,尽管规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊...

yuanlaijike
04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

打印斐波那契数

package com.jerry.ch04;public class PrintFibonacci {public static void main(String[] args) {for (int i=0; i<10; i++) {System.out.print(fib(i) + " ");}......

JerryNing
10分钟前
0
0
shell编程

一、shell脚本介绍

人在艹木中
11分钟前
0
0
istio 0.8 遥测 案例

==============遥测===================================== 演示如何从网格中收集遥测信息。 分布式跟踪。如何配置代理以向Zipkin或Jaeger发送跟踪请求 收集度量标准和日志。此任务说明如何配...

xiaomin0322
13分钟前
0
0
ND4J求多元线性回归以及GPU和CPU计算性能对比

上一篇博客《梯度下降法求多元线性回归及Java实现》简单了介绍了梯度下降法,并用Java实现了一个梯度下降法求回归的例子。本篇博客,尝试用dl4j的张量运算库nd4j来实现梯度下降法求多元线性回...

冷血狂魔
14分钟前
0
0
springboot常用注解

@SpringBootApplication: 包含@Configuration、@EnableAutoConfiguration、@ComponentScan 通常用在主类上。 @Service: 用于标注业务层组件。 @RestController: 用于标注控制层组件(如strut...

GoldenVein
20分钟前
1
0
如何进行大数据的入门级学习?

不知道你是计算机专业应届生还是已经从业者。总之,有java基础的学生学习大数据会轻松很多,零基础的小白都需要从java和linux学起。 如果你是一个学习能力特别强,而且自律性也很强的人的话可...

董黎明
34分钟前
0
0
使用Parcelable传递复杂参数

最近做AIDL传递对象,对象必须实现Parcelable的方法才可以被传递。 @Override    public int describeContents() {//这个 默认返回0就行了。        return 0;    }    ...

火云
35分钟前
0
0
十大Intellij IDEA快捷键

Intellij IDEA中有很多快捷键让人爱不释手,stackoverflow上也有一些有趣的讨论。每个人都有自己的最爱,想排出个理想的榜单还真是困难。以前也整理过Intellij的快捷键,这次就按照我日常开发...

HJCui
45分钟前
0
0
word 使用mathtype 编写 数学公式

下载安装,这个链接命名。。。。 http://www.mathtype.cn/xiazai.html 安装之后会多出一个选项 使用内联方式插入图表 编写公式的界面 设置支持latex 语法 输入公式回车就可以看到结果...

阿豪boy
今天
0
0
Promise

定义 Promise是异步编程的一种解决方案,所谓Promise就是一个容器,里面保存着某个未来才会结束的事件(通常是一个一步操作)的结果。 特点: 2.1 对象的状态不受外界影响,三种状态pending...

litCabbage
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部