今天继续给大家分享的是ActiveMQ,如有不足,敬请指教。
上次我们说到,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?
这就需要使用ActiveMQ监听器来监听队列,持续消费消息。
一、ActiveMQ监听器
1.1 配置步骤说明
- 创建一个监听器对象。
- 修改消费者代码,加载监听器
1.2 配置步骤
1.2.1 创建监听器MyListener类
package com.xkt.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @author lzx
*
*/
public class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (null != message) {
if (message instanceof TextMessage) {
try {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("监听到的消息是 " + content);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
1.2.2 修改MyConsumer代码,加载监听器
- 监听器需要持续加载,因此消费程序不能结束。这里我们使用输入流阻塞消费线程结束
package com.xkt.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.xkt.listener.MyListener;
/**
* @author lzx
*
*/
public class Myconsumer {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private Destination destination;
private MessageConsumer consumer;
public void receiveFromMq() {
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地, 目的地命名即队列命名, 消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("queue");
// 5.创建消息消费者, 创建的消息消费者与某目的地对应, 即方法参数目的地
consumer = session.createConsumer(destination);
// 7.加载监听器
consumer.setMessageListener(new MyListener());
// 监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。监听指定队列,只要有消息进来,就消费这条消息
System.in.read();
// 在java项目中,可以通过IO阻塞程序,持续加载监听器
// 在web项目中,可以通过配置文件,直接加载监听器。
} catch (Exception e) {
e.printStackTrace();
System.out.println("读取失败");
} finally {
if (null != consumer) {
try {
consumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
1.2.3 测试
- 多次运行生产者,发送多条消息到队列中
图示 |
---|
![]() |
- 运行消费者。观察结果
图示 |
---|
![]() |
- 查看ActiveMQ管理控制界面,所有消息都被消费了!
图示 |
---|
![]() |
在以上示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?
二、Topic模式实现
2.1 配置步骤说明
- 搭建ActiveMQ消息服务器。(略)
- 创建主题订阅者。
- 创建主题发布者。
2.2 配置步骤
2.2.1 创建主题订阅者MySubscriber
- 说明:主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟
package com.xkt.subscriber;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MySubscriber implements Runnable {
/**
* 多线程的线程安全问题 解决方案:
*
* (1)加锁 --极不推荐 (2)不使用全局变量 ---> SpringMVC是线程安全的吗? 答:默认不是 解决办法:(1)使用原型模式--不推荐
* (2)不使用全局变量 (3)ThreadLocal (3)其它框架来代替,比如redis
*/
private TopicConnectionFactory factory;
private TopicConnection connection;
private TopicSession session;
private Topic topic;
private TopicSubscriber subscriber;
private Message message;
@Override
public void run() {
try {
// 1、创建连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2、创建连接
connection = factory.createTopicConnection();
connection.start();
// 3、创建会话
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建topic主题
topic = session.createTopic("topic-gzsxt");
// 5、创建订阅者
subscriber = session.createSubscriber(topic);
// 6、订阅
while (true) {
message = subscriber.receive();
if (null != message) {
if (message instanceof TextMessage) {
TextMessage tMsg = (TextMessage) message;
String content = tMsg.getText();
System.out.println("订阅者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
}
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.2.2 修改测试类
package com.xkt.test;
import com.xkt.subscriber.MySubscriber;
/**
* @author lzx
*
*/
public class TestMQ {
public static void main(String[] args) {
MySubscriber sub = new MySubscriber();
Thread t1 = new Thread(sub);
Thread t2 = new Thread(sub);
t1.start();
t2.start();
}
}
2.2.3 查看测试结果
- 查看AcitveMQ管理界面 |图示 | | :------------: | |
|
2.2.4 创建主题发布者MyPublisher
package com.xkt.publish;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @author lzx
*
*/
public class MyPublisher {
private TopicConnectionFactory factory;
private TopicConnection connection;
private TopicSession session;
private Topic topic;
private TopicPublisher publisher;
private Message message;
public void publish(String msg) {
try {
// 1、创建连接工厂
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");
// 2、创建连接
connection = factory.createTopicConnection();
connection.start();
// 3、创建会话
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建topic主题
topic = session.createTopic("topic-gzsxt");
// 5、创建发布者
publisher = session.createPublisher(topic);
// 6、创建消息对象
message = session.createTextMessage(msg);
// 7、发布消息
publisher.publish(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != publisher) {
try {
publisher.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.stop();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (null != session) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
2.2.5 修改测试类
package com.xkt.test;
import org.junit.Test;
import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;
/**
* @author lzx
*
*/
public class TestMQ {
public static void main(String[] args) {
MySubscriber sub = new MySubscriber();
Thread t1 = new Thread(sub);
Thread t2 = new Thread(sub);
t1.start();
t2.start();
}
@Test
public void publish() {
MyPublisher publisher = new MyPublisher();
publisher.publish("hello,欢迎收听FM 89.9频道-交通频道");
}
}
2.2.6 查看测试结果
2.3 Topic小结
- Topic模式能够实现多个订阅者同时消费消息。
- Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。
- 通常可以用来解决公共消息推送的相关业务。
版权说明:欢迎以任何方式进行转载,但请在转载后注明出处!