文档章节

Kafka1.0.X_消费者API详解2

特拉仔
 特拉仔
发布于 06/25 07:21
字数 1970
阅读 195
收藏 9
点赞 0
评论 2

偏移量由消费者管理

​kafka Consumer Api还提供了自己存储offset的功能,将offset和data做到原子性,可以让消费具有Exactly Once 的语义,比kafka默认的At-least Once更强大

消费者从指定分区拉取数据-手动更改偏移量

​设置消费者从自定义的位置开始拉取数据,比如从程序停止时已消费的下一Offset开始拉取数据,使用这个功能要求data和offset的update操作是原子的,否则可能会破坏数据一致性

   /*
        手动设置指定分区的offset,只适用于使用Consumer.assign方法添加主题的分区,不适用于kafka自动管理消费者组中的消费者场景,
        后面这种场景可以使用ConsumerRebalanceListener做故障恢复使用
     */
    @Test
    public void controlsOffset() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,并设置要拉取的分区
​
        //加一段代码将自己保存的分区和偏移量读取到内存
        //load partition and it's offset
        TopicPartition partition0 = new TopicPartition("mytopic3", 0);
        consumer.assign(Arrays.asList(partition0));
​
        //告知Consumer每个分区应该从什么位置开始拉取数据,offset从你加载的值或者集合中拿
        consumer.seek(partition0, 4140l);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
​
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

代码和上面的绝大多数都一样,就是要自己加载分区信息,给消费者设置每个分区的偏移量

添加因消费者改变导致kafka rebalance的监听

​ kafka提供该监听来处理当某一个topic的消费者发生变化(加入、退出)时分区重新分配(先解除与消费者的绑定关系,再重新与消费者绑定)用户想做回调的情况,分区与消费者解除绑定时调用onPartitionsRevoked方法;重新绑定时调用onPartitionsAssigned。

监听代码

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
​
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
​
/*
    kafka提供了这个监听来处理分区的变化,区分被取消时调用onPartitionsRevoked方法;分区被分配时调用onPartitionsAssigned
 */
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
    static Map<TopicPartition,Long> partitionMap = new ConcurrentHashMap<>();
    private Consumer<?,?> consumer;
    //实例化Listener的时候将Consumer传进来
    public MyConsumerRebalanceListener(Consumer<?,?> consumer) {
        this.consumer = consumer;
    }
​
    /*
        有新的消费者加入消费者组或者已有消费者从消费者组中移除会触发kafka的rebalance机制,rebalance被调用前会先调用下面的方法
        此时你可以将分区和它的偏移量记录到外部存储中,比如DBMS、文件、缓存数据库等,还可以在这里处理自己的业务逻辑
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions){
            //记录分区和它的偏移量
            partitionMap.put(partition,consumer.position(partition));
            //清空缓存
​
            System.out.println("onPartitionsRevoked partition:" + partition.partition()+" - offset"+consumer.position(partition));
        }
    }
​
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //设置分区的偏移量
        for(TopicPartition partition: partitions){
            System.out.println("onPartitionsAssigned partition:" + partition.partition()+" - offset"+consumer.position(partition));
            if(partitionMap.get(partition)!=null){
                consumer.seek(partition, partitionMap.get(partition));
            }else{
                //自定义处理逻辑
            }
        }
    }
}

测试代码

    @Test
    public void autoCommitAddListner(){
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交 true-开启 false-关闭
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        MyConsumerRebalanceListener myListener = new MyConsumerRebalanceListener(consumer);
        //消费者订阅主题,可以订阅多个主题
        consumer.subscribe(Arrays.asList("mytopic3"),myListener);
        //consumer.subscribe(Arrays.asList("mytopic3"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    /*
                        可以将这里的偏移量提交挪到监听的onPartitionsRevoked方法中,控制灵活,但是也很容易出问题
                     */
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

其他

  • 使用pause和resume可以暂停和恢复一个分区的消费动作

    consumer.pause(Arrays.asList(new TopicPartition("topic_name",parition_num)))

    consumer.resume(Arrays.asList(new TopicPartition("topic_name",parition_num)))

  • 按事务读数据

    该操作与Producer的按事务写相匹配,在Consumer代码的配置中增加一行:

    props.put("isolation.level","read_committed");

    注意,按事务读,不能使用在按指定分区拉取数据的消费者中

多线程

KafkaConsumer是线程不安全,kafka官方提供了一种写法来避免线程安全问题

ConsumerRunner:

package com.jv.parallel;
​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
​
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
​
public class ConsumerRunner implements Runnable{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String,String> consumer;
    private final CountDownLatch latch;
​
    public ConsumerRunner(KafkaConsumer<String,String> consumer, CountDownLatch latch){
        this.consumer = consumer;
        this.latch = latch;
    }
​
    @Override
    public void run() {
        System.out.println("threadName...."+Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("mytopic3"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n", Thread.currentThread().getName(),record.offset(), record.key(), record.value());
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
            latch.countDown();
        }
    }
​
    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}

驱动方法:

   @Test
    public void autoCommitParallelTest() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "true");
        //自动提交时间间隔
        props.put("auto.commit.interval.ms", "1000");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for(int i = 0;i < 2;i++){
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        for(int i = 0;i < 2;i++){
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i),latch);
            consumers.add(c);
            executor.submit(c);
        }
​
        /*
            这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭
            所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁、关闭连接等操作
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
​
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

但是Kafka官方任然不建议多个线程共用一个Consumer,否则会出现ConcurrentModificationException异常

Kafka提供如下两个方案实现并发:

1.一个线程一个Consumer

每个线程都拥有自己的Consumer

优点:

  • 写代码容易

  • 因为不需要协调和调度线程,速度比较快

  • 实现分区的有序非常容易

缺点:

  • TCP连接更多,如果分区非常多,这种方案不可行了

  • 消费者多了,可能因为批处理少了,使IO吞吐量减少

  • 并发数严重依赖于分区数(消费者数只能小于等于分区数)

2.Consumer和Processer分离

使用一个或者多个Consumer从Kafka拉取数据,并将数据放到一个阻塞队列中,由Processor从阻塞队列中获取数据并做业务处理。

优点:

  • 将消费和业务处理做垂直切分,然后在水平上可以独立的进行扩展

缺点:

  • 分区顺序难以保障

  • 分区提交非常麻烦

针对这种方案的分区内数据顺序问题,可以使用让每个消费者都有自己的阻塞队列。因为Consumer和Processor已经分离了,如何让Consumer知道数据已经被Processor处理完是比较麻烦的事情,

© 著作权归作者所有

共有 人打赏支持
特拉仔
粉丝 25
博文 178
码字总数 162534
作品 0
渝中
项目经理
加载中

评论(2)

特拉仔
特拉仔
嗯,你有什么好的方法让Processor通知Consumer该提交哪些offset喃
君枫
君枫
第二种模型 + 外置存储offset + seek,可以很好兼顾性能与正确性,资源的使用也非常可控
​福利丨又一波大数据新书推介,来做第一个吃螃蟹的人吧!

又到了每周发福利的时刻! 延续上期主题,本期给大家带来3本@图灵教育的大数据新书推荐 小编云: 经典读物固然能让收获有所保障 可新书却给人更多阅读的新鲜体验 来吧,一起做第一个吃螃蟹的...

DBAplus社群
2017/12/20
0
0
分布式学习(5) ---- 初识消息系统kafka

消息系统在分布式应用中有着不可或缺的地位,像是成产消费数据解耦,缓存未处理的消息等等。 那为什么不学习用Java写的ActiveMQ或RabbitMQ呢? 因为我看过卡夫卡写的变形记。 简单原理图 分布...

KKys
2017/02/13
0
0
Spring Boot 中使用 Dubbo 详解

Dubbo是阿里巴巴SOA服务化治理方案的核心框架,每天为2,000+个服务提供3,000,000,000+次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。Dubbo是一个分布式服务框架,致力于提供高性能...

PengLei
2017/10/28
0
0
福利丨这也许是目前市面上最权威的Kafka学习指南!

Kafka,不是村上春树的《海边的卡夫卡》,不是《变形记》的作者弗兰兹·卡夫卡。程序员世界里的 Kafka,有三分之一的世界 500 强公司都在使用它发布和订阅、存储及实时地处理大规模流数据。 ...

DBAplus社群
01/15
0
0
分布式学习(6) ---- 深入kafka生产消费模型

生产者详解 上文中生产者代码如下: 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会泄露这些资源。 send()...

KKys
2017/02/14
0
0
泥沙砖瓦浆木匠/springboot-learning-example

springboot-learning-example spring boot 实践学习案例,是 spring boot 初学者及核心技术巩固的最佳实践。 推荐 springcloud-learning-example spring cloud 实践学习案例 https://github...

泥沙砖瓦浆木匠
2017/03/29
0
0
服务器后端开发系列——《实战FastDFS分布式文件系统》

1、FastDFS的配置、部署与API使用解读(1)Get Started with FastDFS 内容:讲解FastDFS的背景、基本原理,并讲述基本的配置、部署和测试的内容。 2、FastDFS的配置、部署与API使用解读(2)...

晨曦之光
2012/04/24
261
0
消息队列ActiveMQ的使用详解

通过上一篇文章 《消息队列深入解析》,我们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。 这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。 学习消息队...

snailclimb
04/24
0
0
系统的学习大数据分布式计算spark技术

我们在学习一门技术的时候一定要以系统的思维去学习,这样的话,不仅对你的提高有很大的帮助,也可以让你高效的使用这个技术。 对于学习spark,当然也是要以系统的思维去全面的学习。这篇博客...

tangweiqun
2017/09/24
0
0
Redis 5.0-Stream 操作详解

简介 Stream是Redis 5.0引入的一种新数据类型,允许消费者等待生产者发送的新数据,还引入了消费者组概念,组之间数据是相同的(前提是设置的偏移量一样),组内的消费者不会拿到相同数据。这...

特拉仔
06/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

程序的调试信息

调试二进制程序时,经常要借助GDB工具,跟踪程序的执行流程,获取程序执行时变量的值,以发现问题所在。GDB能得到这些信息,是因为编译程序时,编译器保存了相应的信息。Linux下的可执行程序...

qlee
24分钟前
0
0
应用级缓存

缓存命中率 从缓存中读取数据的次数与总读取次数的比例,命中率越高越好 java缓存类型 堆缓存 guavaCache Ehcache3.x 没有序列化和反序列化 堆外缓存ehcache3.x 磁盘缓存 存储在磁盘上 分布式...

writeademo
今天
0
0
python爬虫日志(3)find(),find_all()函数

1.一般来说,为了找到BeautifulSoup对象内任何第一个标签入口,使用find()方法。 以上代码是一个生态金字塔的简单展示,为了找到第一生产者,第一消费者或第二消费者,可以使用Beautiful Sou...

茫羽行
今天
0
0
java:thread:顺序执行多条线程

实现方案: 1.调用线程的join方法:阻塞主线程 2.线程池 package com.java.thread.test;public class MyThread01 implements Runnable {@Overridepublic void run() {Syste...

人觉非常君
今天
0
0
ElasticSearch 重写IK分词器源码设置mysql热词更新词库

常用热词词库的配置方式 1.采用IK 内置词库 优点:部署方便,不用额外指定其他词库位置 缺点:分词单一化,不能指定想分词的词条 2.IK 外置静态词库 优点:部署相对方便,可以通过编辑指定文...

键走偏锋
今天
18
0
Git 2.18版本发布:支持Git协议v2,提升性能

Git 2.18版本发布:支持Git协议v2,提升性能Git 2.18版本发布:支持Git协议v2,提升性能 新版本协议的主要驱动力是使 Git 服务端能够对各种 ref(分支与 tag)进行过滤操作。 这就意味着,G...

linux-tao
今天
0
0
python浏览器自动化测试库【2018/7/22-更新】

64位py2.7版本 更新 document_GetResources 枚举页面资源 document_GetresourceText 获取指定url的内容 包括页面图片 下载地址下载地址 密码:upr47x...

开飞色
今天
41
0
关于DCL双重锁失效及解决方案

关于DCL双重锁失效及解决方案 Double Check Lock (DCL)实现单例 DCL 方式实现单例的优点是既能够在需要时才初始化单例,又能够保证线程安全,且单例对象初始化后调用getInstance方法不进行...

DannyCoder
今天
0
0
PowerDesigner 16.5 安装配置

PowerDesigner16.5破解版是一款业内领先且开发人员常用的数据库建模工具,PowerDesigner可以从物理和概念两个层面设计数据库,方便用户制作处清晰直观的数据流程图和结构模型,欢迎有需要的朋...

Gibbons
今天
0
0
mac Homebrew 指令积累

1通用命令 brew install [包名] //安装包 brew list //列举安装的包 brew info [包名] // 显示安装包的详细信息 mysql 相关 #启动mysql 服务 brew service start mysql my...

Kenny100120
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部