文档章节

ActiveMQ发消息和收消息

cookqq
 cookqq
发布于 2013/03/05 10:44
字数 1795
阅读 1.1W
收藏 11

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。下面详细的解释常用类的作用

ConnectionFactory 接口(连接工厂) 用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接) 连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标) 目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
MessageConsumer 接口(消息消费者) 由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者) 由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息) 是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分: 消息头(必须):包含用于识别和为消息寻找路由的操作设置。 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 消息接口非常灵活,并提供了许多方式来定制消息的内容。
Session 接口(会话) 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

JMS的消息模式有1.点对点的消息模式(Point to Point Messaging)

2.发布订阅模式(publish – subscribe Mode)

这里基于点对点的消息模式进行ActiveMQ发消息和收消息过程的分析,请看模型图:

点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder(客户端A) 发送消息,receive(客户端B)接收消息。具体点就是客户端A发送Message Queue ,而 客户端B从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在 任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行 。

请看下面发消息和收消息的例子

package com.activemq.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class ActiveMqTest {
 
 private static String queueName = "activemq_queue_";


 public static void main(String[] args) {
 Receiver receiver=new Receiver();
 Sender sender =new Sender();
 try {
 sender.send();
 receiver.receive();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }


 static class Receiver {
 public static void receive() throws Exception {
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 Connection connection = connectionFactory.createConnection();
 connection.start();
 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);
 MessageConsumer consumer = session.createConsumer(destination);
 //第一种情况
 int i = 0;
 while (i < 3) {
 i++;
 TextMessage message = (TextMessage) consumer.receive();
 session.commit();
 // TODO something....
 System.out
 .println("收到消息:" +message.getText());
 }
 session.close();
 connection.close();
 //----------------第一种情况结束----------------------
 //第二种方式
//			consumer.setMessageListener(new MessageListener() {
//				public void onMessage(Message arg0) {
//					if(arg0 instanceof TextMessage){
//						try {
//							System.out.println("arg0="+((TextMessage)arg0).getText());
//						} catch (JMSException e) {
//							e.printStackTrace();
//						}
//					}
//				}
//			});
 //第三种情况
//			 while (true) {
//            Message msg = consumer.receive(1000);
//            TextMessage message = (TextMessage) msg;
//            if (null != message) { 
//           	 System.out.println("收到消息:" + message.getText());
//            } 
//        }
 }
 }


 static class Sender {
 public static void send() throws Exception {
 ConnectionFactory connectionFactory = null;
 connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, //null
                ActiveMQConnection.DEFAULT_PASSWORD, //null
                "tcp://localhost:61616");


 Connection connection = connectionFactory.createConnection();
 connection.start();


 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);


 MessageProducer producer = session.createProducer(destination);
 for (int i = 0; i < 3; i++) {
 TextMessage message = session.createTextMessage("count"+new Date().getTime());
 Thread.sleep(1000);
 // 通过消息生产者发出消息
 System.out.println("发送消息"+i+new Date());
 producer.send(message);
 }
 session.commit();
 session.close();
 connection.close();
 }
 }
}

Sender主要的作用是发送消息,Receiver主要的作用是接受消息,并且显示一下接收消息的内容,这里详细的解释接受消息的方法:

(1)第一种方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者会一直等待下去,直到有消息到达,或者超时。

其实第一种方法和第三种方法接受原理一样,区别是第一种知道要接受消息的条数,接受完消息,手动关系连接。而第三种不知道要接受多少条数据,所以使用while (true) 死循环直接在接受消息

(2)第二种方法:消息消费者注册一个MessageListener当有消息到达的时候,会回调它的onMessage()方法。

这里需要注意的是,你注册完成MessageListener,千万不要关闭连接session.close();和connection.close();因为你刚刚注册完成监听器,就把连接关闭,就不会受到消息,所以监听器中也不会有处理。(这个问题可把我整哭了,搞了半天,才弄明白)

请看ActiveMQ 页面上显示队列的信息

name是队列名称

Number Of Pending Messages  是队列中有多少个消息等待出队列

Number Of Consumers  是队列中有多少个消费者

Messages Enqueued  队列共有多少个信息

Messages Dequeued  是队列中已经出列多少个消息

开发中遇到的异常:

(1)javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect

拒绝连接,原因是activemq服务器没有开启。

解决办方法:开启activemq服务器,请参照《activemq跑起来



© 著作权归作者所有

上一篇: shell-基础命令
下一篇: ActiveMQ跑起来
cookqq

cookqq

粉丝 119
博文 268
码字总数 156096
作品 0
海淀
技术主管
私信 提问
加载中

评论(0)

ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
1.3W
1
请问网站项目怎么和ActiveMQ服务器交互?

ActiveMQ是个单独的服务器,用的是\apache-activemq-5.4.0。在服务器上activemq.bat启动时,能正常的打开 http://localhost:8161/demo/ 和 http://localhost:8161/admin/。 那我们现在一个网...

wangyingdong
2010/10/20
1.7K
5
ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现

初次发博文,勿喷~~ 最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化; 真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,...

JingHaiChao
2012/05/14
1W
7
品优购商城项目(五)消息中间件 ActiveMQ

消息中间件用于降低各个项目模块的耦合,适用于不需要等待返回消息才能进入下一个业务环节的模块,以及实时要求性不高的业务模块。 一、JMS   JMS(Java Messaging Service)是Java平台上...

osc_uwpj27el
2019/09/09
1
0
Springboot整合ActiveMQ(Queue和Topic两种模式)

写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目。这里对学习Springboot的一些知识总结记录一下。如果你也在学习SpringBoot,可以关注我,一起学习,一起进步。 文章目...

一枕江风
04/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

比较好用的自媒体分发工具是哪个?

正是源于互联网生态的健康发展,从15年开始到现在,自媒体文化产业特别有关注度,其品牌形象鲜明、优秀的转化能力的数不胜数的亮点,故而吸引着数不清的创作者,而为了达到更出彩的播放量,这...

易媒助手
16分钟前
25
0
idea2020.1 pojie

前几天最新版的idea2020.1终于发布了,它有多香相信小伙伴们已经有所耳闻。现在就拿出来分享给小伙伴们。   所需要的的东西放在这里了,同时里面也放了一个idea2020.1,小伙伴可以选择性的...

我喜欢你有用吗__
20分钟前
32
0
从企业微信机器人到小爱同学,用 Serverless 实现生活智能化!

通过定时触发器,可以简单快速地定制一个企业微信机器人。我们可以用它来实现喝水、吃饭提醒等小功能,还能实现定时推送新闻、天气,甚至是监控告警的小功能。 使用企业微信机器人 在企业微信...

腾讯云Serverless
23分钟前
32
0
socket编程基础,从了解到实现

java生下来一开始就是为了计算机之间的通信,因此这篇文章也将开始介绍一下java使用socket进行计算机之间的通信,在上一篇文章中已经对网络通信方面的基础知识进行了总结,这篇文章将通过代码...

虹越云霄
23分钟前
41
0
Spring Enable***功能

spring博大精深,衍生出了两大系列:spring boot 和sping cloud快速业务开发模式。 我们进行具体开发时,经常看到这样以Enable*开头的注解,如图 等等好多例子。。。框架自带的Enable* @En...

董广明
25分钟前
41
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部