文档章节

spring cloud stream kafka实例

go4it
 go4it
发布于 2017/04/10 13:23
字数 241
阅读 190
收藏 4

maven

    <dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Camden.SR6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

    <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
	</dependency>

生产者配置

server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              partitionCount: 1


  • java代码
@EnableBinding(Source.class)
public class SendService {

	@Autowired
	private Source source;

	public void sendMessage(String msg) {
		try {
			source.output().send(MessageBuilder.withPayload(msg).build());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

@RestController
public class ProducerController {
	
	@Autowired
	private SendService service;
	
	@RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
	public void send(@PathVariable("msg") String msg){
		service.sendMessage(msg);
	}
	
}

消费者

spring:
  application:
    name: input-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: event-demo
            group: s1
            consumer:
              autoCommitOffset: false
              concurrency: 1
              partitioned: false
  • java代码
@EnableBinding(Sink.class)
public class MsgSink {

	@StreamListener(Sink.INPUT)
	public void process(Message<?> message) {
		System.out.println(message.getPayload());
		Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
		if (acknowledgment != null) {
			System.out.println("Acknowledgment provided");
			acknowledgment.acknowledge();
		}
	}
}

运行

先运行生产者,再运行消费者

curl -i localhost:8081/send/hello1

doc

© 著作权归作者所有

共有 人打赏支持
go4it
粉丝 71
博文 823
码字总数 698045
作品 0
深圳
私信 提问
(十七) 整合spring cloud云架构 -消息驱动 Spring Cloud Stream

在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud做架构的时候,其中消息的...

itcloud
08/28
0
0
Spring Cloud (十五)Stream 入门、主要概念与自定义消息发送与接收

前言 不写随笔的日子仿佛就是什么都没有产出一般……上节说到要学Spring Cloud Bus,这里发现按照官方文档的顺序反而会更好些,因为不必去后边的章节去为当前章节去打基础,所以我们先学习S...

东北小狐狸
07/31
0
0
Spring Cloud Stream 1.0.1 发布

Spring Cloud Stream 1.0.1 发布了。Spring Cloud Stream 1.0.0 附带 Apache Kafka 和 Rabbitmq 的Binders,并带来了其他实现,比如GemFire可用。 作为一个维护版本,此次更新包括了一些修复...

oschina
2016/06/04
1K
2
SpringCloud实战9-Stream消息驱动

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。   应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Clo...

狂小白
05/29
0
0
Spring Cloud Stream 1.3 GA 正式发布

Spring Cloud Stream 1.3 —— Ditmars 已发布。Spring Cloud Stream 是与 Spring 集成的事件驱动的微服务。 此版本中包含的新功能请查看发布主页。 发布说明包括与 Spring Boot,Spring ...

局长
2017/10/07
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

ArrayList的实现原理以及实现线程安全

一、ArrayList概述 ArrayList是基于数组实现的,是一个动态的数字,可以自动扩容。 ArrayList不是线程安全的,效率比较高,只能用于单线程的环境中,在多线程环境中可以使用Collections.syn...

一看就喷亏的小猿
15分钟前
0
0
Netty 备录 (一)

入职新公司不久,修修补补1个月的bug,来了点实战性的技术---基于netty即时通信 还好之前对socket有所使用及了解,入手netty应该不是很难吧,好吧,的确有点难,刚看这玩意的时候,可能都不知道哪里...

_大侠__
昨天
4
0
Django简单介绍和用户访问流程

Python下有许多款不同的 Web 框架。Django是重量级选手中最有代表性的一位。许多成功的网站和APP都基于Django。 Django是一个开放源代码的Web应用框架,由Python写成。 Django遵守BSD版权,初...

枫叶云
昨天
8
0
EOS错误代码及中文释义

本文集汇总了EOS区块链常见错误代码及其含义,完整错误代码集请查看 EOS错误代码集 - 汇智网 EOS错误代码列表如下, <table class="table table-striped"> <thead> <tr><th>错误代码</th><t......

汇智网教程
昨天
4
0
Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另外一种重试...

程序猿DD
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部