文档章节

KafkaConsumer源码解析

DJZhu
 DJZhu
发布于 2017/03/03 12:51
字数 1289
阅读 25
收藏 0

#测试代码

上次讲了KafkaProducer的用法和实现代码,这里继续来看看Consumer是怎样工作的。 同样先来看看示例代码:

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SimpleConsumer {

    private Logger LOG = LoggerFactory.getLogger(SimpleConsumer.class);

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private static String groupId = "";

    public static void main(String [] args){
        String zookeeper = "node87:2181";
        groupId = String.valueOf(new Date().getTime());//每次生成一个新的groupId方便测试
        String topic = "test1234";

        int threadCount = 1;
        SimpleConsumer simpleConsumer = new SimpleConsumer(zookeeper, groupId, topic);
        simpleConsumer.run(threadCount);

        try {
            Thread.sleep(100000);  //等待100秒后关掉服务
        } catch (InterruptedException e) {
            //
        }
        simpleConsumer.shutdown();
    }

    public SimpleConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        //创建一个ConsumerConnector负责和zookeeper通信,createJavaConsumerConnector(config : ConsumerConfig)是scala的方法。内部实例化了一个
        //kafka.javaapi.consumer.ZookeeperConsumerConnector(config)对象返回
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
        this.topic = a_topic;
        this.executor = Executors.newCachedThreadPool();
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    /**
     * 读取kafkaStream方法
     */
    public void run(int a_numThreads){ //a_numThreads=1
        //topic,创建stream的数量
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads);

        //创建MessageStreams Map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        ExecutorService executor = Executors.newFixedThreadPool(a_numThreads);
        executor.execute(new ConsumerTest(streams.get(0)));  //因为上面只创建了一条stream,这里直接获取之
    }

    public class ConsumerTest implements Runnable {

        KafkaStream<byte[], byte[]> stream;

        public ConsumerTest(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        public void run() {
            //每个stream都支持一个Iterator用来获取消息
            ConsumerIterator iterator = stream.iterator();
            LOG.info("groupId:{}",groupId);
            while(true){
                try {
                    if(iterator.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> data = iterator.next();
                        LOG.info("message:{}, partition:{}, offset:{},", new String(data.message()), data.partition(), data.offset());
                    }
                }catch (ConsumerTimeoutException e){
                    System.out.println(Thread.currentThread().getName() + "----" + "超时...");
                }
            }
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);  //zookeeper地址
        props.put("group.id", a_groupId);  //group id
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "largest");  //新group-consumer启动后从最新(largest)/最旧(smalles)的数据开始读取
        props.put("consumer.timeout.ms","3000"); //消费者等待新消息时间,超过此时间没有收到新的消息会抛出一个ConsumerTimeoutException,如果设为-1
        return new ConsumerConfig(props);
    }
}

应用代码很简单,消费数据的流程是这样的:

  1. 创建一个ConsumerConnector对象实例,负责和zookeeper通信
  2. ConsumerConnector实例在zookeeper上注册相应节点,初始化若干条Stream负责和kafka-Broker通信。
  3. 每条Stream上都可以创建一个Iterator来获取消息。

#ConsumerConnector接口 这里使用的是kafka通过scala实现此接口的类:kafka.javaapi.consumer.ZookeeperConsumerConnector 下面摘自scaladoc:

ZookeeperConsumerConnector类处理和zookeeper的交互工作,包括:

  1. 在/consumers/[group_id]/注册 每个consumer在一个group中都有自己的唯一id。consumer在创建的时候会在上述路径中创建一个临时节点[ids/节点名],保存此consumer读取的topic列表。Consumer会监视其所在的[group_id]目录的变化,比如说ids目录变化就会触发一次rebalance。这里的id由消费者指定,而不是zk按序生成。 此路径下包含: /consumers/[group_id]/ids。ids目录下为本group中每个存活的consumer都创建一个节点consumer-id /consumers/[group_id]/owners。owners目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的consumer-id /consumers/[group_id]offsets。offsets目录下为本group消费的每个的topic创建一个目录,目录中为每个partition创建一个节点,节点的内容为正在消费此partition的offset
  2. 监听broker节点:/brokers/[0...N] --> { "host" : "host:port", "topics" : {"topic1": ["partition1" ... "partitionN"], ..., "topicN": ["partition1" ... "partitionN"] } } /brokers/[ids]/下每一个子节点代表一个正在运行的broker。在kafka的配置中的broker.id参数对应的就是这里的ids。节点内容为json格式,内容为broker监听的host和端口 /broker/[topics]/下包含所有topic的信息

进入ZookeeperConsumerConnector后,首先看到:

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                val enableFetcher: Boolean) // for testing only
    extends ConsumerConnector {

  private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
  private val messageStreamCreated = new AtomicBoolean(false)
  //... ...
}

创建了一个val变量(类似于Java中的final) underlying, 其实这是作为一个单例在处理consumer客户端跟zookeeper的交互的核心。 然后是val messageStreamCreated,目的是为了防止多次在同一consumer上创建多次stream.(具体目的还在研究中)

而这里我们发现其实这里存在两个同名不同包的ZookeeperConsumerConnector,java直接调用的是kafka.javaapi.consumer.ZookeeperConsumerConnector, 而在此类内部实例化的时候创建的是一个kafka.consumer.ZookeeperConsumerConnector类的实例。

按照惯例,同样先来看看这个类有什么类属性,

private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
        val enableFetcher: Boolean) // for testing only
        extends ConsumerConnector with Logging with KafkaMetricsGroup {

private val isShuttingDown = new AtomicBoolean(false) //关闭标识
private val rebalanceLock = new Object //rebalance锁
private var fetcher: Option[ConsumerFetcherManager] = None //
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)

private var sessionExpirationListener: ZKSessionExpireListener = null
private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null

private var offsetsChannel: BlockingChannel = null
private val offsetsChannelLock = new Object

private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null

// useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))

        val consumerIdString = {
        var consumerUuid : String = null
        config.consumerId match {
        case Some(consumerId) // for testing only
        => consumerUuid = consumerId
        case None // generate unique consumerId automatically
        => val uuid = UUID.randomUUID()
        consumerUuid = "%s-%d-%s".format(
        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
        uuid.getMostSignificantBits().toHexString.substring(0,8))
        }
        config.groupId + "_" + consumerUuid
        }
        this.logIdent = "[" + consumerIdString + "], "


© 著作权归作者所有

DJZhu
粉丝 3
博文 26
码字总数 30529
作品 0
广州
程序员
私信 提问
python kafka kerberos 验证 消费 生产

[toc] 安装 pykafkagithub 注意kafka版本只支持 kafka 1.1, 1.0,0.11, 0.10, 0.9,0.8 (201902) 该作者在https://github.com/dpkp/kafka-python/pull/1152 这个推送增加了kerberos支持 验证......

stys35
02/28
55
0
记Structured Streaming 2.3.1的OOM排查过程

记Structured Streaming 2.3.1的OOM排查过程 缘起 最近在使用Structured Streaming开发一套自助配置SQL的来生成流式作业的平台,在测试的过程中发现有些作业长时间运行后会有Executor端的OOM...

纳兰清风
2018/09/26
447
1
Disruptor、Kafka、Netty整合之路

NETTY应用网关 整个网关的核心是一个netty server,各个应用程序(包括web server,手机app等)连到这个netty server上请求数据;关于数据来源,需要监听多个kafka topic(而且这里的topic是...

大胖和二胖
2016/09/23
2.3K
0
Kafka Consumer多线程实例

Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖。社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划。鉴于目前这方面...

matrix_google
2018/04/22
0
0
kafkaoffsetmonitor监控topic的logsize和offset数据没有变化

问题描述: 使用kafkaoffsetmonitor监控线上的kafka的集群信息。监控平台搭建之后,发现offset和logsize的值几乎都是平行的,没有任何的数据变化,并且在kafkaoffsetmonitor的监控web界面上发...

liuhuang9496
2017/04/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Nervos CKB 脚本编程简介[1]:验证模型

CKB 脚本编程简介[1]: 验证模型 本文作者:Xuejie 原文链接:Introduction to CKB Script Programming 1: Validation Model 本文译者:Jason,Orange 译文链接:https://talk.nervos.org/t/c...

NervosCommunity
29分钟前
4
0
消息中间件——RabbitMQ的高级特性

前言 前面我们介绍了RabbitMQ的安装、各大消息中间件的对比、AMQP核心概念、管控台的使用、快速入门RabbitMQ。本章将介绍RabbitMQ的高级特性。分两篇(上/下)进行介绍。 消息如何保障100%的...

Java架构师ya七
今天
9
0
如何编写高质量的 JS 函数(1) -- 敲山震虎篇

本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/7lCK9cHmunvYlbm7Xi7JxQ 作者:杨昆 一千个读者,有一千个哈姆雷特。 此系列文章将会从函数的执行机制、鲁棒性、函...

vivo互联网技术
今天
7
0
学会这5个Excel技巧,让你拒绝加班

在网上,随处都可以看到Excel技巧,估计已看腻了吧?但下面5个Excel技巧会让你相见恨晚。关键的是它们个个还很实用 图一 技巧1:快速删除边框 有时当我们处理数据需要去掉边框,按Ctrl+Shif...

干货趣分享
今天
11
0
JS基础-该如何理解原型、原型链?

JS的原型、原型链一直是比较难理解的内容,不少初学者甚至有一定经验的老鸟都不一定能完全说清楚,更多的"很可能"是一知半解,而这部分内容又是JS的核心内容,想要技术进阶的话肯定不能对这个...

OBKoro1
今天
12
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部