文档章节

spring-cloud stream + rabbitmq 小记

從此迷花粉
 從此迷花粉
发布于 2018/09/04 14:15
字数 673
阅读 14
收藏 1

此篇主要记录spring-cloud stream,对rabbitmq的安装使用不做累述。

  1. 创建stream-receiver作为消费者
    pom.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud-stream</artifactId>
            <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>stream-receiver</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    
    </project>

    application.properties:
     

    spring.application.name=customer
    server.port=7889
    
    spring.rabbitmq.host=192.168.1.17
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    
    spring.cloud.stream.bindings.input.destination=sink-channel
    spring.cloud.stream.bindings.output.destination=sink-channel#不指定该输出通道无法接收消息

    Java 代码:
     

    /**********************************CustomerApp.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @author cherrishccl
     * @time 2018-08-31 14:53
     * @name CustomerApp
     * @desc:
     */
    @SpringBootApplication
    public class CustomerApp {
        private static Logger logger = LoggerFactory.getLogger(CustomerApp.class);
        public static void main(String[] args) {
            SpringApplication.run(CustomerApp.class, args);
        }
    
    }
    /**********************************SinkReceiver.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    /**
     * @author cherrish
     * @time 2018-08-31 14:39
     * @name SinkReceiver
     * @desc:
     */
    @EnableBinding(value = Sink.class)
    public class SinkReceiver {
        private static final AtomicInteger NUM = new AtomicInteger(0);
        private static Logger log = LoggerFactory.getLogger(SinkReceiver.class);
        @StreamListener(Sink.INPUT)
        public void receive(Message<String> payload) {
            log.info(NUM.getAndIncrement() + " Received : " + payload.getPayload());
        }
    }
    
    

     

  2. 创建stream-sender作为生产者
    pom.xml:
     

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud-stream</artifactId>
            <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>stream-sender</artifactId>
    
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    
    </project>

    application.properties:
     

    spring.application.name=producer
    server.port=7888
    
    spring.rabbitmq.host=192.168.1.195
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    
    spring.cloud.stream.bindings.input.destination=sink-channel
    spring.cloud.stream.bindings.output.destination=sink-channel


    Java代码:

    /**********************************ProducerApp.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * @author cherrish
     * @time 2018-08-31 14:52
     * @name ProducerApp
     * @desc:
     */
    @EnableScheduling
    @SpringBootApplication
    public class ProducerApp {
        private static Logger logger = LoggerFactory.getLogger(ProducerApp.class);
        public static void main(String[] args) {
            SpringApplication.run(ProducerApp.class, args);
        }
    }
    /**********************************SinkSender.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author cherrish
     * @time 2018-08-30 16:16
     * @name SinkSender
     * @desc:
     */
    @EnableBinding(value = {Source.class})
    public class SinkSender {
        private static Logger log = LoggerFactory.getLogger(SinkSender.class);
        private final static AtomicInteger NUM = new AtomicInteger(0);
        @InboundChannelAdapter(value = Source.OUTPUT,poller = @Poller(fixedRate = "3000"))
        public String timerMessageSource() {
            String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            log.info(NUM.getAndIncrement() + " Send message : " + format);
            return format;
        }
    
        @Autowired
        Source source;
        public void send(String message){
            source.output().send(org.springframework.integration.support.MessageBuilder.withPayload(message).build());
        }
    }
    /**********************************ScheduleTimer.java*********************************/
    
    package com.cherrish;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * @author cherrish
     * @time 2018-09-03 10:58
     * @name ScheduleTimer
     * @desc:
     */
    @Component
    public class ScheduleTimer {
        @Autowired
        SinkSender sender;
        @Scheduled(fixedRate = 3000)
        public void send(){
            sender.send("定时调用发送消息" + new Date());
        }
    }
    
    

    ---------------------------
    父pom.xml:
     

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
           <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        <artifactId>cloud-stream</artifactId>
        <packaging>pom</packaging>
        <modules>
            <module>stream-sender</module>
            <module>stream-receiver</module>
        </modules>
    
       <properties>
            <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <java.version>1.8</java.version>
            <spring-boot.version>2.0.4.RELEASE</spring-boot.version>
            <skipTests>true</skipTests>
        </properties>
    
       <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>${spring-boot.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    </project>

     

© 著作权归作者所有

從此迷花粉

從此迷花粉

粉丝 7
博文 175
码字总数 64022
作品 0
海淀
程序员
私信 提问
【Spring Cloud】分布式必学springcloud(十一)——消息驱动 SpringCloud Stream

一、前言 在前面的博客中,基本上已经把springcloud系列的大部分都介绍过了。如果有太明白的小白,还是建议从小编的第一篇博客进行学习。 在这篇博客中,小白向大家介绍一个消息事件驱动框架...

kisscatforever
2018/04/26
0
0
【Spring Cloud Stream】异步任务

一、前言 前两篇博客提高了用线程池和消息队列才实现异步任务。本篇博客谈一谈用SpringCloud Stream来实现异步任务。 Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它...

qq_26545305
2018/05/20
0
0
Spring Cloud 系列之 Spring Cloud Stream

Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 为消息中间件系统为基础,介绍 Spring Cloud Stream 的使用。如果你没有用过消息中间件,可以到...

风的姿态
09/24
0
0
Spring Cloud Stream应用与自定义RocketMQ Binder:编程模型

前言: 本文作者张天,节选自笔者与其合著的《Spring Cloud微服务架构进阶》,即将在八月出版问世。本文将其中Spring Cloud Stream应用与自定义Rocketmq Binder的内容抽取出来,主要介绍Spr...

aoho
2018/06/23
0
0
介绍Spring Cloud Stream与RabbitMQ集成

Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。在本文中,我们将通过一些简单的例子来介绍Spring Cloud Stream的概念...

RaiseHead
2018/05/13
0
0

没有更多内容

加载失败,请刷新页面

加载更多

kubernetes pod exec接口调用

正文 一般生产环境上由于网络安全策略,大多数端口是不能为集群外部访问的。多个集群之间一般都是通过k8s的ApiServer组件提供的接口通信,如https://192.168.1.101:6443。所以在做云平台时,...

码农实战
38分钟前
4
0
3_数组

3_数组

行者终成事
今天
8
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部