文档章节

初试RocketMQ消息中间件

LinkedBear
 LinkedBear
发布于 08/02 17:40
字数 1426
阅读 1988
收藏 5

1. 为什么要用MQ

  1. 在使用SpringCloud或Dubbo进行SOA架构后,不同的应用层模块(web)与业务层模块(service)要建立调用关系,也就是依赖/耦合
  2. 当模块变多时,模块间的耦合度也会逐步上升,这就需要一个解耦工具:消息中间件
  3. 另外,如果某个业务流程分为很多步,某一步特别耗时间且不稳定,整个业务的稳定性就会受很大影响,这时也需要用消息中间件来分离这些不稳定的业务过程

2. 到底什么是MQ

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

在这里面,关键的部分是“消息传递”和“消息排队”,可以保证事件的顺序性,也可以在高并发下使用。

3. 什么时候可以用MQ

执行过程长,且不需要返回结果的功能,可以利用MQ传递(MQ的异步通信特征)

4. MQ与JMS

JMS(Java Message Service),是一套接口规范,在jdk中已定义好接口(类似于JDBC,只有JDBC无法操作数据库,需要具体的驱动来实现功能)。

4.1 JMS预定义的五种消息正文格式

  1. TextMessage(String)——普通文本(用得最多)
  2. MapMessage(Map)——键值对集合(用的次多)
  3. ObjectMessage(Serializable Object)——可序列化的对象
  4. BytesMessage(byte[])——字节数组
  5. StreamMessage(Stream)——流数据

4.2 JMS的消息传递

JMS的传递模式非常像观察者模式的思路:

定义对象间的一种一对多的依赖关系,让多个观察者同时监听某一个主题现象,当一个对象的状态发生改变时,会通知所有观察者对象,所有依赖于它的对象都得到通知并被自动更新。

观察者模式——https://my.oschina.net/LinkedBear/blog/1791975

消息传递的方式有两种:

4.2.1 Queue点对点(生产者与消费者的一对一关系)

4.3.2 Topic发布-订阅(生产者与消费者的一对多关系)

5. MQ的工作原理

6. 不同MQ之间的对比

引用文章图片:https://blog.csdn.net/jasonhui512/article/details/53231566

7. 怎么用MQ

选用阿里巴巴的RocketMQ(现已被Apache接手),搭建Demo工程

参考文档:http://rocketmq.apache.org/docs/simple-example/

7.1 安装RocketMQ

参考文章:https://www.jianshu.com/p/4a275e779afa

从Apache的官网上下载运行包

配置环境变量

依次运行mqnamesrv.cmd脚本和mqbroker.cmd脚本

https://github.com/apache/rocketmq-externals.git下载监控插件,并解压

进入“rocketmq-console\src\main\resources”文件夹,打开“application.properties”进行配置

进入“rocketmq-console”文件夹,执行“mvn clean package -Dmaven.test.skip=true”,编译生成

进入“target”文件夹,执行“java -jar rocketmq-console-ng-1.0.0.jar”,启动“rocketmq-console-ng-1.0.0.jar”(此jar为SpringBoot项目)

7.2 搭建Maven工程框架

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

     <modelVersion>4.0.0</modelVersion>

     <groupId>com.linkedbear</groupId>

     <artifactId>MQ-Demo</artifactId>

     <version>0.0.1-SNAPSHOT</version>

    

     <properties>

        <rocketmq.version>4.3.0</rocketmq.version>

    </properties>



     <parent>

         <groupId>org.springframework.boot</groupId>

         <artifactId>spring-boot-starter-parent</artifactId>

         <version>2.0.0.RELEASE</version>

     </parent>



     <dependencies>

         <dependency>

              <groupId>org.springframework.boot</groupId>

              <artifactId>spring-boot-starter-web</artifactId>

         </dependency>

         <!-- RocketMQ -->

         <dependency>

             <groupId>org.apache.rocketmq</groupId>

             <artifactId>rocketmq-client</artifactId>

             <version>${rocketmq.version}</version>

         </dependency>

        

         <!-- 热部署 -->

         <dependency>

              <groupId>org.springframework.boot</groupId>

              <artifactId>spring-boot-devtools</artifactId>

         </dependency>

     </dependencies>



     <build>

         <plugins>

              <plugin>

                   <artifactId>maven-compiler-plugin</artifactId>

                   <configuration>

                       <source>1.8</source>

                       <target>1.8</target>

                   </configuration>

              </plugin>

         </plugins>

     </build>

</project>

 

 

7.3 创建工程目录结构

 

 

7.4 生产者Controller

/**
 * 生产者Controller
 * @Title ProducerController
 * @author LinkedBear
 * @Time 2018年8月2日 下午3:22:02
 */
@Controller
public class ProducerController {
    //此分组名必须保证全局唯一(考虑到负载均衡等后续问题),故封装为静态常量
    public static final String PRODUCE_GROUP_NAME = "TestGroup";
    //MQ的运行地址
    public static final String MQ_IP = "127.0.0.1:9876";
    
    @RequestMapping("/produceMessage")
    @ResponseBody
    public Map<String, Object> produceMessage() throws Exception {
        //1. 创建生产者连接(类似于JDBC中的Connection),要传入MQ的分组名
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_GROUP_NAME);
        //2. 设置MQ的运行地址
        producer.setNamesrvAddr(MQ_IP);
        //3. 开启连接
        producer.start();
        
        //4. 构造消息(重载方法较多,此处选择topic, tag, message的三参数方法)
        Message message = new Message("test_topic", "test_tag", ("test_message。。。" + Math.random()).getBytes());
        //5. 发送消息,该方法会返回一个发送结果的对象
        SendResult result = producer.send(message);
        System.out.println(result.getSendStatus());
        //6. 关闭连接
        producer.shutdown();
        
        //此处将发送结果显示在页面上,方便查看
        Map<String, Object> map = new HashMap<>();
        map.put("消息", result.getSendStatus());
        return map;
    } 
}

7.5 消费者Controller

/**
 * 消费者Controller
 * @Title ConsumerController
 * @author LinkedBear
 * @Time 2018年8月2日 下午3:22:11
 */
@Controller
public class ConsumerController {
    @RequestMapping("/getMessage")
    @ResponseBody
    public void getMessage() throws Exception {
        //1. 创建消费者连接,要传入MQ的分组名,该分组名在ProducerController中
        //此处创建的是pushConsumer,它使用监听器,给人的感觉是消息被推送的
        //pullConsumer,取消息的过程需要自己写      
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ProducerController.PRODUCE_GROUP_NAME);
        //2. 设置MQ的运行地址
        consumer.setNamesrvAddr(ProducerController.MQ_IP);
        //3. 设置消息的提取顺序
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //4. 设置消费者接收消息的Topic和Tag,此处对Tag不作限制
        consumer.subscribe("test_topic", "*");
        
        //5. 使用监听器接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt messageExt : msgs) {
                        String message = new String(messageExt.getBody(), "utf-8");
                        System.out.println("收到消息【主题:" + messageExt.getTopic() + ", 正文:" + message + "】");
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    //转换出现问题,稍后重新发送
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        
        //6. 启动消费者
        consumer.start();
    }
}

7.6 测试运行

执行http://localhost:8080/produceMessage

 

执行http://localhost:8080/getMessage

© 著作权归作者所有

共有 人打赏支持
LinkedBear
粉丝 11
博文 44
码字总数 46147
作品 0
济南
程序员
消息中间件 - Apache RocketMQ

RocketMQ是什么? 2016.11,阿里巴巴宣布捐赠RocketMQ到Apache软件基金会孵化项目 【Apache RocketMQ】RocketMQ捐赠给Apache那些鲜为人知的故事 阿里开源消息中间件RocketMQ的前世今生 Apac...

vintagewan
2014/02/18
0
21
《Apache RocketMQ用户指南》官方文档

RocketMQ–导读 原文链接 译者:小村长 最近两个项目中用到了RocketMQ消息中间件,每次都是在网上找几个Demo,而没有去看它的官方文档。年前面试某大型互联网企业。RocketMQ多有提及。今借此...

小村长
01/23
0
0
Kafka vs RocketMQ—— Topic数量对单机性能的影响

引言 上一期我们对比了三类消息产品(Kafka、RabbitMQ、RocketMQ)单纯发送小消息的性能,受到了程序猿们的广泛关注,其中大家对这种单纯的发送场景感到并不过瘾,因为没有任何一个网站的业务只...

a137268431
04/15
0
0
消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

癫狂侠
08/12
0
0
解锁事务消息,发力大数据流计算,Apache RocketMQ 开发者再聚深圳,干货满满获开源爱好者好评

7月29日,阿里中间件(Aliware)联合阿里巴巴技术协会,在深圳举办了Apache RocketMQ毕业后的第二次线下Meetup。当天现场的700名和线上三个直播平台的开源技术爱好者一起,与活动现场的Commi...

中间件小哥
08/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

windbg调试C源码级驱动

联机方式不多说了。我博客里有,英文的。 windbg联机文档 https://docs.microsoft.com/zh-cn/windows-hardware/drivers/debugger/debug-universal-drivers---step-by-step-lab--echo-kernel......

simpower
25分钟前
0
0
redis快照和AOF简介

数据持久化到硬盘:一是快照(snapshotting),二是只追加文件(append-only file AOF) 快照 核心原理:redis某个时间内存内的所有数据写入硬盘 场景:redis快照内存里面的数据 1. 用户发送bgsav...

拐美人
26分钟前
0
0
这个七夕,送你一份程序员教科书级别的告白指南

给广大爱码士们的高能预警: 今天,就是七夕了…… (单身非作战人群请速速退场!) 时常有技术GG向个推君抱怨 经过网民多年的教育 以及技术人持之以恒的自黑 冲锋衣狂热分子·格子衫骨灰级粉...

个推
30分钟前
0
0
python爬虫日志(15)cookie详解

转载:原文地址 早期Web开发面临的最大问题之一是如何管理状态。服务器端没有办法知道两个请求是否来自于同一个浏览器。那时的办法是在请求的页面中插入一个token,并且在下一次请求中将这个...

茫羽行
31分钟前
0
0
qlv视频格式转换器

  腾讯视频中的视频影视资源有很多,小编经常在里面下载视频观看,应该也有很多朋友和小编一样吧,最近热播的电视剧也不少,如《香蜜沉沉烬如霜》、《夜天子》还有已经完结的《扶摇》,这么...

萤火的萤火
35分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部