文档章节

RabbitMQ初探(基于SpringBoot)

justkoding
 justkoding
发布于 2016/07/08 16:31
字数 1200
阅读 969
收藏 5
点赞 1
评论 0

0 说明

最近接触RabbitMQ,需要与Spring进行集成,于是对其做了一些了解,并做了很简单的演示。 对RabbitMQ和Spring Boot的介绍不属于本文内容,如有兴趣,请自行查阅资料。相应演示代码托管在码云,演示所用到的工具有

  • JDK1.8
  • Gradle2.11
  • Spring Boot 1.3.5
  • RabbitMQ 3.3.5

因为Spring对AMQP模块所提供的默认实现是基于RabbitMQ的,因此,只需要在依赖中引入org.springframework.boot:spring-boot-starter-amqp

1 使用Queue发送消息

根据RabbitMQ的说明,实际上发现消息的组件是exchange而不是queue,本节标题所描述使用queue发送消息,实际对应Introduction Demo,仅仅声明Queue就可以,因为发送消息的时候,RabbitMQ使用的是内置默认exchange。

本演示并没有采用String作为消息类型,而是构造了一个简单的消息结构

**注意:消息实体一定要实现Serializable**接口

FixMessage

public class FixMessage implements Serializable {
    private long messageId;
    private byte[] data;

    public FixMessage() {
        messageId = new Random().nextInt(100000000);
        data = UUID.randomUUID().toString().getBytes();
    }

    @Override
    public String toString() {
        return "FixMessage is: {\"id\": '" + messageId + "'
            , \"data\": '" + new String(data) + "'";
    }
}

由于使用了SpringBoot,因此,大部分使用Spring时的基础工作可以忽略,为了模拟消息发送场景,采用定时任务进行消息发送

Application.java

@SpringBootApplication
@EnableScheduling
public class Application {
    @Value("${default.queue}")
    public String QUEUE_NAME ;

    @Autowired
    AmqpTemplate mAmqpTemplate;

    @Bean
    Queue queue(){
        return new Queue(QUEUE_NAME, true);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Scheduled(fixedRate = 1000)
    public void sender(){
        FixMessage message = new FixMessage();
        System.out.println("Sending message..." + message);
        mAmqpTemplate.convertAndSend(queue().getName(),message);
    }
}

sender()方法每隔一秒发送一条消息,发送消息使用的是AmqpTemplate,它的主要职责就是用于消息的接收和发送,当然,因为默认实现是RabbitMQ,所以实际是使用RabbitTemplate完成消息发送工作。至于@Value所指定的队列名称,写在配置文件中就可以了。

运行gradle bootRun,可以看到应用会定时发送消息。如果RabbitMQ服务启用了rabbitmq_management插件的话,访问默认管理界面http://localhost:15672,使用guest登录,可以看到对应的queue中已经有积压未处理的消息了。

2 消息处理

用于处理消息的组件被称为consumer,我们知道consumer其实是监听器,当发现监听对象(queue)中有消息的时候,就会进行处理。所以,在构造consumer监听器实例的时候,需要提供消息处理器。

消息处理器

public void handleMessage(FixMessage message) {
    System.out.println("Received >>> " + message + " by default " +
            "message handler");
}

public void receiveMessage(FixMessage message) {
    System.out.println("Received >>> " + message + " by non-default " +
            "message handler");
}

注意:因为消息监听器提供消息处理器的时候,需要指定处理器对应的类,以及特定消息对应的处理方法,但是如果处理消息的方法签名为handleMessage的时候,可以不必指定

2.1 编码实现监听器

Application.java

@SpringBootApplication
@EnableScheduling
public class Application {
    @Value("${default.queue}")
    public String QUEUE_NAME ;

    @Autowired
    AmqpTemplate mAmqpTemplate;

    @Autowired
    AnnotationConfigApplicationContext context;

    @Bean
    Queue queue(){
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory((ConnectionFactory) context.getBean("rabbitConnectionFactory"));
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(
            new MessageListenerAdapter(new FixMessageHandler())
        );
        return container;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Scheduled(fixedRate = 1000)
    public void sender(){
        FixMessage message = new FixMessage();
        System.out.println("Sending message..." + message);
        mAmqpTemplate.convertAndSend(queue().getName(),message);
    }
}

可以看到,消息监听器需要ConnectionFactory(这里用的是默认实现——RabbitConnectionFactory),需要知道监听的queue名称,以及消息处理器

根据上面的描述,当指定消息处理器,而不指定消息处理方法名称时,会采用handleMessage处理接收到的消息,否则,需要指定处理方法名称,例如

...
container.setMessageListener(
    new MessageListenerAdapter(new FixMessageHandler(), "receiveMessage")
);
...

运行gradle bootRun,可以看出这两种方式的差异。但其实这两种方式本质是相同的,而且,消息处理器只是普通的Java类,只是通过MessageListenerAdapter包装成为MessageListener;还可以通过继承org.springframework.amqp.core.MessageListener,实现onMessage方法,完成消息处理工作,比如

另外的消息处理器

public class FixMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        System.out.println("Received Message [X] : "
                + message.toString() + " by MessageListener");
    }
}

相应地,消息监听器中设置消息处理器也要进行调整

...
container.setMessageListener(
    new FixMessageListener()
);
...

运行gradle bootRun,可以发现一些差异

2.2 使用注解@RabbitListener

Spring提供了注解@RabbitListener,可将一个普通Java类标记为消息监听器,并指定消息处理器

@Component
public class AnnotatedFixMessageListener {
    @RabbitListener(queues = "${default.queue}")
    public void messageHandler(FixMessage message){
        System.out.println("Receive Message : " + message.toString()
                + " by Annotated Message Listener");
    }
}

@RabbitListener可以修饰Class,表示被修饰的类为消息监听器类,这时候可以使用@RabbitlHandler修饰消息处理方法。同一个消息处理器类中,可以修饰多个消息处理方法,消息处理器会根据监听到的不同消息类型,调用相应的消息处理方法


接下来,会演示如何动态创建queue并启用监听器。

© 著作权归作者所有

共有 人打赏支持
justkoding
粉丝 2
博文 3
码字总数 9824
作品 0
闵行
高级程序员
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年 ⋅ 04/19 ⋅ 0

SpringBoot笔记(十一)RabbitMQ

安装Erlang RabbitMQ基于Erlang,所以得先安装Erlang http://www.erlang.org/downloads 根据自己的系统选择下载,安装完了,配置一下path即可 windows默认安装路径: 验证: 有时候可能需要重...

世外大帝 ⋅ 04/25 ⋅ 0

SpringBoot整合高级消息队列RabbitMQ及原理

一 SpringBoot 与消息概述 xhesrc="https://img-blog.csdn.net/20180619001205459?watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4OTc0NjM0/font/5a6L5L2T/fontsize/400/fill/I0JBQk......

刘信坚 ⋅ 06/19 ⋅ 0

springboot之RabbitMQ详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最...

无语年华 ⋅ 05/30 ⋅ 0

springboot使用rabbitMQ(带回调)

springboot提供了各类东西的简单集成,rabbitMQ也不例外,本文重点介绍如何集成rabbitMQ以及如何使用带回调的rabbitMQ 万年不变的第一步:pom 生产者 配置文件1:RabbitConfig 配置文件2:R...

梦想修补师 ⋅ 05/24 ⋅ 0

【Spring Cloud Stream】异步任务

一、前言 前两篇博客提高了用线程池和消息队列才实现异步任务。本篇博客谈一谈用SpringCloud Stream来实现异步任务。 Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它...

qq_26545305 ⋅ 05/20 ⋅ 0

Spring Boot与RabbitMQ结合实现延迟队列的示例

背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 场景一:在订单系统中,一个用户下单之后通常有...

xiaomin0322 ⋅ 05/11 ⋅ 0

springboot中健康检查AbstractHealthIndicator

继承方法重新实现dohealthCheck方法 HealthIndicator<-AbstractHealthIndicator<-RedisHealthIndicator->doHealthCheck abstract class AbstractHealthIndicator implements HealthIndicato......

writeademo ⋅ 06/06 ⋅ 0

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream

一、前言 在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。 在这篇博客中,小白向大家介绍一个消息事件驱动框架...

kisscatforever ⋅ 04/26 ⋅ 0

springboot快速开发(二)

springboot的快速上手 1. 建立目录 controller service domain Application.java 启动类 2. 支持web org.springframework.boot spring-boot-starter-web 3. 编写第一个controller @RestContr......

u011402896 ⋅ 04/16 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

CENTOS7防火墙命令记录

安装Firewall命令: yum install firewalld firewalld-config Firewall开启常见端口命令: firewall-cmd --zone=public --add-port=80/tcp --permanent firewall-cmd --zone=public --add-po......

cavion ⋅ 27分钟前 ⋅ 0

【C++】【STL】利用chromo来测量程序运行时间与日志时间打印精确到微秒

直接上代码吧,没啥好说的。头疼。 #include <iostream>#include <string>#include <ctime>#include <sstream>#include <iomanip>#include <thread>#include <chrono>using ......

muqiusangyang ⋅ 30分钟前 ⋅ 0

Mac环境下svn的使用

在Windows环境中,我们一般使用TortoiseSVN来搭建svn环境。在Mac环境下,由于Mac自带了svn的服务器端和客户端功能,所以我们可以在不装任何第三方软件的前提下使用svn功能,不过还需做一下简...

故久呵呵 ⋅ 40分钟前 ⋅ 0

破解公司回应苹果“USB限制模式”:已攻破

本周四,苹果发表声明称 iOS 中加入了一项名为“USB 限制模式”的功能,可以防止 iPhone 在连接其他设备的时候被破解,并且强调这一功能并不是针对 FBI 等执法部门,为的是保护用户数据安全。...

六库科技 ⋅ 41分钟前 ⋅ 0

MyBtais整合Spring Boot整合,TypeHandler对枚举类(enum)处理

概要 问题描述 我想用枚举类来表示用户当前状态,枚举类由 code 和 msg 组成,但我只想把 code 保存到数据库,查询处理,能知道用户当前状态,这应该怎么做呢?在 Spring 整合MyBatis 的时候...

Wenyi_Feng ⋅ 今天 ⋅ 0

synchronized与Lock的区别

# <center>王梦龙的读书笔记第一篇</center> ## <center>-synchronized与Lock的区别</centre> ###一、从使用场景来说 + synchronized 是能够注释代码块、类、方法但是它的加锁是和解锁使用一......

我不想加班 ⋅ 今天 ⋅ 0

VConsole的使用

手机端控制台打印输出,方便bug的排查。 首先需要引入vconsole.min.js 文件,然后在文件中创造实例。就能直接使用了。 var vConsole = new VConsole(); vConsole的文件地址...

大美琴 ⋅ 今天 ⋅ 0

Java NIO之字符集

1 字符集和编解码的概念 首先,解释一下什么是字符集。顾名思义,就是字符的集合。它的初衷是把现实世界的符号映射为计算机可以理解的字节。比如我创造一个字符集,叫做sex字符集,就包含两个...

士别三日 ⋅ 今天 ⋅ 0

Spring Bean基础

1、Bean之间引用 <!--如果Bean配置在同一个XML文件中,使用local引用--><ref bean="someBean"/><!--如果Bean配置在不同的XML文件中,使用ref引用--><ref local="someBean"/> 其实两种......

霍淇滨 ⋅ 今天 ⋅ 0

05、基于Consul+Upsync+Nginx实现动态负载均衡

1、Consul环境搭建 下载consul_0.7.5_linux_amd64.zip到/usr/local/src目录 cd /usr/local/srcwget https://releases.hashicorp.com/consul/0.7.5/consul_0.7.5_linux_amd64.zip 解压consu......

北岩 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部