Integrating Kafka batch consumption with Spring Boot

Configuration file:
kafka:
  producer:
    bootstrap-servers: 127.0.0.1:9092
    batch-size: 16785                                   # maximum number of bytes to batch per partition before sending
    retries: 1                                          # number of retries after a failed send
    buffer-memory: 33554432                             # 32MB send buffer
    linger: 1                                           # ms to wait for more records before sending a batch
  consumer:
    bootstrap-servers: 127.0.0.1:9092
    auto-offset-reset: earliest                         # start from the earliest unconsumed offset
    max-poll-records: 200                               # maximum number of records pulled by one poll in batch consumption
    enable-auto-commit: true                            # whether to auto-commit offsets
    auto-commit-interval: 1000                          # auto-commit interval in ms
    session-timeout: 20000                              # consumer group session timeout in ms
    max-poll-interval: 15000                            # maximum allowed delay between poll() calls before the consumer is considered failed and the group rebalances
    max-partition-fetch-bytes: 15728640                 # maximum data fetched per partition, 15MB
  listener:
    batch-listener: true                                # whether to enable batch consumption; true = consume in batches
    concurrencys: 3,6                                   # consumer concurrency (thread) counts
    poll-timeout: 1500                                  # ms the container's poll() blocks when no records are available (relevant with auto-commit)

Topic initialization:

@Component
public class KafkaInitialConfiguration {
    // Create a topic named topic.quick.initial with 8 partitions and a replication factor of 1
    // @Bean // created via a bean (bean name: initialTopic)
   /* public NewTopic initialTopic() {
        return new NewTopic("topic.quick.initial",8, (short) 1 );
    }*/

    /**
     * With this @Bean approach, a NewTopic whose name matches an existing one overrides the previous definition
     * @return
     */
    // After this change the partition count becomes 11; note that partitions can only be increased, never decreased
    @Bean
    public NewTopic initialTopic2() {
        return new NewTopic("topic.quick.initial",11, (short) 1 );
    }

    @Bean // created via a bean (bean name: initialTopic3)
    public NewTopic initialTopic3() {
        return new NewTopic("topic1",10, (short) 2 );
    }
/*    @Bean // Kafka admin bean, analogous to RabbitMQ's rabbitAdmin; without it, topics cannot be created programmatically through AdminClient
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        // Kafka broker address (not the ZooKeeper address)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }*/

  /*  @Bean  // Kafka admin client; once registered as a Spring bean it can be injected and used to create topics
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfigurationProperties());
    }*/

}
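
If the commented-out kafkaAdmin/adminClient beans above are enabled, topics can also be created on demand through AdminClient. A minimal sketch of that, assuming the injected adminClient bean; the class name, method name, and topic name demo-topic are illustrative, not from the original code:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;

public class TopicCreator {
    private final AdminClient adminClient;

    public TopicCreator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public void createTopicManually() throws Exception {
        // NewTopic(name, numPartitions, replicationFactor)
        NewTopic topic = new NewTopic("demo-topic", 4, (short) 1);
        // createTopics is asynchronous; all().get() blocks until the broker acknowledges
        adminClient.createTopics(Collections.singletonList(topic)).all().get();
    }
}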

Producer configuration:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.retries}")
    private Integer retries;

    @Value("${kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${kafka.producer.linger}")
    private Integer linger;

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>(7);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
        /*producerFactory.transactionCapable();
        producerFactory.setTransactionIdPrefix("hous-");*/
        return producerFactory;
    }

    /*@Bean
    public KafkaTransactionManager transactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory());
        return manager;
    }*/

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    /*@Bean // Kafka admin bean, analogous to RabbitMQ's rabbitAdmin; without it, topics cannot be created programmatically through AdminClient
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        // Kafka broker address (not the ZooKeeper address)
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

   @Bean  // Kafka admin client; once registered as a Spring bean it can be injected and used to create topics
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfigurationProperties());
    }*/
}
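
For reference, a minimal sketch of sending through this KafkaTemplate with a result callback, assuming spring-kafka 2.x where send() returns a ListenableFuture<SendResult>; the sendWithCallback method name is my own:

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendWithCallback(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value).addCallback(
                // success: log where the record landed
                result -> System.out.println("sent to partition "
                        + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                // failure: the producer's retries (retries=1 above) are already exhausted here
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }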

Consumer configuration:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
    private Integer concurrency3;

    @Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
    private Integer concurrency6;

    @Value("${kafka.listener.poll-timeout}")
    private Long pollTimeout;

    @Value("${kafka.consumer.session-timeout}")
    private String sessionTimeout;

    @Value("${kafka.listener.batch-listener}")
    private Boolean batchListener;

    @Value("${kafka.consumer.max-poll-interval}")
    private Integer maxPollInterval;

    @Value("${kafka.consumer.max-partition-fetch-bytes}")
    private Integer maxPartitionFetchBytes;

    /**
     * Concurrency of 6
     *
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaBatchListener6")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
        factory.setConcurrency(concurrency6);
        return factory;
    }

    /**
     * Concurrency of 3
     *
     * @return
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaBatchListener")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
        factory.setConcurrency(concurrency3);
        return factory;
    }

    private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //batch consumption
        factory.setBatchListener(batchListener);
        //If no records are available, poll() blocks for up to pollTimeout ms before returning;
        //if records are available they are returned immediately, up to max.poll.records per call.
        //Not needed when offsets are committed manually.
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        //Ack mode for offset commits: MANUAL_IMMEDIATE commits after each acknowledged record; MANUAL commits once per batch
        // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    private Map<String, Object> consumerConfigsBatch(){
        Map<String, Object> stringObjectMap = consumerConfigs();
        stringObjectMap.put(ConsumerConfig.GROUP_ID_CONFIG, "4");
        return stringObjectMap;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigsBatch());
    }

    //https://blog.csdn.net/qq_26869339/article/details/88324980
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
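
If you switch to manual offset commits, the commented-out AckMode line above needs a factory whose consumer has auto-commit disabled. A minimal sketch that could sit in this class; the bean name kafkaManualAckListener is my own, and it assumes spring-kafka 2.x where AckMode lives on ContainerProperties:

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListener() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = consumerConfigsBatch();
        // manual acknowledgment requires auto-commit to be off
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.setBatchListener(true);
        // MANUAL commits the whole batch when the listener calls ack.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }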


Sending:
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ApiOperation(value = "test", notes = "test", tags = "user API")
    @GetMapping("/test")
    public String test() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "yyf");
        map.put("age", 26);
        map.put("length", 175);
        String s = JSON.toJSONString(map);
        for (int i = 0; i < 1; i++) {
            int n = i % 8;  // route message i to partition i % 8
            kafkaTemplate.send("topic1", n, "a" + i, s);
        }
        return "ok";
    }

Consumer:
package com.llw.medical.bs.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;


@Component
public class KafakaListener {

/*
    @KafkaListener(id = "1", topics = {"topic2"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record =" + record);
            System.out.println("----------------- message =" + message);
        }
    }

    @KafkaListener(id = "2", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = { "1", "2", "3"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            })
    public void listen2(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 1=" + record);
            System.out.println("------------------ message 1=" + message);
        }
    }*/
//id = "4",
    @KafkaListener( topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"0"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            }, containerFactory = "kafkaBatchListener")
    public void listen3(List<ConsumerRecord<?, ?>> records) {
        //, Acknowledgment ack
        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 4=" + record);
                    System.out.println("------------------ message 4=" + message);
                }
            }
        }finally {
         //   ack.acknowledge();
        }
    }

    @KafkaListener( topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"0"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            }, containerFactory = "kafkaBatchListener6")
    public void listen2(List<ConsumerRecord<?, ?>> records) {
        //, Acknowledgment ack
        try {
            for (ConsumerRecord<?, ?> record : records) {
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    System.out.println("----------------- record 6=" + record);
                    System.out.println("------------------ message 6=" + message);
                }
            }
        }finally {
            //   ack.acknowledge();
        }
    }

    /*@KafkaListener(id = "3", topicPartitions =
            {@TopicPartition(topic = "topic1",
                    partitions = {"6", "7"}
                    // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4")
            )
            })
    public void listen4(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("----------------- record 3=" + record);
            System.out.println("------------------ message 3=" + message);
        }
    }*/
}
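
The commented-out Acknowledgment parameters above hint at manual commits. A sketch of what such a listener would look like wired to the manual-ack factory sketched in the consumer configuration; the kafkaManualAckListener bean name is my own assumption:

    @KafkaListener(topics = "topic1", containerFactory = "kafkaManualAckListener")
    public void listenManual(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("----------------- manual record = " + record.value());
            }
        } finally {
            // commits the offsets of the whole batch (AckMode.MANUAL)
            ack.acknowledge();
        }
    }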

When multiple Kafka consumers are explicitly assigned the same topic or the same partitions of a topic, as above, each one receives its own copy of every message, like a broadcast; the same holds for consumers subscribing from different consumer groups. Consumers that subscribe to a topic within one consumer group instead divide the partitions among themselves, so each message is processed by only one of them. A sketch of the group semantics, assuming spring-kafka 2.x where @KafkaListener accepts a groupId attribute; the group names are illustrative:
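
    // Both listeners receive every message on topic1 because they belong to
    // different consumer groups, which behaves like a broadcast.
    @KafkaListener(topics = "topic1", groupId = "group-a", containerFactory = "kafkaBatchListener")
    public void groupA(List<ConsumerRecord<String, String>> records) {
        records.forEach(r -> System.out.println("group-a got " + r.value()));
    }

    @KafkaListener(topics = "topic1", groupId = "group-b", containerFactory = "kafkaBatchListener")
    public void groupB(List<ConsumerRecord<String, String>> records) {
        records.forEach(r -> System.out.println("group-b got " + r.value()));
    }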
 
