文档章节

RabbitMQ(6)-Spring AMQP,Spring集成RabbitMQ

你我他有个梦
 你我他有个梦
发布于 2015/12/20 15:35
字数 944
阅读 775
收藏 15
点赞 0
评论 3

一.Qucik Start

1.rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang128" username="admin"
                               password="admin" port="5672"  />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--direct: exchange queue binging key 绑定 -->
    <!--<rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange1">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列-->
    <bean id="jsonMessageConverter"  class="com.rayootech.rabbitmq.demo.springamqp.FastJsonMessageConverter"></bean>
    <!-- spring template声明-->
    <!--direct-->
    <!--<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    <!--topic-->
    <rabbit:template exchange="myexchange1" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter" routing-key="foo.bar"/>
    <!--fanout-->
   <!-- <rabbit:template exchange="myexchange2" id="amqpTemplate"  connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>-->
    

</beans>

2.rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">


    <!-- 连接服务配置  -->
    <rabbit:connection-factory id="connectionFactory" host="liuzhaoqiang129" username="admin"
                               password="admin" port="5672"  />
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- queue 队列声明-->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
    <!--fanout: exchange queue binging key 绑定 -->
    <!--<rabbit:fanout-exchange name="myexchange2"   durable="true" auto-delete="false" id="myexchange2">-->
        <!--<rabbit:bindings>-->
            <!--<rabbit:binding queue="queue_one"></rabbit:binding>-->
        <!--</rabbit:bindings>-->
    <!--</rabbit:fanout-exchange>-->
    <!--topic: exchange queue binging key 绑定 -->
    <rabbit:topic-exchange name="myexchange1"  durable="true" auto-delete="false" id="myexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" pattern="foo.*"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--direct: exchange queue binging key 绑定 -->
   <!-- <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>-->
    <bean id="consumerMessage" class="com.rayootech.rabbitmq.demo.springamqp.ConsumeMessage"></bean>
    <!-- 配置线程池 -->
    <bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
        <!-- 线程池维护线程的最少数量 -->
        <property name ="corePoolSize" value ="5" />
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name ="keepAliveSeconds" value ="30000" />
        <!-- 线程池维护线程的最大数量 -->
        <property name ="maxPoolSize" value ="1000" />
        <!-- 线程池所使用的缓冲队列 -->
        <property name ="queueCapacity" value ="200" />
    </bean>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="consumerMessage"/>
    </rabbit:listener-container>
</beans>

3.监听器(接收器):

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;

public class ConsumeMessage implements MessageListener {

    @Override
    public void onMessage(Message message) {

        try {
            //使用fastJson将数据对象转换为json数据
            String receiveMsg =new String(message.getBody(),"utf-8");
            System.out.println("Receiver msg:" + receiveMsg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

4.Producer:

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Producer {
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-producer.xml");
        AmqpTemplate template = context.getBean(AmqpTemplate.class);
        //direct模式:接收routing-key=queue_one_key的消息
        //template.convertAndSend("queue_one_key", "hello!");
        //topic模式:以foo.* routing-key为模版接收消息
        template.convertAndSend("foo.bar", "hello!");
        //fanout模式:在集群范围内的所有consumer都会收到消息
        //template.convertAndSend("hello!");
        System.out.println("send message:hello world!");
    }
}

5.consumer:

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;

public class Cosumer {
    public static void main(String[] args) {
        ApplicationContext context =
                new GenericXmlApplicationContext("rabbitmq-consumer.xml");
    }
}

6.json数据转换器:

import com.alibaba.fastjson.JSON;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;


import java.io.UnsupportedEncodingException;

public class FastJsonMessageConverter  extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
        //init();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }

    public <T> T fromMessage(Message message,T t) {
        String json = "";
        try {
            json = new String(message.getBody(),defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        return (T) JSON.parseObject(json, t.getClass());
    }


    protected Message createMessage(Object objectToConvert,
                                    MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSON.toJSONString(objectToConvert);
            //FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}


© 著作权归作者所有

共有 人打赏支持
你我他有个梦

你我他有个梦

粉丝 92
博文 110
码字总数 98858
作品 0
昌平
程序员
加载中

评论(3)

J
JM衣领

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了

引用来自“lzhaoqiang”的评论

在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
给您看下我监听器的配置,例子是借鉴spring官方的:




消费
public class Foo{
  public void handleRequestMessage(String msg) {
    System.out.println("收到消息内容"+msg);
  }
}
生产者:
@RequestMapping("/index")
  public String index(){
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String arg2) {
        if(ack){
   System.out.println("消息确认成功");
   }else{
   System.out.println("消息确认失败");
   }
你我他有个梦
你我他有个梦

引用来自“JM衣领”的评论

消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
在监听容器当中有一个acknowledge属性,配置为manual 即为手动确认
J
JM衣领
消费如何手动ack回调确认呢? 原生编码可以实现消费者处理完消息在调用ack回调确认告诉生产者,spring集成下 ,请教下应该如何实现ack呢? 谢谢了
Spring Boot+RabbitMQ学习笔记

RabbitMQ RabbitMQ 是一个由 Erlang 语言开发的 的开源实现。最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 消息队列 先从基础开始,加...

听_风 ⋅ 06/07 ⋅ 0

【RabbitMQ】6、rabbitmq生产者的消息确认

通过Publisher Confirms and Returns机制,生产者可以判断消息是否发送到了exchange及queue,而通过消费者确认机制,Rabbitmq可以决定是否重发消息给消费者,以保证消息被处理。 1.什么是Pub...

xiaomin0322 ⋅ 05/13 ⋅ 0

阿里云Kubernetes SpringCloud 实践进行时(2): 分布式配置管理

简介 为了更好地支撑日益增长的庞大业务量,我们常常需要把服务进行整合、拆分,使我们的服务不仅能通过集群部署抵挡流量的冲击,又能根据业务在其上进行灵活的扩展。随着分布式的普及、服务...

osswangxining ⋅ 05/25 ⋅ 0

Spring Boot与RabbitMQ结合实现延迟队列的示例

背景 何为延迟队列? 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 场景一:在订单系统中,一个用户下单之后通常有...

xiaomin0322 ⋅ 05/11 ⋅ 0

通过实例理解 RabbitMQ 的基本概念

先说下自己开发的实例。 最近在使用 Spring Cloud Config 做分布式配置中心(基于 SVN/Git),当所有服务启动后,SVN/Git 中的配置文件更改后,客户端服务读取的还是旧的配置,并不能实时读取...

xumaojun ⋅ 05/06 ⋅ 0

RabbitMQ与消息队列模式

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法; RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言)编写的,可复用的企业级消息系统; AMQP(高级消...

Java技术汇 ⋅ 05/15 ⋅ 0

RabbitMQ学习以及与Spring的集成(三)

本文介绍RabbitMQ与Spring的简单集成以及消息的发送和接收。 在RabbitMQ的Spring配置文件中,首先需要增加命名空间。 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 其次是模...

onedotdot ⋅ 06/18 ⋅ 0

【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream

一、前言 在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。 在这篇博客中,小白向大家介绍一个消息事件驱动框架...

kisscatforever ⋅ 04/26 ⋅ 0

ActiveMQ RabbitMQ KafKa对比

前言: ActiveMQ和 RabbitMq 以及Kafka在之前的项目中都有陆续使用过,当然对于三者没有进行过具体的对比,以下摘抄了一些网上关于这三者的对比情况,我自己看过之后感觉还 是可以的,比较清...

xiaomin0322 ⋅ 05/11 ⋅ 0

介绍Spring Cloud Stream与RabbitMQ集成

Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。在本文中,我们将通过一些简单的例子来介绍Spring Cloud Stream的概念...

RaiseHead ⋅ 05/13 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

知乎Java数据结构

作者:匿名用户 链接:https://www.zhihu.com/question/35947829/answer/66113038 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 感觉知乎上嘲讽题主简...

颖伙虫 ⋅ 今天 ⋅ 0

Confluence 6 恢复一个站点有关使用站点导出为备份的说明

推荐使用生产备份策略。我们推荐你针对你的生产环境中使用的 Confluence 参考 Production Backup Strategy 页面中的内容进行备份和恢复(这个需要你备份你的数据库和 home 目录)。XML 导出备...

honeymose ⋅ 今天 ⋅ 0

JavaScript零基础入门——(九)JavaScript的函数

JavaScript零基础入门——(九)JavaScript的函数 欢迎回到我们的JavaScript零基础入门,上一节课我们了解了有关JS中数组的相关知识点,不知道大家有没有自己去敲一敲,消化一下?这一节课,...

JandenMa ⋅ 今天 ⋅ 0

火狐浏览器各版本下载及插件httprequest

各版本下载地址:http://ftp.mozilla.org/pub/mozilla.org//firefox/releases/ httprequest插件截至57版本可用

xiaoge2016 ⋅ 今天 ⋅ 0

Docker系列教程28-实战:使用Docker Compose运行ELK

原文:http://www.itmuch.com/docker/28-docker-compose-in-action-elk/,转载请说明出处。 ElasticSearch【存储】 Logtash【日志聚合器】 Kibana【界面】 答案: version: '2'services: ...

周立_ITMuch ⋅ 今天 ⋅ 0

使用快嘉sdkg极速搭建接口模拟系统

在具体项目研发过程中,一旦前后端双方约定好接口,前端和app同事就会希望后台同事可以尽快提供可供对接的接口方便调试,而对后台同事来说定好接口还仅是个开始、设计流程,实现业务逻辑,编...

fastjrun ⋅ 今天 ⋅ 0

PXE/KickStart 无人值守安装

导言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装。 常规的办法有什么? 光盘安装系统 ===> 一...

kangvcar ⋅ 昨天 ⋅ 0

使用Puppeteer撸一个爬虫

Puppeteer是什么 puppeteer是谷歌chrome团队官方开发的一个无界面(Headless)chrome工具。Chrome Headless将成为web应用自动化测试的行业标杆。所以我们很有必要来了解一下它。所谓的无头浏...

小草先森 ⋅ 昨天 ⋅ 0

Java Done Right

* 表示难度较大或理论性较强。 ** 表示难度更大或理论性更强。 【Java语言本身】 基础语法,面向对象,顺序编程,并发编程,网络编程,泛型,注解,lambda(Java8),module(Java9),var(...

风华神使 ⋅ 昨天 ⋅ 0

Linux系统日志

linux 系统日志 /var/log/messages /etc/logrotate.conf 日志切割配置文件 https://my.oschina.net/u/2000675/blog/908189 logrotate 使用详解 dmesg 命令 /var/log/dmesg 日志 last命令,调......

Linux学习笔记 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部