文档章节

spring cloud stream kafka实例

go4it
 go4it
发布于 2017/04/10 13:23
字数 241
阅读 1.5K
收藏 4

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

maven

    <dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Camden.SR6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

    <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
	</dependency>

生产者配置

server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              partitionCount: 1


  • java代码
@EnableBinding(Source.class)
public class SendService {

	@Autowired
	private Source source;

	public void sendMessage(String msg) {
		try {
			source.output().send(MessageBuilder.withPayload(msg).build());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

@RestController
public class ProducerController {
	
	@Autowired
	private SendService service;
	
	@RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
	public void send(@PathVariable("msg") String msg){
		service.sendMessage(msg);
	}
	
}

消费者

spring:
  application:
    name: input-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: event-demo
            group: s1
            consumer:
              autoCommitOffset: false
              concurrency: 1
              partitioned: false
  • java代码
@EnableBinding(Sink.class)
public class MsgSink {

	@StreamListener(Sink.INPUT)
	public void process(Message<?> message) {
		System.out.println(message.getPayload());
		Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
		if (acknowledgment != null) {
			System.out.println("Acknowledgment provided");
			acknowledgment.acknowledge();
		}
	}
}

运行

先运行生产者,再运行消费者

curl -i localhost:8081/send/hello1

doc

© 著作权归作者所有

go4it
粉丝 93
博文 1347
码字总数 1253425
作品 0
深圳
私信 提问
加载中

评论(0)

Spring Cloud学习系列第七篇【消息总线】

  上篇学习了搭建“分布式配置中心”,发现Spring Cloud Config这个配置中心动态刷新设计有点土,当配置仓库属性有更新动作的时候,需要对对应实例请求/refresh端点,该实例才能获取到更新...

osc_k1dxty4x
2018/08/06
7
0
Spring Cloud Stream 核心概念

Spring Cloud Stream简介 Spring cloud stream是一个构建与Spring Boot和Spring Integration之上的框架,方便开发人员快速构建基于Message-Driven的系统。 Spring Integration & Enterprise ...

osc_rnrep3wi
2019/08/22
8
0
Spring Cloud Stream 进行服务之间的通讯

Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构。Spring Cloud Stream本身对Sprin...

osc_qukgacve
2019/10/15
2
0
学习笔记:微服务-22 spring cloud config+kafka+spring cloud bus 消息总线实现配置自动更新

上节配置了spring cloud config server,集中管理配置文件,当配置文件发生变化时,客户端通过 curl -X POST http://客户端地址:端口/actuator/refresh 可以刷新客户端配置,但是如果微服务多...

领尚
03/31
0
0
Spring Cloud 之 Stream.

一、简介 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。 Spring Cloud Stream 为一些供应商的消息中间件产品(目前集成了 RabbitMQ 和 Kafka)提供了个性化的自动化配...

osc_i6ddt53t
04/16
10
0

没有更多内容

加载失败,请刷新页面

加载更多

centos7安装squid代理

局域网只有一台服务器可以上互联网,其他机器需要使用代理上网,windows下可以用ccproxy,linux建议使用squid(dns解析需要配合iptables) 1、安装squid yum install squid.x86_64 2、配置squ...

osc_rn23gf4h
12分钟前
8
0
基于表单的网站身份验证的权威指南[关闭] - The definitive guide to form-based website authentication [closed]

问题: Form-based authentication for websites 基于表单的网站身份验证 We believe that Stack Overflow should not just be a resource for very specific technical questions, but also......

fyin1314
13分钟前
7
0
我的第一个Flask项目

项目背景提要 最近公司经常有测试,产品,开发人员需要我帮忙查看服务器上面发送的短信验证码来完成工作上的一些需求。我们的短信验证码由我们后台程序发出,调用第三方短信平台发送,这中间...

osc_6kvl6c8h
13分钟前
10
0
云原生下的开发测试之困与阿里的解决之道

【以下为分享实录,有删节】 测试环境管理之困与阿里巴巴的解决之道 在云原生时代下,软件的迭代速度越来越快,对测试的要求也越来越高,很多开发者开始使用Kubernetes来管理测试环境。在这个...

阿里云技术博客
13分钟前
5
0
Active Directory颗粒化密码策略配置

1 多元(颗粒化)密码策略介绍 在windows server 2000/2003中,我们无法针对域用户不同而设置不同密码策略, 域用户密码策略和账户设置都 由默认域策略控制,如果要重新建立策略我们必须创建...

osc_61i1fz2h
14分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部