文档章节

ActiveMQ入门教程(三) - ActiveMQ P2P版的HelloWorld

chaun
 chaun
发布于 2015/04/21 18:28
字数 2429
阅读 139
收藏 2

在这篇博客,我们来写一个ActiveMQ版的HelloWorld。

其实,要想写程序的话,还是要先了解一下JMS规范里的东西。

可以参考这里提供的下载文件:http://blog.csdn.net/yuguiyang1990/article/details/12084929

这两篇转载的博客:

消息中间件原理及JMS简介之一

消息中间件原理及JMS简介之二


1.预备知识    (ps:摘自 维基百科)

1.1JMS消息模型:

1)点对点(队列)模型

Point to Point

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将

消息发送到消费者的队列。这种模式被概括为:

  • 只有一个消费者将获得消息

  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。

  • 每一个成功处理的消息都由接收者签收

2)发布/订阅模型

Publisher/Subscriber Model

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和

订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:

  • 多个消费者可以获得消息

  • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

1.2 JMS应用程序接口

ConnectionFactory:

用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员

在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

Connection:

连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允

许用户创建会话,以发送和接收队列和主题到目标。

Session:

表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会

话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以

使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

Destination:

目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然

后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

MessageConsumer:

由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

MessageProducer:

由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

Message:

是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

消息头(必须):包含用于识别和为消息寻找路由的操作设置。

一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。


消息接口非常灵活,并提供了许多方式来定制消息的内容。

2.Hello World

2.0 基本配置

使用Maven,pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.ygy</groupId>
	<artifactId>activemq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>activemq</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>

		<!-- activemq,学习中 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.5.6</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.5.6</version>
		</dependency>


	</dependencies>
</project>

这里只要引入ActiveMQ的依赖就可以了。

2.1 P2P版的HelloWorld

生产者:HelloQueueProducer

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 最简单的生产者
 * 
 * @author yuguiyang
 * 
 */
public class HelloQueueProducer {
	public static void main(String[] args) {
		// 生产者的主要流程
		Connection connection = null;

		try {
			// 1.初始化connection工厂,使用默认的URL
			//failover://tcp://localhost:61616
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

			// 2.创建Connection
			connection = connectionFactory.createConnection();

			// 3.打开连接
			connection.start();

			// 4.创建Session,(是否支持事务)
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 5.创建消息目标
			Destination destination = session.createQueue("queue_lesson");

			//6.创建生产者
			MessageProducer producer = session.createProducer(destination);
			
			//7.配置消息是否持久化
			/*	DeliverMode有2种方式:
			 * 
			 	public interface DeliveryMode {
				    static final int NON_PERSISTENT = 1;//不持久化:服务器重启之后,消息销毁
				
				    static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在
				}
			 */
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			
			//8.初始化要发送的消息
			TextMessage message = session.createTextMessage("Hello World ! from yuguiyang");
			
			//9.发送消息
			producer.send(message);

		} catch (JMSException e) {
			e.printStackTrace();
		} finally{
			try {
				//10.关闭连接
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}

消费者:HelloQueueConsumer

package org.ygy.mq.lesson01;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 简单的消费者
 * 
 * @author yuguiyang
 * 
 */
public class HelloQueueConsumer implements MessageListener {

	@Override
	public void onMessage(Message message) {
		//如果消息是TextMessage
		if (message instanceof TextMessage) {
			//强制转换一下
			TextMessage txtMsg = (TextMessage) message;
			try {
				//输出接收到的消息
				System.out.println("HaHa: I'v got " + txtMsg.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	public void receive() {
		// 消费者的主要流程
		Connection connection = null;

		try {
			// 1.初始化connection工厂
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

			// 2.创建Connection
			connection = connectionFactory.createConnection();

			// 3.打开连接
			connection.start();

			// 4.创建session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 5.创建消息目标
			Destination destination = session.createQueue("queue_lesson");

			//6.创建消费者
			MessageConsumer consumer = session.createConsumer(destination);
			
			//7.配置监听
			consumer.setMessageListener(this);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		new HelloQueueConsumer().receive();
	}

}

3.测试

代码写好了,我们测试一下

3.1 启动ActiveMQ服务器

上一篇博客中,说过,进入到bin目录下,双击 activemq.bat,启动

启动后,访问 http://localhost:8161/admin/

可能会让你输入用户名和密码 ,这里默认的用户名:admin;密码:admin

然后,我们单击那个 Queues菜单:

这里默认应该什么都没有,有的话,也没事

3.2运行程序

我们先运行生产者,运行完之后,刷新一下,上面的界面:

可以看到上面的记录

这里显示的是服务器上的队列,

Name:就是队列的名字啦,其中 queue_lesson就是我们程序中新建队列

Number Of Pending Messages:是等待消费的消息,因为我们只运行了生产者,而且只产生了一条消息,因此队列中有一条未消费的消息。

Number Of Consumers:当前运行着的消费者,我们还没有

Messages Enqueued :进入队列的消息,我们只产生了一次,也只有一条消息

Message Dequeued:出了队列的消息,指被消费的消息

Views:查看当前队列的一些信息

Operations:对当前队列的一些操作

在这里,我们单击Browse连接:

在这里,我们能看到当前队列中的消息

Message ID:应该是自动生成的,还不了解

Correlation ID:这个以后再研究,他主要是用来关联多个Message,例如需要回复一个消息时,通常把回复的消息的JMSCorrelationID设置为原来消息的ID

Persistence:是否持久化,我们在代码里,没有设置持久化

Priority:权重,默认应该为4

Redelivered:消息是否被重发

Reply To:回复,以后会说到

TimeStamp:消息的时间戳

Type:消息类型

Operations:操作


下面,我们点击Message ID 连接,进入到消息的详细界面:

这里,可以看到,消息的内容和消息头信息


好了,到这里,我们就可以运行消费者了,先回到最开始的界面:

运行消费者,之后,控制台输出:

我们接受到了消息。

刷新界面:

可以看到,这里的内容变了,

因为消息被我们消费了,所以被消费消息加1,而且,当前消费者还在运行,所以有一个消费者。


4.总结

到这里,我们P2P版的 HelloWorld就完成了,个人感觉,这个JMS用起来,就好像和Socket差不多,这个消费的例子和用Socket写的聊天程序差不多。



本文转载自:http://blog.csdn.net/yuguiyang1990/article/details/12090505

chaun
粉丝 92
博文 269
码字总数 91059
作品 0
深圳
高级程序员
私信 提问
.Net平台下ActiveMQ入门实例

1.ActiveMQ简介 先分析这么一个场景:当我们在网站上购物时,必须经过,下订单、发票创建、付款处理、订单履行、航运等。但是,当用户下单后,立即跳转到“感谢那您的订单” 页面。不仅如此,...

postdep
2015/08/24
71
0
跟我学习dubbo-ActiveMQ的安装-单节点与使用(9)

ActiveMQ 的安装与使用(单节点) 1、 安装 JDK 并配置环境变量 JAVA_HOME=/usr/local/java/jdk1.7.0_72 2、 下载 Linux 版的 ActiveMQ(当前最新版 apache-activemq-5.11.1-bin.tar.gz) $ ...

HI曲奇饼干
2016/01/19
238
0
SpringBoot整合ActiveMq要分以下几个步骤:

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013115157/article/details/79413429 第一步:从ActiveMq官方上下载ActiveMq服务 http://activemq.apache.o...

MorganLai
2018/03/01
0
0
ActiveMQ入门以及整合spring boot

一、 ActiveMQ 消息队列的使用 MQ MQ:Message Queue 消息队列,就是用来在系统之间进行消息传递的 这个队列有一系列具体的实现技术:ActiveMQ、rabbitMQ、kafka、RocketMQ(alibaba) HttpCil...

Armymans
2018/11/10
0
0
ActiveMQ+ZooKeeper 伪集群整合

原理简介: 一般在部署ActiveMQ集群的时候,更倾向于使用基于ZooKeeper的Replicated LevelDB Store方式,该方式是Master Slave部署方案的其中一种策略,也是在多台主机实现ActiveMQ集群的主流...

watermelon11
02/18
22
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
昨天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
昨天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部