文档章节

ActiveMQ发消息和收消息

cookqq
 cookqq
发布于 2013/03/05 10:44
字数 1795
阅读 5483
收藏 11
点赞 0
评论 0

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。下面详细的解释常用类的作用

ConnectionFactory 接口(连接工厂) 用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接) 连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标) 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer 接口(消息消费者) 由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者) 由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息) 是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分: 消息头(必须):包含用于识别和为消息寻找路由的操作设置。 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 消息接口非常灵活,并提供了许多方式来定制消息的内容。
Session 接口(会话) 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

JMS的消息模式有1.点对点的消息模式(Point to Point Messaging)

2.发布订阅模式(publish – subscribe Mode)

这里基于点对点的消息模式进行ActiveMQ发消息和收消息过程的分析,请看模型图:

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder(客户端A) 发送消息,receive(客户端B)接收消息。具体点就是客户端A发送Message Queue ,而 客户端B从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在 任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行 。

请看下面发消息和收消息的例子

package com.activemq.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class ActiveMqTest {
 
 private static String queueName = "activemq_queue_";


 public static void main(String[] args) {
 Receiver receiver=new Receiver();
 Sender sender =new Sender();
 try {
 sender.send();
 receiver.receive();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }


 static class Receiver {
 public static void receive() throws Exception {
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 Connection connection = connectionFactory.createConnection();
 connection.start();
 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);
 MessageConsumer consumer = session.createConsumer(destination);
 //第一种情况
 int i = 0;
 while (i < 3) {
 i++;
 TextMessage message = (TextMessage) consumer.receive();
 session.commit();
 // TODO something....
 System.out
 .println("收到消息:" +message.getText());
 }
 session.close();
 connection.close();
 //----------------第一种情况结束----------------------
 //第二种方式
//			consumer.setMessageListener(new MessageListener() {
//				public void onMessage(Message arg0) {
//					if(arg0 instanceof TextMessage){
//						try {
//							System.out.println("arg0="+((TextMessage)arg0).getText());
//						} catch (JMSException e) {
//							e.printStackTrace();
//						}
//					}
//				}
//			});
 //第三种情况
//			 while (true) {
//            Message msg = consumer.receive(1000);
//            TextMessage message = (TextMessage) msg;
//            if (null != message) { 
//           	 System.out.println("收到消息:" + message.getText());
//            } 
//        }
 }
 }


 static class Sender {
 public static void send() throws Exception {
 ConnectionFactory connectionFactory = null;
 connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, //null
                ActiveMQConnection.DEFAULT_PASSWORD, //null
                "tcp://localhost:61616");


 Connection connection = connectionFactory.createConnection();
 connection.start();


 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);


 MessageProducer producer = session.createProducer(destination);
 for (int i = 0; i < 3; i++) {
 TextMessage message = session.createTextMessage("count"+new Date().getTime());
 Thread.sleep(1000);
 // 通过消息生产者发出消息
 System.out.println("发送消息"+i+new Date());
 producer.send(message);
 }
 session.commit();
 session.close();
 connection.close();
 }
 }
}

Sender主要的作用是发送消息,Receiver主要的作用是接受消息,并且显示一下接收消息的内容,这里详细的解释接受消息的方法:

(1)第一种方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者会一直等待下去,直到有消息到达,或者超时。

其实第一种方法和第三种方法接受原理一样,区别是第一种知道要接受消息的条数,接受完消息,手动关系连接。而第三种不知道要接受多少条数据,所以使用while (true) 死循环直接在接受消息

(2)第二种方法:消息消费者注册一个MessageListener当有消息到达的时候,会回调它的onMessage()方法。

这里需要注意的是,你注册完成MessageListener,千万不要关闭连接session.close();和connection.close();因为你刚刚注册完成监听器,就把连接关闭,就不会受到消息,所以监听器中也不会有处理。(这个问题可把我整哭了,搞了半天,才弄明白)

请看ActiveMQ 页面上显示队列的信息

name是队列名称

Number Of Pending Messages  是队列中有多少个消息等待出队列

Number Of Consumers  是队列中有多少个消费者

Messages Enqueued  队列共有多少个信息

Messages Dequeued  是队列中已经出列多少个消息

开发中遇到的异常:

(1)javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect

拒绝连接,原因是activemq服务器没有开启。

解决办方法:开启activemq服务器,请参照《activemq跑起来



© 著作权归作者所有

共有 人打赏支持
cookqq

cookqq

粉丝 116
博文 268
码字总数 156096
作品 0
海淀
技术主管
ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
0
1
ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现

初次发博文,勿喷~~ 最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化; 真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,...

JingHaiChao
2012/05/14
0
7
.Net平台下ActiveMQ入门实例

1.ActiveMQ简介 先分析这么一个场景:当我们在网站上购物时,必须经过,下订单、发票创建、付款处理、订单履行、航运等。但是,当用户下单后,立即跳转到“感谢那您的订单” 页面。不仅如此,...

postdep
2015/08/24
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0
ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息...

cookqq
2013/03/04
0
0
Linux 安装ActiveMQ(使用Mac远程访问)

阅读本文需要安装JDK 一 ActiveMQ简介 activemq是用java语言编写的一款开源消息总线 activemq是apache出品 activemq消息的传递有两种类型 一种是点对点(即一个生产者和一个消费者一一对应) 另...

梦三
前天
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
activeMQ5官方文档翻译-运行消息中间件

把activeMQ中间件运行起来 注:如果你想以内嵌的方式来使用消息中间件的话你可以参照怎么在一个连接中间嵌入消息中间件 以二进制文件的方式分发的ActiveMQ自带了一个'activemq'的脚本文件来运...

z_jordon
2015/05/31
0
0
springJMS+activeMQ实践

运行环境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3. 一定要注意activeMQ的版本与jdk的兼容性,最新的activeMQ版本估计要在jdk1.7以上才能运行。 先说一下activeMQ的安装: 1、下载:h...

wangrikui
2015/06/28
0
2
ActiveMQ初探(2)——与Spring整合

在上一章:ActiveMQ(1)——介绍与基本使用中,我们通过测试方法熟悉了ActiveMQ的流程,本章将ActiveMQ与Spring进行整合。 与Spring整合,除了原本的外,还需导入和包,如果Spring为5.0+,需...

yuanlaijike
05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

spring-@RequestBody

@RequestMapping("/login")    public void login(@RequestBody String userName,@RequestBody String pwd){      System.out.println(userName+" :"+pwd);    }    ......

说回答
6分钟前
0
0
Redis安装

大家可以通过该链接获取安装详情(这是一个Word文档,支持下载): http://note.youdao.com/noteshare?id=7a327ed6c58fb2037ba537e58ecf7510&sub=480DB8EF349747C3983B73AE94D45BB1 其他参考...

一梦心草
6分钟前
0
0
MySQL按天,按周,按月,按时间段统计【转载】

https://blog.csdn.net/qq_28056641/article/details/78306870 select DATE_FORMAT(create_time,'%Y%m%d') days,count(caseid) count from tc_case group by days; select DATE_FORMAT(creat......

李道福
8分钟前
0
0
浅谈parallelStream

parallelStream是什么,它是一个集合的并发处理流.其作用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度. 比如说这样一段代码 private Set<SysRole> sysRoles;private Set<St...

算法之名
10分钟前
3
0
器者,道之所载

形而上者谓之道,形而下者谓之器,化而裁之谓之变;推而行之谓之通,举而措之天下之民,谓之事业。—— 《道德经》

了凡川
12分钟前
0
0
C#命名规范中文版/C#编码规范中文版

最新文档地址https://github.com/hiramtan/CSharpNamingGuidelines_Chinese C#命名规范中文版/C#编码规范中文版 示例 /*****************************************************************......

海贝Hibey
13分钟前
0
0
刚从eclipse转到Intellij IDEA,分享一些配置经验

刚从eclipse转到Intellij IDEA,分享一些配置经验,IntelliJ IDEA作为最好的Java开发工具,在智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查、 创新的GUI设计等...

舒文joven
14分钟前
1
0
lombok 引入后,测试类始终找不到get,set方法。

开发环境为idea,jdk1.7,maven3.5. 网上直接搜出来的方法有: 1、在setting里安装lombok的plugins; 2、如下图,勾选enable annocation processing选项 3、升级maven plugins插件 我尝试了以...

Kidult
20分钟前
0
0
Duang,HUAWEI DevEco IDE全面升级啦

想感受全新UI带来的视觉及交互体验、 HiKey970开发板调测、 HiAI API推荐和收藏、 深度AI模型分析等新功能, 体验高清晰度和流畅度的远程AI真机调测吗? 全新的UI设计 采用最优秀的视觉及交互...

华为终端开放实验室
29分钟前
1
0
阻止事件冒泡,阻止默认事件

1.event.stopPropagation()方法 这是阻止事件的冒泡方法,不让事件向documen上蔓延,但是默认事件任然会执行,当你掉用这个方法的时候,如果点击一个连接,这个连接仍然会被打开, 2.event....

闫亚亚
31分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部