文档章节

RabbitMQ初探(基于SpringBoot)

justkoding
 justkoding
发布于 2016/07/08 16:31
字数 1200
阅读 1046
收藏 5

0 说明

最近接触RabbitMQ,需要与Spring进行集成,于是对其做了一些了解,并做了很简单的演示。 对RabbitMQ和Spring Boot的介绍不属于本文内容,如有兴趣,请自行查阅资料。相应演示代码托管在码云,演示所用到的工具有

  • JDK1.8
  • Gradle2.11
  • Spring Boot 1.3.5
  • RabbitMQ 3.3.5

因为Spring对AMQP模块所提供的默认实现是基于RabbitMQ的,因此,只需要在依赖中引入org.springframework.boot:spring-boot-starter-amqp

1 使用Queue发送消息

根据RabbitMQ的说明,实际上发现消息的组件是exchange而不是queue,本节标题所描述使用queue发送消息,实际对应Introduction Demo,仅仅声明Queue就可以,因为发送消息的时候,RabbitMQ使用的是内置默认exchange。

本演示并没有采用String作为消息类型,而是构造了一个简单的消息结构

**注意:消息实体一定要实现Serializable**接口

FixMessage

public class FixMessage implements Serializable {
    private long messageId;
    private byte[] data;

    public FixMessage() {
        messageId = new Random().nextInt(100000000);
        data = UUID.randomUUID().toString().getBytes();
    }

    @Override
    public String toString() {
        return "FixMessage is: {\"id\": '" + messageId + "'
            , \"data\": '" + new String(data) + "'";
    }
}

由于使用了SpringBoot,因此,大部分使用Spring时的基础工作可以忽略,为了模拟消息发送场景,采用定时任务进行消息发送

Application.java

@SpringBootApplication
@EnableScheduling
public class Application {
    @Value("${default.queue}")
    public String QUEUE_NAME ;

    @Autowired
    AmqpTemplate mAmqpTemplate;

    @Bean
    Queue queue(){
        return new Queue(QUEUE_NAME, true);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Scheduled(fixedRate = 1000)
    public void sender(){
        FixMessage message = new FixMessage();
        System.out.println("Sending message..." + message);
        mAmqpTemplate.convertAndSend(queue().getName(),message);
    }
}

sender()方法每隔一秒发送一条消息,发送消息使用的是AmqpTemplate,它的主要职责就是用于消息的接收和发送,当然,因为默认实现是RabbitMQ,所以实际是使用RabbitTemplate完成消息发送工作。至于@Value所指定的队列名称,写在配置文件中就可以了。

运行gradle bootRun,可以看到应用会定时发送消息。如果RabbitMQ服务启用了rabbitmq_management插件的话,访问默认管理界面http://localhost:15672,使用guest登录,可以看到对应的queue中已经有积压未处理的消息了。

2 消息处理

用于处理消息的组件被称为consumer,我们知道consumer其实是监听器,当发现监听对象(queue)中有消息的时候,就会进行处理。所以,在构造consumer监听器实例的时候,需要提供消息处理器。

消息处理器

public void handleMessage(FixMessage message) {
    System.out.println("Received >>> " + message + " by default " +
            "message handler");
}

public void receiveMessage(FixMessage message) {
    System.out.println("Received >>> " + message + " by non-default " +
            "message handler");
}

注意:因为消息监听器提供消息处理器的时候,需要指定处理器对应的类,以及特定消息对应的处理方法,但是如果处理消息的方法签名为handleMessage的时候,可以不必指定

2.1 编码实现监听器

Application.java

@SpringBootApplication
@EnableScheduling
public class Application {
    @Value("${default.queue}")
    public String QUEUE_NAME ;

    @Autowired
    AmqpTemplate mAmqpTemplate;

    @Autowired
    AnnotationConfigApplicationContext context;

    @Bean
    Queue queue(){
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory((ConnectionFactory) context.getBean("rabbitConnectionFactory"));
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(
            new MessageListenerAdapter(new FixMessageHandler())
        );
        return container;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Scheduled(fixedRate = 1000)
    public void sender(){
        FixMessage message = new FixMessage();
        System.out.println("Sending message..." + message);
        mAmqpTemplate.convertAndSend(queue().getName(),message);
    }
}

可以看到,消息监听器需要ConnectionFactory(这里用的是默认实现——RabbitConnectionFactory),需要知道监听的queue名称,以及消息处理器

根据上面的描述,当指定消息处理器,而不指定消息处理方法名称时,会采用handleMessage处理接收到的消息,否则,需要指定处理方法名称,例如

...
container.setMessageListener(
    new MessageListenerAdapter(new FixMessageHandler(), "receiveMessage")
);
...

运行gradle bootRun,可以看出这两种方式的差异。但其实这两种方式本质是相同的,而且,消息处理器只是普通的Java类,只是通过MessageListenerAdapter包装成为MessageListener;还可以通过继承org.springframework.amqp.core.MessageListener,实现onMessage方法,完成消息处理工作,比如

另外的消息处理器

public class FixMessageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
        System.out.println("Received Message [X] : "
                + message.toString() + " by MessageListener");
    }
}

相应地,消息监听器中设置消息处理器也要进行调整

...
container.setMessageListener(
    new FixMessageListener()
);
...

运行gradle bootRun,可以发现一些差异

2.2 使用注解@RabbitListener

Spring提供了注解@RabbitListener,可将一个普通Java类标记为消息监听器,并指定消息处理器

@Component
public class AnnotatedFixMessageListener {
    @RabbitListener(queues = "${default.queue}")
    public void messageHandler(FixMessage message){
        System.out.println("Receive Message : " + message.toString()
                + " by Annotated Message Listener");
    }
}

@RabbitListener可以修饰Class,表示被修饰的类为消息监听器类,这时候可以使用@RabbitlHandler修饰消息处理方法。同一个消息处理器类中,可以修饰多个消息处理方法,消息处理器会根据监听到的不同消息类型,调用相应的消息处理方法


接下来,会演示如何动态创建queue并启用监听器。

© 著作权归作者所有

共有 人打赏支持
上一篇: Stream API
下一篇: Spring起步
justkoding
粉丝 2
博文 4
码字总数 9824
作品 0
闵行
高级程序员
私信 提问
SpringBoot笔记(十一)RabbitMQ

安装Erlang RabbitMQ基于Erlang,所以得先安装Erlang http://www.erlang.org/downloads 根据自己的系统选择下载,安装完了,配置一下path即可 windows默认安装路径: 验证: 有时候可能需要重...

世外大帝
04/25
0
0
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年
04/19
0
0
Spring Boot配置多套RabbitMQ

背景介绍: 为什么要单独来讲解SpringBoot如何配置RabbitMQ? 因为在项目中,有可能会用到多套RabbitMQ,如果只用一套那则用SpringBoot自带的配置就可以了,但多套则不行,需要自行配置。 说...

woter
07/10
0
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
07/25
0
0
SpringBootBucket 1.0.0 发布,SprintBoot 全家桶

Spring Boot 现在已经成为Java 开发领域的一颗璀璨明珠,它本身是包容万象的,可以跟各种技术集成。 本项目对目前Web开发中常用的各个技术,通过和SpringBoot的集成,并且对各种技术通过“一...

一刀
03/05
7.3K
17

没有更多内容

加载失败,请刷新页面

加载更多

为什么要学习Python?这10个理由足够了!

摘要: 看完这十个理由,我决定买本python从入门到精通! 如果你定期关注现今的科技发展,那么你可能想知道我为什么要写这篇文章告诉人们学习Python?因为几年前我提倡Java而不是Python。 在...

阿里云官方博客
17分钟前
1
0
spring服务方式配置okhttp3

问题 如果把OKhttp以Spring服务方式配置,就解决了从配置中心运行时刷新配置参数的问题。 OkHttpConfig.java package com.zyl.config;import okhttp3.OkHttpClient;import org.springfra...

亚林瓜子
17分钟前
2
0
8张图让你一步步看清 async/await 和 promise 的执行顺序

**摘要:**面试必问 原文:8张图帮你一步步看清 async/await 和 promise 的执行顺序 作者:ziwei3749 Fundebug经授权转载,版权归原作者所有。 为什么写这篇文章? 说实话,关于js的异步执行顺...

Fundebug
18分钟前
1
0
Linux 命令菜单

#!/bin/bash #menu.sh menu(){ source ~/.bashrc echo "=================================" echo "Please enter your choise:" echo "(0) Kill all java" echo "(1) Start all tomcat" echo......

mellen
24分钟前
3
0
原来云数据库也是有思想的...

本文由一刻talks发表 邵宗文,腾讯云数据库专家副总监。十余年数据库从业经验,2009年加入腾讯,曾负责腾讯网,新闻客户端,快报,视频,财经,体育等数据库平台部署、规划及运维支持工作。本...

腾讯云加社区
24分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部