Spring 集成 Kafka

原创
08/03 20:18
阅读数 175

1、引入依赖jar包

<dependency>
	<!-- Spring for Apache Kafka. No <version> is declared here; presumably it is
	     supplied by dependency management (e.g. a Spring Boot parent POM) - confirm. -->
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka信息

spring:
  kafka: 
    # Kafka broker address(es) the clients connect to.
    bootstrap-servers: localhost:9092
    consumer: 
      # Consumer group id used by the listener containers.
      group-id: group1
    listener: 
      # When true, startup fails with IllegalStateException if a listened topic
      # (here: topic1) does not exist on the broker; false lets the application start.
      missing-topics-fatal: false

如果监听的 topic（例如 topic1）在 Kafka 中尚不存在，应用启动时会报如下错误；此时需要将 spring.kafka.listener.missing-topics-fatal 配置为 false：

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215)
	at com.zhoulp.SyncMessageWebApplication.main(SyncMessageWebApplication.java:24)
Caused by: java.lang.IllegalStateException: Topic(s) [topic1] is/are not present and missingTopicsFatal is true
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:383)
	at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:144)
	at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:340)
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
	at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
	... 14 common frames omitted

3、实现生产者

package com.zhoulp.producer;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Thin wrapper around {@link KafkaTemplate} for publishing string messages.
 *
 * <p>Registered as the Spring bean {@code kafkaProducer}; the template is
 * field-injected by the container.
 *
 * @author zhoulp
 * @date   2020-08-03
 */
@Component("kafkaProducer")
public class KafkaProducer {
	
	/** Shared logger; {@code final} so the reference can never be reassigned. */
	private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
	
	@Inject
	private KafkaTemplate<String, String> template;
	
	/**
	 * Logs the outgoing message at INFO level and hands it to the template.
	 *
	 * <p>The value returned by {@code template.send} is not inspected here, so the
	 * outcome of the send is not reported back to the caller.
	 *
	 * @param topic name of the topic to send to
	 * @param data  message payload
	 */
	public void sendMessage(String topic, String data) {
		log.info("send: topic = {}, data = {}", topic, data);
		template.send(topic, data);
	}

}

4、实现消费者

package com.zhoulp.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka listener component that logs every record received from {@code topic1}.
 *
 * <p>Registered as the Spring bean {@code kafkaConsumer}.
 *
 * @author zhoulp
 * @date   2020-08-03
 */
@Component("kafkaConsumer")
public class KafkaConsumer {
	
	/** Shared logger; {@code final} so the reference can never be reassigned. */
	private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
	
	/**
	 * Handles a record from {@code topic1} by logging, at INFO level, a marker line,
	 * the record's string form, its topic and its value (one log line each).
	 *
	 * @param consumerRecord the record delivered by the listener container
	 */
	@KafkaListener(topics = "topic1")
	public void listenTopic1(ConsumerRecord<String, String> consumerRecord) {
		log.info("listenTopic1");
		// Record-derived text is passed as an argument to a constant pattern rather
		// than as the message itself, so payload content is never used as a pattern.
		log.info("{}", consumerRecord);
		log.info("{}", consumerRecord.topic());
		log.info("{}", consumerRecord.value());
	}

}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
在线直播报名
返回顶部
顶部