
SimpleConsumer (Translation)

Yulong_
Published 2017/08/17 14:43

Using SimpleConsumer

Why use SimpleConsumer?

The main reason to use a SimpleConsumer implementation is that you want greater control over partition consumption than Consumer Groups give you.

For example you want to:

  1. Read a message multiple times
  2. Consume only a subset of the partitions in a topic in a process
  3. Manage transactions to make sure a message is processed once and only once

Downsides of using SimpleConsumer

The SimpleConsumer does require a significant amount of work that is not needed in Consumer Groups:

  1. You must keep track of the offsets in your application to know where you left off consuming (a minimal sketch of one approach follows this list).
  2. You must figure out which Broker is the lead Broker for a topic and partition.
  3. You must handle Broker leader changes.
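
Because the SimpleConsumer does not store offsets for you, one common approach is to persist the last processed offset yourself and reload it on startup. Below is a minimal sketch assuming a plain local file per topic/partition; the class name, file naming, and helper methods are illustrative and not part of the original example.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: stores the next offset to read in a small local file.
public class OffsetStore {
    private final Path file;

    public OffsetStore(String topic, int partition) {
        // one file per topic/partition, e.g. "myTopic-0.offset"
        this.file = Paths.get(topic + "-" + partition + ".offset");
    }

    // Call after a batch has been fully processed, passing messageAndOffset.nextOffset().
    public void save(long nextOffset) throws IOException {
        Files.write(file, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
    }

    // Call on startup; fall back to the supplied default (e.g. the EarliestTime lookup) if nothing was saved yet.
    public long loadOrDefault(long defaultOffset) throws IOException {
        if (!Files.exists(file)) return defaultOffset;
        return Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
    }
}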

Steps for using a SimpleConsumer

  1. Find an active Broker and find out which Broker is the leader for your topic and partition
  2. Determine who the replica Brokers are for your topic and partition
  3. Build the request defining what data you are interested in
  4. Fetch the data
  5. Identify and recover from leader changes

Finding the Lead Broker for a Topic and Partition

The easiest way to do this is to pass in a set of known Brokers to your logic, either via a properties file or the command line. These don't have to be all the Brokers in the cluster, rather just a set where you can start looking for a live Broker to query for Leader information.

private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
 
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                List<TopicMetadata> metaData = resp.topicsMetadata();
 
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }

The call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in.

The loop on partitionsMetadata iterates through all the partitions until we find the one we want. Once we find it, we can break out of all the loops.

Finding Starting Offset for Reads

Now define where to start reading data. Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, and kafka.api.OffsetRequest.LatestTime() will only stream new messages. Don't assume that offset 0 is the beginning offset, since messages age out of the log over time.

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
 
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

 

Error Handling

Since the SimpleConsumer doesn't handle lead Broker failures, you have to write a bit of code to handle it.

if (fetchResponse.hasError()) {
     numErrors++;
     // Something went wrong!
     short code = fetchResponse.errorCode(a_topic, a_partition);
     System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
     if (numErrors > 5) break;
 
     if (code == ErrorMapping.OffsetOutOfRangeCode())  {
         // We asked for an invalid offset. For simple case ask for the last element to reset
         readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
         continue;
     }
     consumer.close();
     consumer = null;
     leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
     continue;
 }

Here, once the fetch returns an error, we log the reason, close the consumer, and then try to figure out who the new leader is.

private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
       for (int i = 0; i < 3; i++) {
           boolean goToSleep = false;
           PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
           if (metadata == null) {
               goToSleep = true;
           } else if (metadata.leader() == null) {
               goToSleep = true;
           } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
               // first time through if the leader hasn't changed give ZooKeeper a second to recover
               // second time, assume the broker did recover before failover, or it was a non-Broker issue
               //
               goToSleep = true;
           } else {
               return metadata.leader().host();
           }
           if (goToSleep) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException ie) {
               }
           }
       }
       System.out.println("Unable to find new leader after Broker failure. Exiting");
       throw new Exception("Unable to find new leader after Broker failure. Exiting");
   }

This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way, if we can't reach any of the Brokers with the data we are interested in, we give up and exit hard.

Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don't get an answer. In reality ZooKeeper often does the failover very quickly, so you never sleep.

Reading the Data

Finally we read the data being streamed back and write it out.

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);
 
if (fetchResponse.hasError()) {
        // See code in previous section
}
numErrors = 0;
 
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
 
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}
 
if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Note that the 'readOffset' asks the last read message what the next offset would be. This way, when the block of messages is processed, we know where to tell Kafka to start the next fetch.

 

Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
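
One way to handle that is to grow the fetch size and retry. The sketch below is only illustrative: it assumes the consumer, clientName, a_topic, a_partition, and readOffset variables from the example, and the starting size, growth factor, and cap are arbitrary choices. Keep in mind that an empty set can also simply mean there is no new data yet, which is why the example instead sleeps when nothing was read.

// Illustrative sketch: retry with a larger fetchSize when the message set comes back empty.
int fetchSize = 100000;                      // same starting point as the example
final int maxFetchSize = 16 * 1024 * 1024;   // arbitrary safety cap

kafka.javaapi.message.ByteBufferMessageSet messageSet = null;
while (true) {
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    if (fetchResponse.hasError()) {
        // handle errors as shown in the Error Handling section
        break;
    }
    messageSet = fetchResponse.messageSet(a_topic, a_partition);
    if (messageSet.iterator().hasNext() || fetchSize >= maxFetchSize) {
        break;                               // got data (or hit the cap); process messageSet
    }
    fetchSize *= 2;                          // batch may be larger than fetchSize; retry with more room
}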

Finally, we keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.

 

Running the example

The example expects the following parameters:

  • Maximum number of messages to read (so we don’t loop forever)
  • Topic to read from
  • Partition to read from
  • One broker to use for Metadata lookup
  • Port the brokers listen on 
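
For example, to read at most 10 messages from partition 0 of a topic, using a single seed broker for the metadata lookup (the jar name, topic, and host below are placeholders):

java -cp kafka-simple-example.jar com.test.simple.SimpleExample 10 myTopic 0 broker1.example.com 9092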

Full Source Code

package com.test.simple;
 
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
 
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
public class SimpleExample {
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = Long.parseLong(args[0]);
        String topic = args[1];
        int partition = Integer.parseInt(args[2]);
        List<String> seeds = new ArrayList<String>();
        seeds.add(args[3]);
        int port = Integer.parseInt(args[4]);
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
             e.printStackTrace();
        }
    }
 
    private List<String> m_replicaBrokers = new ArrayList<String>();
 
    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }
 
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
 
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
 
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
 
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
 
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
 
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
 
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }
 
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
 
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
 
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
 
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
