使用最为广泛的三款消息中间件:RabbitMQ、RocketMQ、Kafka。
RabbitMQ
RabbitMQ开始是用在电信业务的可靠通信的,也是少有的几款支持AMQP协议的产品之一。
优点: 1. 轻量级,快速,部署使用方便 2. 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路 由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。 3. RabbitMQ的客户端支持大多数的编程语言。
缺点: 1. 如果有大量消息堆积在队列中,性能会急剧下降 2. RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。 3. RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高。
为什么消息的堆积导致性能下降?
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息 的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继 而情况变得越来越恶化,使得系统的处理能力大大降低。
RocketMQ
RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。 RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。
RocketMQ几乎具备了消息队列应该具备的所有特性和功能。 java开发,阅读源代码、扩展、二次开发很方便。 对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。 性能比RabbitMQ高一个数量级,每秒处理几十万的消息。
缺点: 跟周边系统的整合和兼容不是很好。
Kafka
Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。 跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持 Kafka,Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。Kafka是Scala和Java开发的,对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息。 如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。 但是由于是异步的和批处理的,延迟也会高,不适合电商场景。
消息中间件应用场景
电商秒杀场景
当秒杀开始前,用户在不断的刷新页面,系统应该如何应对高并发的读请求呢?
在秒杀开始时,大量并发用户瞬间向系统请求生成订单,扣减库存,系统应该如何应对高并发的写请求呢?
系统应该如何应对高并发的读请求:
使用缓存策略将请求挡在上层中的缓存中
能静态化的数据尽量做到静态化
加入限流(比如对短时间之内来自某一个用户,某一个IP、某个设备的重复请求做丢弃处理)
系统应该如何应对高并发的写请求:
生成订单,扣减库存,用户这些操作不经过缓存直达数据库。如果在 1s内,有 1 万个数据连接同时到达,系统的数据库会濒临崩溃。如何解决这个问题呢?我们可以使用消息队列。
消息队列的作用:
削去秒杀场景下的峰值写流量——流量削峰(削去秒杀场景下的峰值写流量,将秒杀请求暂存于消息队列,业务服务器响应用户“秒杀结果正在处理中。。。”,释放系统资源去 处理其它用户的请求)
通过异步处理简化秒杀请求中的业务流程——异步处理(先处理主要的业务,异步处理次要的业务。 如主要流程是生成订单、扣减库存;次要流程比如购买成功之后会给用户发优惠券,增加用户的积分)
解耦,实现秒杀系统模块之间松耦合——解耦(实现秒杀系统模块之间松耦合,将数据全部发送给消息队列,然后数据服务订阅这个消息队列,接收数据进行处理)
B端C端数据同步场景
B端面向企业用户,C端面向求职者。这两个模块业务处理逻辑不同,数据库表结构不同,实际上是处于解耦的状态。但是各自又需要对方的数据,需要共享:如 1. 当C端求职者在更新简历之后,B端企业用户如何尽早看到该简历更新? 2. 当B端企业用户发布新的职位需求后,C端用户如何尽早看到该职位信息?
如何解决B端C端数据共享的问题?
1. 同步方式:B端和C端通过RPC或WebService的方式发布服务,让对方来调用,以获取对方的信息。求职者每更新一次简历,就调用一次B端的服务,进行数据的同步;B端企业用户每更新职位需求,就调用C端的服务,进行数据的同步。
2. 异步方式:使用消息队列,B端将更新的数据发布到消息队列,C端将更新的数据发布到消息队列,B端订阅C端的消息队列,C端订阅B端的消息队列。
使用同步方式,B端和C端耦合比较紧密,如果其中一个服务有问题,可能会导致另一个服务不可用。使用消息队列的异步方式,对B端C端进行解耦,只要消息队列可用,双方都可以将需要同步的信息 发送到消息队列,对方在收到消息队列推送来的消息的时候,各自更新自己的搜索引擎,更新自己的缓存数据。
JMS规范
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送 消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。 它类似于JDBC(Java Database Connectivity)。消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
根据有效负载的类型来划分,可以将消息分为几种类型: 1. 简单文本(TextMessage) 2. 可序列化的对象(ObjectMessage) 3. 属性集合(MapMessage) 4. 字节流(BytesMessage) 5. 原始值流(StreamMessage) 6. 无有效负载的消息(Message)。
对象模型:ConnectionFactory 接口(连接工厂)、Connection 接口(连接)、Destination 接口(目标)、Session 接口(会话)、MessageConsumer 接口(消息消费者)、MessageProducer 接口(消息生产者)、Message 接口(消息)
点对点模式
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费 者的队列,并直接将消息发送到消费者的队列,概括为: 一条消息只有一个消费者获得 生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行 状态。 每一个成功处理的消息要么自动确认,要么由接收者手动确认。
发布/订阅模式
支持向一个特定的主题发布消息。0或多个订阅者可能对接收特定消息主题的消息感兴趣。发布者和订阅者彼此不知道对方。多个消费者可以获得消息。
在发布者和订阅者之间存在时间依赖性。 发布者需要建立一个主题,以便客户能够订阅。 订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。 对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。
JMS在应用集群中的问题
点对点和发布订阅模式在集群下都存在问题,点对点浪费空间,发布订阅对业务侵入较大,ActiveMQ通过“虚拟主题”解决了这个问题。
JMS规范文档(jms-1_1-fr-spec.pdf)下载地址: https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/
AMQP协议
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于 JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0,AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
Publisher:消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。
Consumer:消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个 queue中接收消息。 Server:一个具体的MQ服务实例,也称为Broker。
Virtual host:虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。
Exchange:交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。
Routing key:路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通 常需要和具体的Exchange类型、Binding的Routing key结合起来使用
Message Queue:实际存储消息的容器,并把消息传递给最终的Consumer。
AMQP 使用的数据类型如下:
Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。 Bits(统一为8个字节):用于表示开/关值。
Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
Long strings:用于保存二进制数据块。
Field tables:包含键值对,字段值一般为字符串,整数等。