文档章节

Kafka1.0.X_消费者API详解2

特拉仔
 特拉仔
发布于 06/25 07:21
字数 1970
阅读 207
收藏 9

偏移量由消费者管理

​kafka Consumer Api还提供了自己存储offset的功能,将offset和data做到原子性,可以让消费具有Exactly Once 的语义,比kafka默认的At-least Once更强大

消费者从指定分区拉取数据-手动更改偏移量

​设置消费者从自定义的位置开始拉取数据,比如从程序停止时已消费的下一Offset开始拉取数据,使用这个功能要求data和offset的update操作是原子的,否则可能会破坏数据一致性

   /*
        手动设置指定分区的offset,只适用于使用Consumer.assign方法添加主题的分区,不适用于kafka自动管理消费者组中的消费者场景,
        后面这种场景可以使用ConsumerRebalanceListener做故障恢复使用
     */
    @Test
    public void controlsOffset() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,并设置要拉取的分区
​
        //加一段代码将自己保存的分区和偏移量读取到内存
        //load partition and it's offset
        TopicPartition partition0 = new TopicPartition("mytopic3", 0);
        consumer.assign(Arrays.asList(partition0));
​
        //告知Consumer每个分区应该从什么位置开始拉取数据,offset从你加载的值或者集合中拿
        consumer.seek(partition0, 4140l);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
​
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

代码和上面的绝大多数都一样,就是要自己加载分区信息,给消费者设置每个分区的偏移量

添加因消费者改变导致kafka rebalance的监听

​ kafka提供该监听来处理当某一个topic的消费者发生变化(加入、退出)时分区重新分配(先解除与消费者的绑定关系,再重新与消费者绑定)用户想做回调的情况,分区与消费者解除绑定时调用onPartitionsRevoked方法;重新绑定时调用onPartitionsAssigned。

监听代码

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
​
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
​
/*
    kafka提供了这个监听来处理分区的变化,区分被取消时调用onPartitionsRevoked方法;分区被分配时调用onPartitionsAssigned
 */
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
    static Map<TopicPartition,Long> partitionMap = new ConcurrentHashMap<>();
    private Consumer<?,?> consumer;
    //实例化Listener的时候将Consumer传进来
    public MyConsumerRebalanceListener(Consumer<?,?> consumer) {
        this.consumer = consumer;
    }
​
    /*
        有新的消费者加入消费者组或者已有消费者从消费者组中移除会触发kafka的rebalance机制,rebalance被调用前会先调用下面的方法
        此时你可以将分区和它的偏移量记录到外部存储中,比如DBMS、文件、缓存数据库等,还可以在这里处理自己的业务逻辑
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions){
            //记录分区和它的偏移量
            partitionMap.put(partition,consumer.position(partition));
            //清空缓存
​
            System.out.println("onPartitionsRevoked partition:" + partition.partition()+" - offset"+consumer.position(partition));
        }
    }
​
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //设置分区的偏移量
        for(TopicPartition partition: partitions){
            System.out.println("onPartitionsAssigned partition:" + partition.partition()+" - offset"+consumer.position(partition));
            if(partitionMap.get(partition)!=null){
                consumer.seek(partition, partitionMap.get(partition));
            }else{
                //自定义处理逻辑
            }
        }
    }
}

测试代码

    @Test
    public void autoCommitAddListner(){
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交 true-开启 false-关闭
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        MyConsumerRebalanceListener myListener = new MyConsumerRebalanceListener(consumer);
        //消费者订阅主题,可以订阅多个主题
        consumer.subscribe(Arrays.asList("mytopic3"),myListener);
        //consumer.subscribe(Arrays.asList("mytopic3"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    /*
                        可以将这里的偏移量提交挪到监听的onPartitionsRevoked方法中,控制灵活,但是也很容易出问题
                     */
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

其他

  • 使用pause和resume可以暂停和恢复一个分区的消费动作

    consumer.pause(Arrays.asList(new TopicPartition("topic_name",parition_num)))

    consumer.resume(Arrays.asList(new TopicPartition("topic_name",parition_num)))

  • 按事务读数据

    该操作与Producer的按事务写相匹配,在Consumer代码的配置中增加一行:

    props.put("isolation.level","read_committed");

    注意,按事务读,不能使用在按指定分区拉取数据的消费者中

多线程

KafkaConsumer是线程不安全,kafka官方提供了一种写法来避免线程安全问题

ConsumerRunner:

package com.jv.parallel;
​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
​
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
​
public class ConsumerRunner implements Runnable{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String,String> consumer;
    private final CountDownLatch latch;
​
    public ConsumerRunner(KafkaConsumer<String,String> consumer, CountDownLatch latch){
        this.consumer = consumer;
        this.latch = latch;
    }
​
    @Override
    public void run() {
        System.out.println("threadName...."+Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("mytopic3"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n", Thread.currentThread().getName(),record.offset(), record.key(), record.value());
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
            latch.countDown();
        }
    }
​
    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}

驱动方法:

   @Test
    public void autoCommitParallelTest() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "true");
        //自动提交时间间隔
        props.put("auto.commit.interval.ms", "1000");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for(int i = 0;i < 2;i++){
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        for(int i = 0;i < 2;i++){
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i),latch);
            consumers.add(c);
            executor.submit(c);
        }
​
        /*
            这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭
            所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁、关闭连接等操作
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
​
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

但是Kafka官方任然不建议多个线程共用一个Consumer,否则会出现ConcurrentModificationException异常

Kafka提供如下两个方案实现并发:

1.一个线程一个Consumer

每个线程都拥有自己的Consumer

优点:

  • 写代码容易

  • 因为不需要协调和调度线程,速度比较快

  • 实现分区的有序非常容易

缺点:

  • TCP连接更多,如果分区非常多,这种方案不可行了

  • 消费者多了,可能因为批处理少了,使IO吞吐量减少

  • 并发数严重依赖于分区数(消费者数只能小于等于分区数)

2.Consumer和Processer分离

使用一个或者多个Consumer从Kafka拉取数据,并将数据放到一个阻塞队列中,由Processor从阻塞队列中获取数据并做业务处理。

优点:

  • 将消费和业务处理做垂直切分,然后在水平上可以独立的进行扩展

缺点:

  • 分区顺序难以保障

  • 分区提交非常麻烦

针对这种方案的分区内数据顺序问题,可以使用让每个消费者都有自己的阻塞队列。因为Consumer和Processor已经分离了,如何让Consumer知道数据已经被Processor处理完是比较麻烦的事情,

© 著作权归作者所有

共有 人打赏支持
特拉仔
粉丝 32
博文 188
码字总数 167470
作品 0
渝中
项目经理
加载中

评论(2)

特拉仔
特拉仔
嗯,你有什么好的方法让Processor通知Consumer该提交哪些offset喃
君枫
君枫
第二种模型 + 外置存储offset + seek,可以很好兼顾性能与正确性,资源的使用也非常可控
​福利丨又一波大数据新书推介,来做第一个吃螃蟹的人吧!

又到了每周发福利的时刻! 延续上期主题,本期给大家带来3本@图灵教育的大数据新书推荐 小编云: 经典读物固然能让收获有所保障 可新书却给人更多阅读的新鲜体验 来吧,一起做第一个吃螃蟹的...

DBAplus社群
2017/12/20
0
0
分布式学习(5) ---- 初识消息系统kafka

消息系统在分布式应用中有着不可或缺的地位,像是成产消费数据解耦,缓存未处理的消息等等。 那为什么不学习用Java写的ActiveMQ或RabbitMQ呢? 因为我看过卡夫卡写的变形记。 简单原理图 分布...

KKys
2017/02/13
0
0
Spring Boot 中使用 Dubbo 详解

Dubbo是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,000,000,000+次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。Dubbo是一个分布式服务框架,致力于提供高性能...

PengLei
2017/10/28
0
0
福利丨这也许是目前市面上最权威的Kafka学习指南!

Kafka,不是村上春树的《海边的卡夫卡》,不是《变形记》的作者弗兰兹·卡夫卡。程序员世界里的 Kafka,有三分之一的世界 500 强公司都在使用它发布和订阅、存储及实时地处理大规模流数据。 ...

DBAplus社群
01/15
0
0
分布式学习(6) ---- 深入kafka生产消费模型

生产者详解 上文中生产者代码如下: 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。 send()...

KKys
2017/02/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

redis-hash

哈希类型是指健值本身又是一个键值对结构 基本命令: hset key field value 设置值 hget(获取),hdel(删除),hlen(计算field个数),hmget(批量设置),hexists(是否存在),hkeys(获取所有的...

拐美人
37分钟前
2
0
简单的svm例子

数据来源:https://github.com/oumiga1314/Coursera-ML-AndrewNg-Notes/blob/master/code/ex6-SVM/data/ex6data1.mat import pandas as pd import numpy as np import scipy.io as sio impor......

南桥北木
41分钟前
1
0
android 关于View的一些整理

1、Button text的值为英文时,会自动转换成大写。如需取消,设置android:textAllCaps="false" 2、控件的可见性 可以在layout的配置文件中,配置android:visibility属性 调用setVisibility()...

西米小娅
51分钟前
1
0
Spring JDBC数据源分析

Spring数据源分析 分析这样一段代码: package com.jason.spring.datasource.jdbc;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframew......

宸明
今天
1
0
FatJar:适用于sdk多module打包和合并多个jar的gradle插件

usage: 1.下载fatJar.gradle放置于project根目录 2.在project的build.gradle中添加依赖和配置: apply from: 'fatJar.gradle'buildscript { dependencies { classpath 'xyz......

SuShine
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部