Kafka 按指定时间和 topic 消费数据（Kafka 学习【二】）

原创
2023/10/09 14:07
阅读数 120

 按指定时间和 topic 进行数据回溯

​
// Injected consumer that consumeRecords uses for manual partition assignment, seeking and polling.
// NOTE(review): KafkaConsumer is documented as not safe for multi-threaded access — confirm this
// injected instance is never used by concurrent callers.
@Resource 
private KafkaConsumer<String,String> kafkaConsumer;

/**
 * Replays the records of one topic whose timestamps fall inside a time window.
 *
 * <p>The start and end times are read from the request as {@code yyyy-MM-dd HH:mm} strings.
 * When the request carries no end time, the current time is used as the upper bound.
 * For every partition of the topic the consumer is positioned on the first offset whose
 * timestamp is at or after the start time and then polled until each partition has either
 * produced a record newer than the end time or a poll returns no records.
 *
 * <p>Partitions that hold no record at or after the start time are not assigned at all, so
 * they can never be replayed from a committed or auto-reset position. Each partition is
 * finished independently: it is paused at the first record past the end time, while the
 * remaining partitions keep being consumed. All paused partitions are resumed before returning.
 *
 * @param request carries the topic name, the start time and the optional end time
 * @throws IllegalStateException if the topic has no partitions known to the consumer
 * @throws Exception if a time string cannot be parsed or the consumer fails
 */
public void consumeRecords(CustomKafkaRequest request) throws Exception{
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm");
    long startTime = simpleDateFormat.parse(request.getStartTime()).getTime();
    long endTime = StringUtils.isEmpty(request.getEndTime())? new Date().getTime() : simpleDateFormat.parse(request.getEndTime()).getTime();
    if (startTime > endTime) {
        // Empty window: nothing can match, so do not touch the consumer at all.
        return;
    }

    // Look up the partitions of the topic; fail with a clear message instead of an NPE
    // or an obscure "not subscribed" error from poll() when the topic is unknown.
    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(request.getTopic());
    if (partitionInfos == null || partitionInfos.isEmpty()) {
        throw new IllegalStateException("No partitions found for topic: " + request.getTopic());
    }
    Map<TopicPartition, Long> partitionLongMap = new HashMap<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        partitionLongMap.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), startTime);
    }

    // Resolve the start offset of every partition before assigning anything. A null value
    // means the partition has no record at or after the start time; such a partition is
    // left out of the assignment so it is never read from a default position.
    Map<TopicPartition, OffsetAndTimestamp> startOffsets = kafkaConsumer.offsetsForTimes(partitionLongMap);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startOffsets.entrySet()) {
        if (entry.getValue() != null) {
            topicPartitions.add(entry.getKey());
        }
    }
    if (topicPartitions.isEmpty()) {
        return;
    }
    kafkaConsumer.assign(topicPartitions);
    for (TopicPartition partition : topicPartitions) {
        kafkaConsumer.seek(partition, startOffsets.get(partition).offset());
    }

    // Partitions that already delivered a record past the end time.
    List<TopicPartition> finishedPartitions = new ArrayList<>();
    try {
        while (finishedPartitions.size() < topicPartitions.size()) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(10));
            if (records.isEmpty()) {
                // Nothing arrived within the poll timeout: treat the topic as drained.
                break;
            }
            for (TopicPartition partition : records.partitions()) {
                for (ConsumerRecord<String, String> record : records.records(partition)) {
                    if (record.timestamp() > endTime) {
                        // This partition left the window; stop fetching from it but keep
                        // consuming the other partitions. Pausing is idempotent, so passing
                        // the whole list again is harmless.
                        finishedPartitions.add(partition);
                        kafkaConsumer.pause(finishedPartitions);
                        break;
                    }
                    // do some thing
                }
            }
        }
    } finally {
        // Do not leave the injected consumer with paused partitions behind.
        kafkaConsumer.resume(finishedPartitions);
    }
}

​

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部