
Consumer Group Example - High Level Consumer (Translation)

Yulong_
Published on 2017/08/16 20:08

Using the High Level Consumer

Why use the High Level Consumer

Sometimes the logic to read messages from Kafka doesn't care about handling the message offsets; it just wants the data. The High Level Consumer is provided to abstract away most of the details of consuming events from Kafka.

The first thing to know is that the High Level Consumer stores the last offset read from a specific partition in ZooKeeper. This offset is stored under the name provided to Kafka when the process starts. This name is referred to as the Consumer Group.
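For reference, the ZooKeeper-based consumer keeps these offsets in a per-group znode path of the form /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. The following is a minimal sketch, not part of the original example, that reads one such offset directly with the plain ZooKeeper client; the connection string, group, topic and partition number are placeholders.

import org.apache.zookeeper.ZooKeeper;
 
public class OffsetPeek {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string; point this at your own ZooKeeper ensemble.
        ZooKeeper zk = new ZooKeeper("server01.myco.com:2181", 10000, null);
        // Assumed znode layout of the ZooKeeper-based consumer:
        // /consumers/<group>/offsets/<topic>/<partition>
        byte[] data = zk.getData("/consumers/group3/offsets/myTopic/0", false, null);
        System.out.println("Last committed offset: " + new String(data));
        zk.close();
    }
}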

The Consumer Group name is global across a Kafka cluster, so you should be careful that any 'old' logic consumers are shut down before starting new code. When a new process is started with the same Consumer Group name, Kafka will add that process's threads to the set of threads available to consume the topic and trigger a 're-balance'. During this re-balance Kafka assigns available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.

Designing a High Level Consumer

The first thing to know about using a High Level Consumer is that it can (and should!) be a multi-threaded application. The threading model revolves around the number of partitions in your topic, and there are some very specific rules:

  • if you provide more threads than there are partitions on the topic, some threads will never see a message

  • if you have more partitions than you have threads, some threads will receive data from multiple partitions

  • if you have multiple partitions per thread there is NO guarantee about the order you receive messages, other than that within a partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10, even if partition 11 has data available.

  • adding more processes/threads will cause Kafka to re-balance, possibly changing the assignment of a partition to a thread.

Next, your logic should expect to get an iterator from Kafka that may block if there are no new messages available.

Here is an example of a very simple consumer that expects to be threaded.

package com.test.groups;
 
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
 
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
 
    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it.
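By default this iterator blocks forever when no new messages arrive. If you need the loop to give up after a while, the old consumer exposes a consumer.timeout.ms setting; when it is set, hasNext() throws a ConsumerTimeoutException instead of blocking. Below is a minimal sketch of a run() variant, assuming that property has been added to the ConsumerConfig shown later.

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        try {
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        } catch (ConsumerTimeoutException e) {
            // no message arrived within consumer.timeout.ms; stop this thread
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }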

Configuring the test application

Unlike the SimpleConsumer, the High Level Consumer takes care of a lot of the bookkeeping and error handling for you. However, you do need to tell Kafka where to store some information. The following method defines the basics for creating a High Level Consumer:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

The ‘zookeeper.connect’ string identifies where to find one instance of ZooKeeper in your cluster. Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group.

The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of.

The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.

The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.

The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. Note that since the commit frequency is time based instead of based on the number of messages consumed, if an error occurs between updates to ZooKeeper you will get replayed messages on restart.
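If that replay window matters for your application, one option (not shown in the original example) is to turn off time-based auto commit and call commitOffsets() on the ConsumerConnector yourself after processing each message. A hedged sketch, assuming the same consumer and stream setup as in the full source below; handle() is a hypothetical message handler:

        props.put("auto.commit.enable", "false");   // disable time-based auto commit
 
        // ... create the ConsumerConnector and KafkaStream exactly as in the full source ...
 
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            handle(new String(it.next().message()));   // handle() is a hypothetical handler
            consumer.commitOffsets();                   // write the consumed offsets to ZooKeeper right away
        }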

More information about these settings can be found in the Kafka configuration documentation.

Creating the thread pool

This example uses the Java java.util.concurrent package for thread management since it makes creating a thread pool very simple.

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
 
    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);
 
    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.execute(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}


First we create a Map that tells Kafka how many threads we are providing for which topics. consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single topic, but we could have asked for multiple by adding another element to the Map; see the sketch below.)
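As a hedged sketch of the multi-topic case (the second topic name "myOtherTopic" is only a placeholder), the Map simply gets one entry per topic and createMessageStreams returns a list of streams for each:

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    topicCountMap.put("myOtherTopic", new Integer(2));   // e.g. two threads for a second topic
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    List<KafkaStream<byte[], byte[]>> otherStreams = consumerMap.get("myOtherTopic");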

Finally we create the thread pool and pass a new ConsumerTest object to each thread as our business logic.

Clean Shutdown and Error Handling

Kafka does not update ZooKeeper with the message offset last read after every read; instead it waits a short period of time. Due to this delay it is possible that your logic has consumed a message but that fact hasn't been synced to ZooKeeper yet. So if your client exits or crashes, you may find messages being replayed the next time it starts.

Also note that sometimes the loss of a Broker or another event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be 'kill -9'd.

As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams for 10 seconds. Since auto commit is on, they will commit offsets every second. Then shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally tries to wait for the ExecutorService to finish all outstanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.

try {
    Thread.sleep(10000);
} catch (InterruptedException ie) {
 
}
example.shutdown();

In practice, a more common pattern is to sleep indefinitely and use a shutdown hook to trigger a clean shutdown.
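A minimal sketch of that pattern, reusing the ConsumerGroupExample class from the full source below (the hook body and the "sleep forever" value are only illustrative):

    final ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
    example.run(threads);
 
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            example.shutdown();   // commits the final offsets and stops the worker threads
        }
    });
 
    try {
        Thread.sleep(Long.MAX_VALUE);   // effectively sleep forever; the hook does the cleanup
    } catch (InterruptedException ie) {
        // interrupted: fall through and exit
    }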

Running the example

The example code expects the following command line parameters:

  • ZooKeeper connection string with port number
  • Consumer Group name to use for this process
  • Topic to consume messages from
  • # of threads to launch to consume the messages

For example:

server01.myco.com:2181 group3 myTopic 4

This will connect to port 2181 on server01.myco.com for ZooKeeper, request all partitions from Topic myTopic, and consume them via 4 threads. The Consumer Group for this example is group3.

Full Source Code

package com.test.groups;
 
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }
 
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest"); // 从最早的数据开始消费
        // props.put("auto.offset.reset", "largest "); // 从最新的数据开始消费
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}

 
