spring cloud stream kafka实例
博客专区 > go4it 的博客 > 博客详情
spring cloud stream kafka实例
go4it 发表于6个月前
spring cloud stream kafka实例
  • 发表于 6个月前
  • 阅读 82
  • 收藏 4
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

maven

    <dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Camden.SR6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

    <dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
	</dependency>

生产者配置

server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          output:
            destination: event-demo
            content-type: text/plain
            producer:
              partitionCount: 1


  • java代码
@EnableBinding(Source.class)
public class SendService {

	@Autowired
	private Source source;

	public void sendMessage(String msg) {
		try {
			source.output().send(MessageBuilder.withPayload(msg).build());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

@RestController
public class ProducerController {
	
	@Autowired
	private SendService service;
	
	@RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
	public void send(@PathVariable("msg") String msg){
		service.sendMessage(msg);
	}
	
}

消费者

spring:
  application:
    name: input-demo
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2182
            auto-add-partitions: true
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: event-demo
            group: s1
            consumer:
              autoCommitOffset: false
              concurrency: 1
              partitioned: false
  • java代码
@EnableBinding(Sink.class)
public class MsgSink {

	@StreamListener(Sink.INPUT)
	public void process(Message<?> message) {
		System.out.println(message.getPayload());
		Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
		if (acknowledgment != null) {
			System.out.println("Acknowledgment provided");
			acknowledgment.acknowledge();
		}
	}
}

运行

先运行生产者,再运行消费者

curl -i localhost:8081/send/hello1

doc

标签: Spring Cloud
共有 人打赏支持
粉丝 13
博文 398
码字总数 190445
×
go4it
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: