文档章节

ActiveMQ 消息服务(三)

Andy市民
 Andy市民
发布于 2015/11/09 10:13
字数 2231
阅读 104
收藏 10

想象场景:

有一条任务,需要在特定环境下进行。用ActiveMQ 来讲分两步,第一:发送者发布一条消息;第二:接收者接收到这条消息后需要干某些事情。

本文依然直接贴出demo代码!

1、项目结构图:

2、activeMQ的jar包依赖,部分pom.xml文件代码:

<dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.7.0</version>
		</dependency>
		<!-- activemq-spring -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-spring</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>3.0.7.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.1</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.16</version>
		</dependency>
	</dependencies>

3、日志属性文件log4j.properties:


log4j.rootLogger=DEBUG,INFO,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} [%c]-[%p] %m%n

4、消息接收配置属性文件receive.properties:


jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
fliter=
test_key=com.andy.demo.util.activeMQ.DoSomethingImpl
max_caches=100

5、消息发送配置属性文件send.properties:


jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
max_caches=100
persist=persist

6、场景中说到的,当我们收到消息后需要处理一些事情

本例中将需要处理的事情摘出来,分成需要处理事情的接口以及实现类两部分:

(一)接口IDoSomething.java:

package com.andy.demo.activeMQ.work;

import javax.jms.Message;

// 处理一些事情 的接口
public interface IDoSomething {
	// 干点实事1
	public void doSomeThing01(Message message);

	// 干点实事2
	public void doSomeThing02(Message message);
}
(二)接口实现类DoSomethingImpl.java:
package com.andy.demo.activeMQ.work.impl;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import com.andy.demo.activeMQ.work.IDoSomething;

// 处理一些事情 的接口实现类
public class DoSomethingImpl implements IDoSomething {

	@Override
	public void doSomeThing01(Message message) {
		// TODO Auto-generated method stub
		if (message instanceof TextMessage) {
			TextMessage msg = (TextMessage) message;
			try {
				System.out.println("doSomeThing01 处理的消息内容:" + msg.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

	@Override
	public void doSomeThing02(Message message) {
		// TODO Auto-generated method stub
		if (message instanceof TextMessage) {
			TextMessage msg = (TextMessage) message;
			try {
				System.out.println("doSomeThing02 处理的消息内容:" + msg.getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}

}

7、消息生产者或者叫消息发送者Sender.java:

package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :Sender.java<br>
 * @describe :消息发送者<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:21:33<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Sender {
	private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

	private ActiveMQConnectionFactory factory;
	private Connection conn;
	private Session session;
	private Destination destination;
	private MessageProducer producer;

	private String url;
	private String jmsname;
	private boolean isTopic;
	private boolean isPersist;
	private boolean isConnection;
	private BlockingQueue<String> queue;

	private String msg;

	public Sender(String url, String jmsname, boolean isTopic,
			boolean isPersist, int maxcaches) {
		super();
		System.out.println("Sender.Sender(): 通过构造函数实例化对象......");
		this.url = url;
		this.jmsname = jmsname;
		this.isTopic = isTopic;
		this.isPersist = isPersist;
		this.queue = new LinkedBlockingQueue<String>(maxcaches);
	}

	public static Sender getSenderCase(String url, String jmsname,
			boolean isTopic, boolean isPersist, int maxcaches) {
		System.out.println("Sender.getSenderCase(): 通过静态方法实例化对象......");
		return new Sender(url, jmsname, isTopic, isPersist, maxcaches);
	}

	public void addMessage(String msg) throws InterruptedException {
		System.out.println("Sender.addMessage(): 向队列添加消息......");
		queue.put(msg);
	}

	private void sendMsg(String msg) throws InterruptedException, JMSException {
		System.out.println("Sender.sendMsg(): 向服务器发送消息......");
		Thread.sleep(5 * 1000);
		producer.send(session.createTextMessage(msg));
	}

	public void send() {
		System.out.println("Sender.send(): 从队列中取出消息......");
		while (!queue.isEmpty()) {
			try {
				msg = queue.take();
				initActiveMQ();
				sendMsg(msg);
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
				close();
			}
		}
		close();
	}

	public void sendMessage(String msg) throws JMSException,
			InterruptedException {
		System.out.println("Sender.sendMessage(): 发送消息主方法开始运行......");
		initActiveMQ();
		sendMsg(msg);
		close();
	}

	// 初始化 activeMQ
	private void initActiveMQ() throws JMSException {
		System.out.println("Sender.initActiveMQ(): 初始化 activeMQ......");
		if (isConnection) {
			return;
		}
		factory = new ActiveMQConnectionFactory(url);
		conn = factory.createConnection();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

		destination = isTopic ? session.createTopic(jmsname) : session
				.createQueue(jmsname);
		producer = session.createProducer(destination);
		producer.setDeliveryMode(isPersist ? DeliveryMode.PERSISTENT
				: DeliveryMode.NON_PERSISTENT);

		isConnection = true;
	}

	// 关闭释放资源
	private void close() {
		System.out.println("Sender.close(): 关闭释放资源......");
		try {
			producer.close();
			session.close();
			conn.close();
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
		isConnection = false;
	}
}

8、消息订阅者或者叫消息接收者Receiver.java:

package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :Receive.java<br>
 * @describe :消息接收者<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:20:27<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Receiver extends Thread implements MessageListener,
		ExceptionListener, Runnable {
	private static final Logger LOGGER = LoggerFactory
			.getLogger(Receiver.class);

	private ActiveMQConnectionFactory factory;
	private Connection conn;
	private Session session;
	private Destination destination;
	private MessageConsumer consumer;

	private String url;
	private String jmsname;
	private boolean isTopic;
	private String filter;
	private BlockingQueue<Message> queue;

	private IDoSomething doSomething;

	public Receiver(String url, String jmsname, boolean isTopic, String filter,
			IDoSomething doSomething, int maxcaches) {
		System.out.println("Receiver.Receiver(): 构造函数实例化对象......");
		this.url = url;
		this.jmsname = jmsname;
		this.isTopic = isTopic;
		this.filter = filter;
		this.doSomething = doSomething;
		queue = new LinkedBlockingQueue<Message>(maxcaches);
	}

	public static Receiver getReceiverCase(String url, String jmsname,
			boolean isTopic, String filter, IDoSomething doSomething,
			int maxcaches) throws JMSException {
		System.out.println("Receiver.getReceiverCase(): 静态方法实例化对象......");
		Receiver receiver = new Receiver(url, jmsname, isTopic, filter,
				doSomething, maxcaches);
		receiver.initActiveMQ();
		receiver.start();
		return receiver;
	}

	// 初始化 activeMQ 参数
	private void initActiveMQ() throws JMSException {
		System.out.println("Receiver.initActiveMQ():初始化activeMQ.......");
		factory = new ActiveMQConnectionFactory(url);
		conn = factory.createConnection();
		conn.start();
		session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

		destination = isTopic ? session.createTopic(jmsname) : session
				.createQueue(jmsname);
		consumer = (isNull(filter)) ? session.createConsumer(destination)
				: session.createConsumer(destination, filter);
		consumer.setMessageListener(this);
		conn.setExceptionListener(this);
	}

	public void close() {
		System.out.println("Receiver.close(): 关闭释放资源......");
		try {
			session.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			LOGGER.error(e.getMessage());
		}
		try {
			conn.stop();
			conn.close();
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
	}

	// 判断是否为空
	private boolean isNull(String param) {
		return param == null || param.equals("");
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
	 */
	@Override
	public void onException(JMSException exception) {
		// TODO Auto-generated method stub
		System.out.println("Receiver.onException():activeMQ 异常监听......");
		while (true) {
			try {
				initActiveMQ();
				break;
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
				try {
					Thread.sleep(10 * 1000);
				} catch (InterruptedException e1) {
					// TODO Auto-generated catch block
					LOGGER.error(e1.getMessage());
				}
			}
		}
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
	 */
	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		System.out.println("Receiver.onMessage(): activeMQ 消息接收监听......");
		try {
			if (isTopic) {
				queue.put(message);
			} else {
				doSomethingWork(message);
			}
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		}
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see java.lang.Thread#run()
	 */
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("Receiver.run(): Runnble接口监听......"
				+ Thread.currentThread().getName());
		while (true) {
			Message message = null;
			try {
				message = queue.take();
				doSomethingWork(message);
			} catch (Exception e) {
				// TODO: handle exception
				LOGGER.error(e.getMessage());
			}
		}
	}

	// 具体任务
	public void doSomethingWork(Message message) {
		System.out.println("Receiver.doSomethingWork(): 开始干实事了......");
		doSomething.doSomeThing01(message);
		doSomething.doSomeThing02(message);
	}

}

9、消息发送者和消息接收者的封装类ActiveMQUtils.java:

package com.andy.demo.activeMQ;

import java.io.InputStream;
import java.util.Properties;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.util.activeMQ<br>
 * @file :ActiveMQUtils.java<br>
 * @describe :<br>
 * @author :wanglongjie<br>
 * @createDate :2015年11月6日下午1:22:13<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class ActiveMQUtils {
	private static final Logger LOGGER = LoggerFactory
			.getLogger(ActiveMQUtils.class);

	public static final String jms_url = null;
	public static final String jms_name = null;
	public static final String jms_filter = null;
	public static final String jms_type = "topic";
	public static final String test_key = null;
	public static final String persist = "persist";

	/**
	 * 
	 * @method :getReceiverBean<br>
	 * @describe :获取 消息接收者实例<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午4:03:31 <br>
	 * @param properties
	 * @param doSomething
	 * @return Receiver
	 * @throws JMSException
	 * 
	 */
	public static Receiver getReceiverBean(String properties,
			IDoSomething doSomething) throws JMSException {
		Properties p = loadProperties(properties);
		String url = p.getProperty("jsm_url");
		String jmsname = p.getProperty("jms_name");
		boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
		String filter = p.getProperty("fliter");
		int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));
		Receiver receiver = Receiver.getReceiverCase(url, jmsname, isTopic,
				filter, doSomething, maxcaches);
		return receiver;
	}

	/**
	 * 
	 * @method :getSenderCase<br>
	 * @describe :获取 消息发送者实例<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午4:03:48 <br>
	 * @param properties
	 * @return Sender
	 */
	public static Sender getSenderCase(String properties) {
		Properties p = loadProperties(properties);
		String url = p.getProperty("jsm_url");
		String jmsname = p.getProperty("jms_name");
		boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
		boolean isPersist = p.getProperty("persist", "persist").equals(
				"persist");
		int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));

		Sender sender = Sender.getSenderCase(url, jmsname, isTopic, isPersist,
				maxcaches);
		return sender;
	}

	/**
	 * 
	 * @method :loadProperties<br>
	 * @describe :加载 属性文件<br>
	 * @author :wanglongjie<br>
	 * @createDate :2015年11月6日下午1:36:44 <br>
	 * @param properties
	 * @return Properties
	 */
	private static Properties loadProperties(String properties) {
		InputStream in = null;
		try {
			in = ActiveMQUtils.class.getResourceAsStream(properties);
			Properties p = new Properties();
			p.load(in);
			return p;
		} catch (Exception e) {
			// TODO: handle exception
			LOGGER.error(e.getMessage());
		} finally {
			try {
				in.close();
			} catch (Exception e2) {
				// TODO: handle exception
				LOGGER.error(e2.toString());
			}
		}
		return null;
	}
}

10、测试类,分发送消息测试、接收消息测试两部分:

(一)消息发送测试类SenderAPPTest.java:

package com.andy.demo.activeMQ.test;

import java.util.Date;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Sender;

// 发送消息测试类
public class SenderAPPTest {
	public static void main(String[] args) {
		String properties = "/send.properties";
		Sender sender = ActiveMQUtils.getSenderCase(properties);
		int sum = 5;
		for (int i = 0; i < sum; i++) {
			String msg = new Date(System.currentTimeMillis()) + " ; Hello, I am andy And this is a activeMQ test[" + (i + 1 ) + "]!";
			try {
				sender.sendMessage(msg);
			} catch (JMSException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			try {
				Thread.sleep(10 * 1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
(二)接收消息测试类ReceiverAPPTest.java:

package com.andy.demo.activeMQ.test;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Receiver;
import com.andy.demo.activeMQ.work.impl.DoSomethingImpl;

// 接收消息测试类
public class ReceiverAPPTest {
	public static void main(String[] args) {
		String properties = "/receive.properties";
		DoSomethingImpl doSomething = new DoSomethingImpl();
		int num = 5;
		Receiver[] receivers = new Receiver[num];
		try {
			for (int i = 0; i < num; i++) {
				receivers[i] = ActiveMQUtils.getReceiverBean(properties,
						doSomething);
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

11、本例的源码已贴完,再说一下JMS中topic和queue的区别:

(一)topic:

在JMS中,Topic实现publish和subscribe语义。一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
所以把配置文件中的jsm_type属性写成topic测试时,必须先启动ReceiverAPPTest.java接收者,然后在启动SenderAPPTest.java 发送者。也就是说在topic情况下,是先有接收者存在的情况下才能接收到发送者发送的消息。

(二)queue:

MS Queue执行load balancer语义。一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另一个consumer那儿。一个Queue可以有很多consumer,并且在多个可用的consumer中负载均衡
所以把配置文件中的jms_type属性写成queue测试时,先启动发送者或者接收者都可以。

12、更加详细的ActiveMQ的通信方法可以参考上一篇博客。


© 著作权归作者所有

Andy市民
粉丝 11
博文 37
码字总数 32801
作品 0
房山
程序员
私信 提问
ActiveMQ安装配置和使用简例

本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/48173665 ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件...

开开心心过
2015/09/02
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
2018/04/15
0
0
ActiveMQ队列消息积压问题调研

摘要 公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。 调研提出了三种方案。这里是相关记录和说明。 问题 运维同事对生产环境使用的ActiveM...

winters1224
2018/06/26
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
2018/06/26
0
0
ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
12.4K
1

没有更多内容

加载失败,请刷新页面

加载更多

面向对象编程

1、类和对象 类是对象的蓝图和模板,而对象是实例;即对象是具体的实例,类是一个抽象的模板 当我们把一大堆拥有共同特征的对象的静态特征(属性)和动态特征(行为)都抽取出来后,就可以定...

huijue
今天
11
0
redis异常解决 :idea启动本地redis出现 jedis.exceptions.JedisDataException: NOAUTH Authentication required

第一次安装在本地redis服务,试试跑项目,结果却出现nested exception is redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required错误,真是让人头疼 先检查一...

青慕
今天
20
0
Spring 之 IoC 源码分析 (基于注解方式)

一、 IoC 理论 IoC 全称为 Inversion of Control,翻译为 “控制反转”,它还有一个别名为 DI(Dependency Injection),即依赖注入。 二、IoC方式 Spring为IoC提供了2种方式,一种是基于xml...

星爵22
今天
28
0
Docker安装PostgresSql

Docker安装PostgresSql 拉取docker镜像 # docker pull postgres:10.1010.10: Pulling from library/postgres9fc222b64b0a: Pull complete 38296355136d: Pull complete 2809e135bbdb: Pu......

Tree
今天
13
0
内容垂直居中

方法一: 采用上下 padding 形式,将内容放置在垂直居中 .line { padding: 2% 0; text-align: center; height: 5px;} <div class="line"> 内容垂直居中</div> 方法二: 采......

低至一折起
今天
23
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部