消息中间件 ActiveMQ

原创
2019/11/28 16:53
阅读数 87

常见的消息中间件产品:

【1】ActiveMQ:是 Apache 出品,最流行的,能力强劲的开源消息总线,ActiveMQ 是一个完全支持 JMS1.1 和 J2EE1.4 规范的 JMS Provider 实现,我们下面主要说明 ActiveMQ 的实现。

【2】RabbitMQ:AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯。OpenStack 开源云平台的通信组件,最先在金融行业得到运用。

【3】ZeroMQ:史上最快的消息队列系统。

【4】Kafka:Apache 下的一个子项目,特点:高吞吐,在一台普通的服务器上既可以达到 10w/s 的吞吐速率,完全的分布式系统,适合处理海量数据。

 

一、什么是JMS?

JMS(Java Message Service)是Java 平台上有关面向消息中间件的技术规范,它使消息系统中的 Java 应用程序进行消息交流,并且通过提供标准的产生、发送、接受消息的接口简化企业应用的开发。JMS 本身只定义了一系列的接口规范,是一种与厂商无关的 API。用来访问消息收发系统。它类似于 JDBC (java DataBase Connecivity),JDBC 是可以访问许多不同关系数据库的 API ,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。目前许多厂商都支持 JMS,包括 IBM 的 MQServer、BEA 的 Weblogic JMS Service 和 Progress 的 SonicMQ ,JMS 使你能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS 客户机发送消息。消息是 JMS 中的一种类型对象,有两部分组成:报头和消息主体,报头由路由信息以及有关该消息的元数据组成。消息主体则携带这应用程序的数据或有效负载。

JMS 定义了 5 种不同的消息正文格式,以及调用的消息类型,允许你发送并接受一些不同形式的数据,消息类型如下:

✔ TextMessage:一个字符串对象。

✔ MapMessage:一套键值对。

✔ ObjectMessage:一个序列化的 Java 对象。

✔ BytesMessage:一个字节的数据流。

✔ StreamMessage:Java 原始值的数据流。

二、JMS 消息传递类型

☛ 对于消息的传递有两种类型:1)、点对点模式:一个生产者对应一个消费者(一个消息只能被一个消费者消费)。

2)、另一种是发布/订阅模式:一个生产者产生消息,可由多个消费者进行接收。

三、ActiveMQ 下载与安装

1)、下载官网地址:http://activemq.apache.org/components/classic/download/

2)、安装(Linux):下载 tar 包:apache-activemq-5.15.8-bin.tar.gz 通过 tar -zxvf apache-activemq-5.15.8-bin.tar.gz 解压该压缩包。并通过 chmod 777 apache-activemq-5.15.8-bin 赋值所有权限。进入 active 的 /bin 目录,赋予执行权限 chmod 755 activemq,启动:./activemq start 如下:(可以通过 ./activemq console 查看控制台。./activemq status 查看启动状态,或者查看 active 的服务默认端口61616:netstat -anp|grep 61616 。8161是mq自带的管理后台的端口)

[root@learnVM bin]# ./activemq start

INFO: Loading '/usr/install/apache-activemq-5.15.8//bin/env'

INFO: Using java '/usr/install/jvm/jdk1.8.0_121/bin/java'

INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details

INFO: pidfile created : '/usr/install/apache-activemq-5.15.8//data/activemq.pid' (pid '3734')

3)、测试地址:http://IP:8161/ 进入 ActiveMQ 的主页面,点击:Manage ActiveMQ broker ,[输入用户名:admin] 和 [密码:admin] 进入服务页面,如下:

四、点对点模式

点对点模式主要建立在一个队列(queue)上面,当连接一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向 ActiveMQ 发送消息,此消息将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 ActiveMQ 服务器,直到有接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息只会被一个接收端接收到,哪个接收端先连上 ActiveMQ 则会先接收到,而后来的接收端则接收不到消息。通过一个 demo 简单了解一下中间件。

依赖的 jar 包:

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-client</artifactId>

<version>5.13.4</version>

</dependency>

生产者创建队列及发送消息:

public class QueueDemo {

public static void main(String[] args) throws JMSException {

//创建连接工厂,通过tcp 协议和 61616 端口进行连接

ConnectionFactory connectionFactory =

new ActiveMQConnectionFactory("tcp://192.168.159.130:61616");

//创建连接

Connection connection = connectionFactory.createConnection();

//启动连接

connection.start();

//获取会话对象session , 参数1:是否启动事务 参数2:消息确认方式

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//创建队列对象

Queue queue = session.createQueue("test-queue");

//创建消息生产者对象

MessageProducer producer = session.createProducer(queue);

//创建消息对象

TextMessage message = session.createTextMessage("欢迎学习JMS");

//通过生产者发送消息

producer.send(message);

//关闭资源

producer.close();

session.close();

connection.close();

}

}

执行成功后,看下 ActiveMQ 的 Queues 中是不是多了一条信息。Name:表示队列的名称(自己程序中定义的那个),Number Of Pending Message:表示队列中未消费的记录数,Number Of Consumers:表示消费者的个数(表示正在连接的客户端,之前连接成功后断开了不算),Message Enqueued:表示总消息记录数,Message Dequeued:表示消费掉的消息。

创建消费者: 与生产者基本相同,通过创建消费者来监听队列(相同的队列名)即可。

public class QueueConsumer {

public static void main(String[] args) throws JMSException, IOException {

//创建连接工厂,通过tcp 协议和 61616 端口进行连接

ConnectionFactory connectionFactory =

new ActiveMQConnectionFactory("tcp://192.168.159.130:61616");

//创建连接

Connection connection = connectionFactory.createConnection();

//启动连接

connection.start();

//获取会话对象session , 参数1:是否启动事务 参数2:消息确认方式

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//创建队列对象

Queue queue = session.createQueue("test-queue");

//创建消息消费者对象 ****

MessageConsumer consumer = session.createConsumer(queue);

//消费者监听消息

consumer.setMessageListener(new MessageListener() {

@Override

public void onMessage(Message message) {

TextMessage textMessage = (TextMessage)message;

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

});

//通过生产者发送消息

System.in.read();

//关闭资源

consumer.close();

session.close();

connection.close();

}

}

执行后 ActiveMQ 的变化如下图所示:

五、发布订阅模式

创建生产者对象,与队列不同之处在于将创建队列改为创建主题 topic 即可。

//创建队列对象

//Queue queue = session.createQueue("test-queue");

//创建主题对象

Topic topic = session.createTopic("test-topic");

//创建消息生产者对象

//MessageProducer producer = session.createProducer(queue);

MessageProducer producer = session.createProducer(topic);

进入 Topics 目录,查看发送的信息:

创建消费者:与队列的消费者类似,只有创建队列的部分,改为创建主题就可以:

//创建队列对象

//Queue queue = session.createQueue("test-queue");

//创建主题对象

Topic topic = session.createTopic("test-topic");

//创建消息消费者对象 ****

MessageConsumer consumer = session.createConsumer(topic);

六、Spring 整合 JMS

通过Spring配置 xml 的方式配置 JMS(将点对点和发布订阅一块说明,主要是消息的类型不同而已)

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://192.168.159.130:61616"/>

</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">

<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->

<property name="targetConnectionFactory" ref="targetConnectionFactory"/>

</bean>

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

<property name="connectionFactory" ref="connectionFactory"/>

</bean>

<!--这个是队列模式,点对点的 文本信息-->

<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">

<constructor-arg value="queue_text"/>

</bean>

<!--这个是订阅模式 文本信息 如果是topic 时,使用此类,注释掉 queueTextDestination 类的注入

<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="topic_text"/>

</bean>-->

生产者公用方法:queueTextDestination:表示 xml 中注入的 queue 队列的 id 。

@Component

public class QueueProducer {

@Autowired

private JmsTemplate jmsTemplate;

//如果是topic 类型则 属性为topicTextDestination 替换 queueTextDestination 两者均为 xml 中的 id

@Autowired

private Destination queueTextDestination;

public void sendTextMessage(String text) {

jmsTemplate.send(queueTextDestination, new MessageCreator() {

@Override

public Message createMessage(Session session) throws JMSException {

return session.createTextMessage(text);

}

});

}

}

测试方法如下:

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration(locations="classpath:applicationContext-jms-producer.xml")

public class TestQueue {

@Autowired

private QueueProducer queueProducer;

@Test

public void test() {

queueProducer.sendTextMessage("Spring JMS Message");

}

}

测试结果:

消费者配置文件xml的写法:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->

<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://192.168.25.129:61616"/>

</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">

<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->

<property name="targetConnectionFactory" ref="targetConnectionFactory"/>

</bean>

<!--这个是广播目的地, 文本信息

<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="topic_text"/>

</bean> -->

<!--这个是队列目的地,点对点的 文本信息-->

<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">

<constructor-arg value="queue_text"/>

</bean>

<!-- 我的监听类 -->

<bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>

<!-- 消息监听容器 -->

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="connectionFactory" />

<property name="destination" ref="queueTextDestination" />

<!--<property name="destination" ref="topicTextDestination" />-->

<property name="messageListener" ref="myMessageListener" />

</bean>

完成配置中定义的 myMessageListener 类的定义,实现 MessageListener 接口:

public class myMessageListener implements MessageListener{

@Override

public void onMessage(Message message) {

TextMessage textMessage = (TextMessage)message;

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部