文档章节

RabbitMQ(6)-Spring AMQP,Spring集成RabbitMQ

你我他有个梦
 你我他有个梦
发布于 2015/12/20 15:35
字数 944
阅读 824
收藏 15

一.Qucik Start

1.rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang128" username="admin"
                               password="admin" port="5672"  />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--direct: exchange queue binging key 绑定 -->
    <!--<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange1">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列-->
    <bean id="jsonMessageConverter"  class="com.rayootech.rabbitmq.demo.springamqp.FastJsonMessageConverter"></bean>
    <!-- spring template声明-->
    <!--direct-->
    <!--<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    <!--topic-->
    <rabbit:template exchange="myexchange1" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter" routing-key="foo.bar"/>
    <!--fanout-->
   <!-- <rabbit:template exchange="myexchange2" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    

</beans>

2.rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang129" username="admin"
                               password="admin" port="5672"  />
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange2">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--direct: exchange queue binging key 绑定 -->
   <!-- <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <bean id="consumerMessage" class="com.rayootech.rabbitmq.demo.springamqp.ConsumeMessage"></bean>
    <!-- 配置线程池 -->
    <bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <!-- 线程池维护线程的最少数量 -->
        <property name ="corePoolSize" value ="5" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name ="keepAliveSeconds" value ="30000" />
        <!-- 线程池维护线程的最大数量 -->
        <property name ="maxPoolSize" value ="1000" />
        <!-- 线程池所使用的缓冲队列 -->
        <property name ="queueCapacity" value ="200" />
    </bean>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="consumerMessage"/>
    </rabbit:listener-container>
</beans>

3.监听器(接收器):

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;

public class ConsumeMessage implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            //使用fastJson将数据对象转换为json数据
            String receiveMsg =new String(message.getBody(),"utf-8");
            System.out.println("Receiver msg:" + receiveMsg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

4.Producer:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Producer {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-producer.xml");
        AmqpTemplate template = context.getBean(AmqpTemplate.class);
        //direct模式:接收routing-key=queue_one_key的消息
        //template.convertAndSend("queue_one_key", "hello!");
        //topic模式:以foo.* routing-key为模版接收消息
        template.convertAndSend("foo.bar", "hello!");
        //fanout模式:在集群范围内的所有consumer都会收到消息
        //template.convertAndSend("hello!");
        System.out.println("send message:hello world!");
    }
}

5.consumer:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Cosumer {
    public static void main(String[] args) {
        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-consumer.xml");
    }
}

6.json数据转换器:

import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;


import java.io.UnsupportedEncodingException;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
        //init();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }

    public <T> T fromMessage(Message message,T t) {
        String json = "";
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return (T) JSON.parseObject(json, t.getClass());
    }


    protected Message createMessage(Object objectToConvert,
                                    MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(objectToConvert);
            //FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}


© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 95
博文 110
码字总数 98858
作品 0
昌平
程序员
私信 提问
加载中

评论(3)

J
JM衣领

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了

引用来自“lzhaoqiang”的评论

在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
给您看下我监听器的配置,例子是借鉴spring官方的:




消费
public class Foo{
  public void handleRequestMessage(String msg) {
    System.out.println("收到消息内容"+msg);
  }
}
生产者:
@RequestMapping("/index")
  public String index(){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String arg2) {
        if(ack){
   System.out.println("消息确认成功");
   }else{
   System.out.println("消息确认失败");
   }
你我他有个梦
你我他有个梦

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
J
JM衣领
消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
RabbitMQ 实战教程 文集

RabbitMQ 实战教程 文集 此系列博客经梁总(梁桂钊的博客)授权录入本站点,下面推荐下梁总的技术公众号【服务端的思维】,公众号不定时更新技术文章,干货满满!! RabbitMQ 实战教程(一) He...

chenssy
10/06
0
0
RabbitMQ 入门 与 RabbitMQ 在 Spring Boot 中的使用

Message Broker与AMQP简介 Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 执行消息的聚集...

Anur
09/13
0
0
OpenCredo宣布为Spring Integration增加AMQP支持

OpenCredo是一家咨询公司,由Jonas Partner和Russ Miles创建,近日发布了一个开源的适配器——可以让Spring Integration应用与AMQP端点协同工作。 Advanced Message Queuing Protocol(AMQP)...

红薯
2010/03/10
365
0
SpringBoot | 第十二章:RabbitMQ的集成和使用

前言 上节讲了缓存数据库的使用,在实际工作中,一般上在系统或者应用间通信或者进行异步通知(登录后发送短信或者邮件等)时,都会使用消息队列进行解决此业务场景的解耦问题。这章节讲解下消...

oKong
07/25
0
0
消息队列1:RabbitMQ解析并基于Springboot实战

目录 RabbitMQ简介 RabitMQ 概念模型 Exchange 类型 代码实战 RabbitMQ简介 AMQP:Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此...

养码青年
09/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

jquery通过id显示隐藏

var $div3 = $('#div3'); 显示 $div3.show(); 隐藏 $div3.hide();

yan_liu
今天
3
0
《乱世佳人》读书笔记及相关感悟3900字

《乱世佳人》读书笔记及相关感悟3900字: 之前一直听「荔枝」,后来不知怎的转向了「喜马拉雅」,一听就是三年。上班的时候听房产,买房了以后听装修,兴之所至时听旅行,分手后听亲密关系,...

原创小博客
今天
3
0
大数据教程(9.6)map端join实现

上一篇文章讲了mapreduce配合实现join,本节博主将讲述在map端的join实现; 一、需求 实现两个“表”的join操作,其中一个表数据量小,一个表很大,这种场景在实际中非常常见,比如“订单日志...

em_aaron
今天
3
0
cookie与session详解

session与cookie是什么? session与cookie属于一种会话控制技术.常用在身份识别,登录验证,数据传输等.举个例子,就像我们去超市买东西结账的时候,我们要拿出我们的会员卡才会获取优惠.这时...

士兵7
今天
3
0
十万个为什么之为什么大家都说dubbo

Dubbo是什么? 使用背景 dubbo为什么这么流行, 为什么大家都这么喜欢用dubbo; 通过了解分布式开发了解到, 为适应访问量暴增,业务拆分后, 子应用部署在多台服务器上,而多台服务器通过可以通过d...

尾生
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部