文档章节

RabbitMQ(6)-Spring AMQP,Spring集成RabbitMQ

你我他有个梦
 你我他有个梦
发布于 2015/12/20 15:35
字数 944
阅读 798
收藏 15

一.Qucik Start

1.rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang128" username="admin"
                               password="admin" port="5672"  />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--direct: exchange queue binging key 绑定 -->
    <!--<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange1">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列-->
    <bean id="jsonMessageConverter"  class="com.rayootech.rabbitmq.demo.springamqp.FastJsonMessageConverter"></bean>
    <!-- spring template声明-->
    <!--direct-->
    <!--<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    <!--topic-->
    <rabbit:template exchange="myexchange1" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter" routing-key="foo.bar"/>
    <!--fanout-->
   <!-- <rabbit:template exchange="myexchange2" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    

</beans>

2.rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang129" username="admin"
                               password="admin" port="5672"  />
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange2">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--direct: exchange queue binging key 绑定 -->
   <!-- <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <bean id="consumerMessage" class="com.rayootech.rabbitmq.demo.springamqp.ConsumeMessage"></bean>
    <!-- 配置线程池 -->
    <bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <!-- 线程池维护线程的最少数量 -->
        <property name ="corePoolSize" value ="5" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name ="keepAliveSeconds" value ="30000" />
        <!-- 线程池维护线程的最大数量 -->
        <property name ="maxPoolSize" value ="1000" />
        <!-- 线程池所使用的缓冲队列 -->
        <property name ="queueCapacity" value ="200" />
    </bean>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="consumerMessage"/>
    </rabbit:listener-container>
</beans>

3.监听器(接收器):

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;

public class ConsumeMessage implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            //使用fastJson将数据对象转换为json数据
            String receiveMsg =new String(message.getBody(),"utf-8");
            System.out.println("Receiver msg:" + receiveMsg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

4.Producer:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Producer {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-producer.xml");
        AmqpTemplate template = context.getBean(AmqpTemplate.class);
        //direct模式:接收routing-key=queue_one_key的消息
        //template.convertAndSend("queue_one_key", "hello!");
        //topic模式:以foo.* routing-key为模版接收消息
        template.convertAndSend("foo.bar", "hello!");
        //fanout模式:在集群范围内的所有consumer都会收到消息
        //template.convertAndSend("hello!");
        System.out.println("send message:hello world!");
    }
}

5.consumer:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Cosumer {
    public static void main(String[] args) {
        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-consumer.xml");
    }
}

6.json数据转换器:

import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;


import java.io.UnsupportedEncodingException;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
        //init();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }

    public <T> T fromMessage(Message message,T t) {
        String json = "";
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return (T) JSON.parseObject(json, t.getClass());
    }


    protected Message createMessage(Object objectToConvert,
                                    MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(objectToConvert);
            //FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}


© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 95
博文 110
码字总数 98858
作品 0
昌平
程序员
加载中

评论(3)

J
JM衣领

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了

引用来自“lzhaoqiang”的评论

在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
给您看下我监听器的配置,例子是借鉴spring官方的:




消费
public class Foo{
  public void handleRequestMessage(String msg) {
    System.out.println("收到消息内容"+msg);
  }
}
生产者:
@RequestMapping("/index")
  public String index(){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String arg2) {
        if(ack){
   System.out.println("消息确认成功");
   }else{
   System.out.println("消息确认失败");
   }
你我他有个梦
你我他有个梦

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
J
JM衣领
消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
09/13
0
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
07/25
0
0
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风
06/07
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
09/07
0
0
【RabbitMQ】6、rabbitmq生产者的消息确认

通过Publisher Confirms and Returns机制,生产者可以判断消息是否发送到了exchange及queue,而通过消费者确认机制,Rabbitmq可以决定是否重发消息给消费者,以保证消息被处理。 1.什么是Pub...

xiaomin0322
05/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 鱼生不值得

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @瘟神灬念:分享新裤子的单曲《没有理想的人不伤心 (Remix版)》: 《没有理想的人不伤心 (Remix版)》- 新裤子 手机党少年们想听歌,请使劲儿戳...

小小编辑
20分钟前
4
3
arts-week10

Algorithm 905. Sort Array By Parity - LeetCode Review Who’s Afraid of the Big Bad Preloader? 一文读懂前端缓存 一个网络请求3个步骤:请求,处理,响应,而前端缓存主要在请求处响应这两步...

yysue
今天
4
0
00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
今天
5
1
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
6
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
180
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部