文档章节

springboot 集成rabbitmq 并采用ack模式 以及封装队列定义

码农小胖哥
 码农小胖哥
发布于 2018/02/27 11:38
字数 2359
阅读 9857
收藏 50
  • rabbitmq简介

rabbitmq 是spring所在公司Pivotal自己的产品  是基于AMQP高级队列协议的消息中间件 采用erlang开发   因此安装需要erlang环境  具体安装根据自己的环境  因为跟spring有共同的血缘关系 所以spring 全家桶对其的支持应该是相当完善的

  •  简单概念

一般消息队列 都是生产者将消息发送到队列   消费者监听队列进行消费     rabbitmq 一个虚拟主机(默认 /)持有一个或者多个交换机(Exchange) 用户只能在虚拟主机的粒度进行权限控制  交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上 这样生产者和队列就没有直接联系 而是将消息发送的交换机  交换机再把消息转发到对应绑定的队列上   此处需要详细熟悉rabbitmq的工作流程  不清楚可以找相关资料进行学习   

上面说了 Exchange 作为rabbitmq的一个独特的重要的概念  这里有必要着重强调一下  我们从 spring对rabbitmq的封装来解读一下这个东西 

package org.springframework.amqp.core;

/**
 * Constants for the standard Exchange type names.
 *
 * @author Mark Fisher
 * @author Gary Russell
 */
public abstract class ExchangeTypes {

	public static final String DIRECT = "direct";

	public static final String TOPIC = "topic";

	public static final String FANOUT = "fanout";

	public static final String HEADERS = "headers";

	public static final String SYSTEM = "system";

	/**
	 * The constant to represent {@code x-delayed-message} exchange mode.
	 * @deprecated since 1.6.4, it's not a user-available exchange type,
	 * the delayed {@code boolean} is used for that.
	 */
	@Deprecated
	public static final String DELAYED = "x-delayed-message";
}

  上面是 交换机类型的定义类    说明了6种交换机类型  最后一种因为即将弃用 所以是五种 我们常用的有四种  下面这个建造类说明了一切

package org.springframework.amqp.core;

import java.util.Map;

/**
 * Builder providing a fluent API for building {@link Exchange}s.
 *
 * @author Gary Russell
 * @since 1.6
 *
 */
public final class ExchangeBuilder extends AbstractBuilder {

	private final String name;

	private final String type;

	private boolean durable;

	private boolean autoDelete;

	private boolean internal;

	private boolean delayed;

	/**
	 * Construct an instance of the appropriate type.
	 * @param name the exchange name
	 * @param type the type name
	 * @see ExchangeTypes
	 * @since 1.6.7
	 */
	public ExchangeBuilder(String name, String type) {
		this.name = name;
		this.type = type;
	}

	/**
	 * Return a {@link DirectExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder directExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.DIRECT);
	}

	/**
	 * Return a {@link TopicExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder topicExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.TOPIC);
	}

	/**
	 * Return a {@link FanoutExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder fanoutExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.FANOUT);
	}

	/**
	 * Return a {@link HeadersExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder headersExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.HEADERS);
	}

	/**
	 * Set the auto delete flag.
	 * @return the builder.
	 */
	public ExchangeBuilder autoDelete() {
		this.autoDelete = true;
		return this;
	}

	/**
	 * Set the durable flag to true.
	 * @return the builder.
	 * @deprecated - in 2.0, durable will be true by default
	 * @see #durable(boolean)
	 */
	@Deprecated
	public ExchangeBuilder durable() {
		this.durable = true;
		return this;
	}

	/**
	 * Set the durable flag.
	 * @param durable the durable flag (default false).
	 * @return the builder.
	 * @see #durable
	 */
	public ExchangeBuilder durable(boolean durable) {
		this.durable = durable;
		return this;
	}

	/**
	 * Add an argument.
	 * @param key the argument key.
	 * @param value the argument value.
	 * @return the builder.
	 */
	public ExchangeBuilder withArgument(String key, Object value) {
		getOrCreateArguments().put(key, value);
		return this;
	}

	/**
	 * Add the arguments.
	 * @param arguments the arguments map.
	 * @return the builder.
	 */
	public ExchangeBuilder withArguments(Map<String, Object> arguments) {
		this.getOrCreateArguments().putAll(arguments);
		return this;
	}

	/**
	 * Set the internal flag.
	 * @return the builder.
	 */
	public ExchangeBuilder internal() {
		this.internal = true;
		return this;
	}

	/**
	 * Set the delayed flag.
	 * @return the builder.
	 */
	public ExchangeBuilder delayed() {
		this.delayed = true;
		return this;
	}

	public Exchange build() {
		AbstractExchange exchange;
		if (ExchangeTypes.DIRECT.equals(this.type)) {
			exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.TOPIC.equals(this.type)) {
			exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.FANOUT.equals(this.type)) {
			exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.HEADERS.equals(this.type)) {
			exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else {
			throw new IllegalStateException("Invalid type: " + this.type);
		}
		exchange.setInternal(this.internal);
		exchange.setDelayed(this.delayed);
		return exchange;
	}

}

 

这四种的说明

  1. Direct: 先策略匹配到对应绑定的队列后 才会被投送到该队列  交换机跟队列必须是精确的对应关系 这种最为简单
  2. Topic: 转发消息主要是根据通配符 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息 这种可以认为是Direct 的灵活版  
  3. Headers:也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers 则是一个自定义匹配规则的类型
    在队列与交换器绑定时 会设定一组键值对规则 消息中也包括一组键值对( headers 属性) 当这些键值对有一对 或全部匹配时 消息被投送到对应队列
  4. Fanout : 消息广播模式 不管路由键或者是路由模式 会把消息发给绑定给它的全部队列  如果配置了routingkey会被忽略

 

 

 

  • springboot集成rabbitmq       

在熟悉了相关概念后我们开始搞一搞这个东西   首先你要安装好rabbitmq  相关方法资料很多 此处不表    在本机安装好 并启用了管理页面后打开  localhost:15672 会显示一个管理页面   如下  可以进行一些可视化操作

新建springboot工程  springboot 版本 1.5.10  依赖如下

	<dependencies>
		<!--amqp rabbitmq 依赖必须 必须-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--springboot单元测试 选-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!--springboot健康监控 选-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<!--web支持  选-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>

application.yml 配置文件 rabbitmq 相关:

spring:
  rabbitmq:
    username: rabbitAdmin
    password: 123456789
#    支持发布确认
    publisher-confirms: true
#    支持发布返回
    publisher-returns: true
    listener:
      simple:
#      采用手动应答
        acknowledge-mode: manual
#        当前监听容器数
        concurrency: 1
#        最大数
        max-concurrency: 1
#        是否支持重试
        retry:
          enabled: true
#        日志配置 
logging:
  config: classpath:logback.xml

 

定制模版类   声明交换机  队列     绑定交换机到队列

这里 声明了Direct 交换机  并通过路由键绑定到一个队列中        来测试Direct模式

        声明了Fanout交换机  并绑定到2个队列    来测试广播模式

package cn.felord.message.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 队列配置.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 14:28
 */
@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 定制化amqp模版      可根据需要定制多个
     * 
     * 
     * 此处为模版类定义 Jackson消息转换器
     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调   即消息发送到exchange  ack
     * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
     *
     * @return the amqp template
     */
//    @Primary
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
//          使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
//        开启returncallback     yml 需要 配置    publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        //        消息确认  yml 需要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }

    /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */

    /**
     * 声明Direct交换机 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个队列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 通过绑定键 将指定队列绑定到一个指定的交换机 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }

    /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */

    /**
     * 声明 fanout 交换机.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * Fanout queue A.
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * Fanout queue B .
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 绑定队列A 到Fanout 交换机.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 绑定队列B 到Fanout 交换机.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

 

编写监听器 来监听队列消息 

package cn.felord.message.comsumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听器.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/24 9:36
 */
@Component
public class Receiver {
    private static final Logger log= LoggerFactory.getLogger(Receiver.class);
    /**
     * FANOUT广播队列监听一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A "+new String(message.getBody()));
    }

    /**
     * FANOUT广播队列监听二.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception   这里异常需要处理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B "+new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT "+new String (message.getBody()));
    }
}

 

编写 发送消息接口 来进行测试

package cn.felord.message.controller;

import cn.felord.message.bean.ResponseEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 消息接口.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 17:27
 */
@RestController
@RequestMapping("/rabbit")
public class SendController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     *  测试广播模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/fanout")
    public ResponseEntity send(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        return ResponseEntity.ok();
    }

    /**
     *  测试Direct模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/direct")
    public ResponseEntity direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
        return ResponseEntity.ok();
    }
}

测试广播模式

控制台输出

同样 自己可以测试Direct模式     可以打开rabbitmq控制台进行追踪 相关运行信息

 

配套源码 :https://gitee.com/felord/springboot-message

下一篇会在 springboot中实现 rabbitmq死信队列

 

            

© 著作权归作者所有

码农小胖哥

码农小胖哥

粉丝 37
博文 61
码字总数 58326
作品 1
郑州
程序员
私信 提问
加载中

评论(14)

孙一一
孙一一

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了
请问你这个问题解决了吗?我现在也碰见这个问题了
a
a151605

引用来自“a151605”的评论

我把sender实现了RabbitTemplate.ConfirmCallback ,我自己写了cacheConnectionFactory,把配置都设置进去回调就不执行了。去掉我自己写的连接工厂就可以回调。好气
😓少配置了setMandatory
a
a151605
我把sender实现了RabbitTemplate.ConfirmCallback ,我自己写了cacheConnectionFactory,把配置都设置进去回调就不执行了。去掉我自己写的连接工厂就可以回调。好气
J
Java_微尘
setReturnCallback方法一直都没有触发,我将exchang与queue bing值特意写错,都不触发。
后台显示,发送到exchange,没有发送到queue,为什么不触发呢?
老衲L

引用来自“NotFound403”的评论

引用来自“老衲L”的评论

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了

引用来自“NotFound403”的评论

有一种可能 如果你用了 新建的账户 你的 vhost 可能因为授权问题出现问题
vhost是这样配置的 spring.rabbitmq.virtual-host=/

@老衲L 去管理控制台检查一下吧
好的 感谢
码农小胖哥
码农小胖哥 博主

引用来自“老衲L”的评论

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了

引用来自“NotFound403”的评论

有一种可能 如果你用了 新建的账户 你的 vhost 可能因为授权问题出现问题
vhost是这样配置的 spring.rabbitmq.virtual-host=/

@老衲L 去管理控制台检查一下吧
老衲L

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了

引用来自“NotFound403”的评论

有一种可能 如果你用了 新建的账户 你的 vhost 可能因为授权问题出现问题
vhost是这样配置的 spring.rabbitmq.virtual-host=/
老衲L

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了

引用来自“NotFound403”的评论

有没有完整的日志错误信息
没有了 上面的就是所有的错误信息
码农小胖哥
码农小胖哥 博主

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了
有一种可能 如果你用了 新建的账户 你的 vhost 可能因为授权问题出现问题
码农小胖哥
码农小胖哥 博主

引用来自“老衲L”的评论

请问下为什么在我的回调方法中 有时会报错 :clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0 但是消息还是发送成功了
有没有完整的日志错误信息
SpringBoot集成RabbitMQ

官方说明:http://www.rabbitmq.com/getstarted.html 什么是MQ? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往...

xuanm
2018/08/10
0
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
2018/07/25
808
0
springboot rabbitmq 之死信队列(延迟消费消息)

之前探讨了springboot 集成 rabbitmq 以及开启ack模式 传送门:https://my.oschina.net/u/2948566/blog/1624963 接着该篇 搞一下 死信队列 概念 死信队列 听上去像 消息“死”了 其实也有点这...

NotFound403
2018/02/28
8.3K
2
springboot之RabbitMQ详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,消息中间件最主要的作用是解耦,中间件最...

无语年华
2018/05/30
346
0
springboot rabbitmq整合

转载:https://www.cnblogs.com/xmzJava/p/8036591.html 这一篇我们来把消息中间件整合到springboot中 ===================================================================== 首先在服务器......

一支支穿云箭
2018/09/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Java描述设计模式(11):观察者模式

本文源码:GitHub·点这里 || GitEE·点这里 一、观察者模式 1、概念描述 观察者模式是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式。观察者模式定义了一种一对多的依赖关系,让多...

知了一笑
47分钟前
6
0
Qt 之 模态、非模态、半模态窗口的介绍及 实现QDialog的exec()方法

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/GoForwardToStep/article/details/53667566 一、简述 先简...

shzwork
50分钟前
4
0
OSChina 周一乱弹 —— 产品经理和程序员是夫妻?

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 小小编辑推荐:《Ocean Eyes》- Billie Eilish 《Ocean Eyes》- Billie Eilish 手机党少年们想听歌,请使劲儿戳(这里) @夏目Jane :风太大。...

小小编辑
今天
457
8
使用CSS自定义属性构建骨架屏

写在前面 几天前看到薄荷前端团队分享的《前端骨架屏方案小结》,突然回想起一年前看到的max bock写的《Building Skeleton Screens with CSS Custom Properties》,翻译整理写下出此文,分享...

前端老手
昨天
17
0
Docker常用命令小记

除了基本的<font color="blue">docker pull</font>、<font color="blue">docker image</font>、<font color="blue">docker ps</font>,还有一些命令及参数也很重要,在此记录下来避免遗忘。 ......

程序员欣宸
昨天
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部