文档章节

ActiveMQ入门教程(四) - ActiveMQ Pub/Sub版的HelloWorld

chaun
 chaun
发布于 2015/04/21 18:28
字数 890
阅读 155
收藏 0

在上一篇中,我们说了P2P版的HelloWorld,在这一篇,我们简要说一下,基于发布,订阅模式的HelloWorld。

基础知识就不在介绍了,需要的会一点一点讲。

1. pom.xml

这个和上一篇是一样的:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.ygy</groupId>
	<artifactId>activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>activemq</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>

		<!-- activemq,学习中 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.5.6</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.5.6</version>
		</dependency>


	</dependencies>
</project>

2. Pub/Sub版的HelloWorld

生产者:

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;

public class HelloTopicProducer {

	public void send(String msg) {
		// 生产者的主要流程
		Connection connection = null;

		try {
			// 1.初始化connection工厂,使用默认的URL
			// failover://tcp://localhost:61616
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

			// 2.创建Connection
			connection = connectionFactory.createConnection();

			// 3.打开连接
			connection.start();

			// 4.创建Session,(是否支持事务)
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 5.创建消息目标
			Destination destination_send = session.createTopic(MQConstants.DESTINATION_SEND);

			// 6.创建生产者
			MessageProducer producer = session.createProducer(destination_send);

			// 7.配置消息是否持久化
			/*
			 * DeliverMode有2种方式:
			 * 
			 * public interface DeliveryMode { static final int NON_PERSISTENT =
			 * 1;//不持久化:服务器重启之后,消息销毁
			 * 
			 * static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在 }
			 */
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			// 8.初始化要发送的消息
			TextMessage message = session.createTextMessage(msg);

			// 9.发送消息
			producer.send(message);
			
			connection.close();

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new HelloTopicProducer().send("我来试一试发布/订阅...");
	}

}

消费者:

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.ygy.mq.constants.MQConstants;

public class HelloTopicConsumer implements MessageListener {

	@Override
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			TextMessage txtMsg = (TextMessage) message;

			try {
				System.out.println("哈,我接收到了消息:" + txtMsg.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}

		}
	}

	public void receive() {
		// 消费者的主要流程
		Connection connection = null;

		try {
			// 1.初始化connection工厂
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

			// 2.创建Connection
			connection = connectionFactory.createConnection();

			// 3.打开连接
			connection.start();

			// 4.创建session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 5.创建消息目标
			Destination destination = session.createTopic(MQConstants.DESTINATION_SEND);

			// 6.创建消费者
			MessageConsumer consumer = session.createConsumer(destination);

			// 7.配置监听
			consumer.setMessageListener(new HelloTopicConsumer());

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new HelloTopicConsumer().receive();
	}

}

3.测试

访问网页:http://localhost:8161/admin/topics.jsp

单击那个Topics连接。

这里显示的是服务器上的主题,这些显示的都没有用,可以都删掉。

Name:主题的名称

Number Of Consumers:正在运行的消费者

Message Enqueued:进入消息队列的

Message Dequeued:出消息队列的

Operations:操作

下面就可以开始运行程序了,

注意顺序:先运行消费者:

这里会产生好几个主题,我们只看我们自己用的那个,(其实,其他几个是干嘛的,暂时还不清楚,以后再研究吧.....)

我们的消费者一直在运行


接下来,运行生产者:

控制台会输出:

再一次,刷新界面:

消费者还在运行,只生产了一条消息,而且已经被消费了。


在这里遇到了一个问题,就是运行顺序的问题,

我们如果先运行生产者,再运行消费者,消费者是接收不到消息的,郁闷了好久

猜想,应该是对概念,规范的理解出了问题,就找了一下,发现了原因:

这是上一篇介绍的JMS消息模型,哎,委屈,对概念的理解不清晰。

至于,持久的订阅,会在以后的博客中分享,HelloWorld,就到此结束了。


本文转载自:http://blog.csdn.net/yuguiyang1990/article/details/12129405

chaun
粉丝 92
博文 269
码字总数 91059
作品 0
深圳
高级程序员
私信 提问
初识ActiveMQ消息中间件

ActiveMQ官方网站:https://activemq.apache.org/ 关于ActiveMQ消息传递的方式详见: https://segmentfault.com/a/1190000014958916 https://www.cnblogs.com/cyfonly/p/6380860.html 本篇博......

帅得拖网速
05/12
27
0
ActiveMQ入门以及整合spring boot

一、 ActiveMQ 消息队列的使用 MQ MQ:Message Queue 消息队列,就是用来在系统之间进行消息传递的 这个队列有一系列具体的实现技术:ActiveMQ、rabbitMQ、kafka、RocketMQ(alibaba) HttpCil...

Armymans
2018/11/10
0
0
SpringBoot整合ActiveMQ消息队列

首先要讲什么是ActiveMQ:AciveMQ是Apache出品的目前最流行,能力强劲的开源消息总线 主要功能: 1、 解决服务之间代码耦合 2、 使用消息队列,增加系统并发处理量 主要应用场景: 1、 当系统...

逆风局局长熊
2018/12/06
0
0
跟我学习dubbo-ActiveMQ的安装-单节点与使用(9)

ActiveMQ 的安装与使用(单节点) 1、 安装 JDK 并配置环境变量 JAVA_HOME=/usr/local/java/jdk1.7.0_72 2、 下载 Linux 版的 ActiveMQ(当前最新版 apache-activemq-5.11.1-bin.tar.gz) $ ...

HI曲奇饼干
2016/01/19
238
0
SpringBoot整合ActiveMq要分以下几个步骤:

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013115157/article/details/79413429 第一步:从ActiveMq官方上下载ActiveMq服务 http://activemq.apache.o...

MorganLai
2018/03/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

分布式协调服务zookeeper

ps.本文为《从Paxos到Zookeeper 分布式一致性原理与实践》笔记之一 ZooKeeper ZooKeeper曾是Apache Hadoop的一个子项目,是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它...

ls_cherish
今天
4
0
redis 学习2

网站 启动 服务端 启动redis 服务端 在redis 安装目录下 src 里面 ./redis-server & 可以指定 配置文件或者端口 客户端 在 redis 的安装目录里面的 src 里面 ./redis-cli 可以指定 指定 连接...

之渊
昨天
2
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
昨天
4
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
昨天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
昨天
24
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部