文档章节

Kafka中produer发送消息回调超时错误

z
 zxpost
发布于 2017/02/13 18:07
字数 4230
阅读 455
收藏 1

Kafka版本0.10.1.1

producer发送消息后出现如下错误消息:

 The producer has a error:Expiring 1 record(s) for testtopic-0 due to 30039 ms has passed since last append
The producer has a error:Expiring 1 record(s) for testtopic-0 due to 30039 ms has passed since last append

修改timeout.ms,batch.size等均无效,仍然出错。

删掉topic及数据,重新用命令行创建topic后,问题解决。

原因:producer发消息到具体topic的时候,如果没有该topic,kafka会自动创建topic,但可能会出现上面的错误。所以topic一定要命令行创建,然后再使用。

 

参考如下文章:

实践代码采用kafka-clients V0.10.0.0 编写(http://m.blog.csdn.net/article/details?id=51577117)

一、编写producer

第一步:使用./kafka-topics.sh 命令创建topic及partitions 分区数

./kafka-topics.sh --create--zookepper "172.16.49.173:2181" --topic "producer_test" --partitions 10 replication-factor 3

第二步:实现org.apache.kafka.clients.producer.Partitioner 分区接口,以实现自定义的消息分区

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyPartition implements Partitioner {
    private static Logger LOG = LoggerFactory.getLogger(MyPartition.class);
    public MyPartition() {
        // TODO Auto-generated constructor stub
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // TODO Auto-generated method stub
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partitionNum = 0;
        try {
            partitionNum = Integer.parseInt((String) key);
        } catch (Exception e) {
            partitionNum = key.hashCode() ;
        }
        LOG.info("the message sendTo topic:"+ topic+" and the partitionNum:"+ partitionNum);
        return Math.abs(partitionNum  % numPartitions);
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

}

第三步:编写 producer

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionTest {
    private static Logger LOG = LoggerFactory.getLogger(PartitionTest.class);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.16.49.173:9092;172.16.49.173:9093");

        props.put("retries", 0);
        // props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        // props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "com.goodix.kafka.MyPartition");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("producer_test", "2223132132",
                "test23_60");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                // TODO Auto-generated method stub
                if (e != null)
                    LOG.error("the producer has a error:" + e.getMessage());
                else {
                    LOG.info("The offset of the record we just sent is: " + metadata.offset());
                    LOG.info("The partition of the record we just sent is: " + metadata.partition());
                }

            }
        });
        try {
            Thread.sleep(1000);
            producer.close();
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

}

备注: 要先用命令创建topic及partitions 分区数;否则在自定义的分区中如果有大于1的情况下,发送数据消息到kafka时会报expired due to timeout while requesting metadata from brokers错误

二、使用Old Consumer High Level API编写consumer

第一步:编写具体处理消息的类

import java.io.UnsupportedEncodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;


public class Consumerwork implements Runnable {
    private static Logger LOG = LoggerFactory.getLogger(Consumerwork.class);
     @SuppressWarnings("rawtypes")
    private KafkaStream m_stream;
     private int m_threadNumber;
     @SuppressWarnings("rawtypes")
    public Consumerwork(KafkaStream a_stream,int a_threadNumber) {
        // TODO Auto-generated constructor stub
         m_threadNumber = a_threadNumber;
         m_stream = a_stream;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void run() {
        // TODO Auto-generated method stub
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
           while (it.hasNext())
                try {
                    MessageAndMetadata<byte[], byte[]> thisMetadata=it.next();
                    String jsonStr = new String(thisMetadata.message(),"utf-8") ;
                    LOG.info("Thread " + m_threadNumber + ": " +jsonStr);
                    LOG.info("partion"+thisMetadata.partition()+",offset:"+thisMetadata.offset());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                } catch (UnsupportedEncodingException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    }
}

第二步:编写启动Consumer主类

mport java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private static Logger LOG = LoggerFactory.getLogger(ConsumerGroup.class);
    public ConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入zookeeper集群地址(如zk1:2181,zk2:2181,zk3:2181):");
        String zooKeeper = sc.nextLine(); 
        System.out.println("请输入指定的消费group名称:");
        String groupId = sc.nextLine(); 
        System.out.println("请输入指定的消费topic名称:");
        String topic = sc.nextLine(); 
        System.out.println("请输入指定的消费处理线程数:");
        int threads = sc.nextInt();
        LOG.info("Starting consumer kafka messages with zk:" + zooKeeper + " and the topic is " + topic);
        ConsumerGroup example = new ConsumerGroup(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {

        }
        // example.shutdown();
    }

    private void shutdown() {
        // TODO Auto-generated method stub
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            LOG.info("Interrupted during shutdown, exiting uncleanly");
        }
    }

    private void run(int a_numThreads) {
        // TODO Auto-generated method stub
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        LOG.info("the streams size is "+streams.size());
        for (final KafkaStream stream : streams) {
            executor.submit(new com.goodix.kafka.oldconsumer.Consumerwork(stream, threadNumber));
    //      consumer.commitOffsets();
            threadNumber++;
        }

    }

    private ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
//      props.put("rebalance.max.retries", "5");
//      props.put("rebalance.backoff.ms", "15000");
        return new ConsumerConfig(props);
    }

}

1. topicCountMap.put(topic, new Integer(a_numThreads)) 是告诉Kafka我有多少个线程来处理消息。

(1). 这个线程数必须是小等于topic的partition分区数;可以通过./kafka-topics.sh --describe --zookeeper "172.16.49.173:2181" --topic "producer_test"命令来查看分区的情况 
(2). kafka会根据partition.assignment.strategy指定的分配策略来指定线程消费那些分区的消息;这里没有单独配置该项即是采用的默认值range策略(按照阶段平均分配)。比如分区有10个、线程数有3个,则线程 1消费0,1,2,3,线程2消费4,5,6,线程3消费7,8,9。另外一种是roundrobin(循环分配策略),官方文档中写有使用该策略有两个前提条件的,所以一般不要去设定。 
(3). 经过测试:consumerMap.get(topic).size(),应该是获得的目前该topic有数据的分区数 
(4). stream即指的是来自一个或多个服务器上的一个或者多个partition的消息。每一个stream都对应一个单线程处理。因此,client能够设置满足自己需求的stream数目。总之,一个stream也许代表了多个服务器partion的消息的聚合,但是每一个 partition都只能到一个stream

2. Executors.newFixedThreadPool(a_numThreads)是创建一个创建固定容量大小的缓冲池:每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3. props.put(“auto.offset.reset”, “smallest”) 是指定从最小没有被消费offset开始;如果没有指定该项则是默认的为largest,这样的话该consumer就得不到生产者先产生的消息。

4. 要使用old consumer API需要引用kafka_2.11以及kafka-clients。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>

三、使用Old SimpleConsumerAPI编写consumer

这是一个更加底层和复杂的API

使用的场景

由于使用该API需要自己控制的项比较多,也比较复杂,官方给出了一些合适的适用场景,也可以理解成为这些场景是High Level Consumer API 不能够做到的

1. 针对一个消息读取多次
2. 在一个process中,仅仅处理一个topic中的一个partitions
3. 使用事务,确保每个消息只被处理一次

需要处理的事情

1. 必须在程序中跟踪offset值
2. 必须找出指定Topic Partition中的lead broker
3. 必须处理broker的变动

使用SimpleConsumer的步骤

首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变

示例

package com.goodix.kafka.oldconsumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleExample {
    private static Logger LOG = LoggerFactory.getLogger(SimpleExample.class);
    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        Scanner sc = new Scanner(System.in);
        System.out.println("请输入broker节点的ip地址(如172.16.49.173)");
        String brokerIp = sc.nextLine(); 
        List<String> seeds = new ArrayList<String>();
        seeds.add(brokerIp);
        System.out.println("请输入broker节点端口号(如9092)");
        int port = Integer.parseInt( sc.nextLine());
        System.out.println("请输入要订阅的topic名称(如test)");
        String topic = sc.nextLine();
        System.out.println("请输入要订阅要查找的分区(如0)");
        int partition = Integer.parseInt( sc.nextLine());
        System.out.println("请输入最大读取消息数量(如10000)");
        long maxReads = Long.parseLong( sc.nextLine());

        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            LOG.error("Oops:" + e);
             e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //获取指定Topic partition的元数据  
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            LOG.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            LOG.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                LOG.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    LOG.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                LOG.info("the messag's offset is :"+String.valueOf(messageAndOffset.offset()) + " and the value is :" + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOG.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
    /**
     * 找一个leader broker
     * 遍历每个broker,取出该topic的metadata,然后再遍历其中的每个partition metadata,如果找到我们要找的partition就返回
     * 根据返回的PartitionMetadata.leader().host()找到leader broker
     * @param a_oldLeader
     * @param a_topic
     * @param a_partition
     * @param a_port
     * @return
     * @throws Exception
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper a second to recover
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        LOG.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }
    /**
     * 
     * @param a_seedBrokers
     * @param a_port
     * @param a_topic
     * @param a_partition
     * @return
     */
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;

        loop:
        for (String seed : a_seedBrokers) { //遍历每个broker 
            SimpleConsumer consumer = null;
            try {
                // 创建Simple Consumer,
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                //发送TopicMetadata Request请求
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                //取到Topic的Metadata 
                List<TopicMetadata> metaData = resp.topicsMetadata();
                //遍历每个partition的metadata
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        // 判断是否是要找的partition
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            //找到就返回
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

四、 使用NewConsumer API

(一)、自动提交offset偏移量

Properties props = new Properties();
//brokerServer(kafka)ip地址,不需要把所有集群中的地址都写上,可是一个或一部分
props.put("bootstrap.servers", "172.16.49.173:9092");
//设置consumer group name,必须设置
props.put("group.id", a_groupId);
//设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
props.put("enable.auto.commit", "true");
//偏移量(offset)提交频率
props.put("auto.commit.interval.ms", "1000");
//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
//如果采用latest,消费者只能得道其启动后,生产者生产的消息
props.put("auto.offset.reset", "earliest");
//设置心跳时间
props.put("session.timeout.ms", "30000");
//设置key以及value的解析(反序列)类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic
consumer.subscribe(Arrays.asList("topic_test"));
while (true) {
    //每次取100条信息
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
 }

需要注意的:

group.id :必须设置 
auto.offset.reset:如果想获得消费者启动前生产者生产的消息,则必须设置为earliest;如果只需要获得消费者启动后生产者生产的消息,则不需要设置该项 
enable.auto.commit(默认值为true):如果使用手动commit offset则需要设置为false,并再适当的地方调用consumer.commitSync(),否则每次启动消费折后都会从头开始消费信息(在auto.offset.reset=earliest的情况下);

(二)、 自己控制偏移量提交

很多时候,我们是希望在获得消息并经过一些逻辑处理后,才认为该消息已被消费,这可以通过自己控制偏移量提交来实现。

示例1:批量提交偏移量

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * 手动批量提交偏移量
 * @author lxh
 *
 */
public class ManualOffsetConsumer {
    private static Logger LOG = LoggerFactory.getLogger(ManualOffsetConsumer.class);
    public ManualOffsetConsumer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //设置brokerServer(kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id","manual_g1");

        props.put("enable.auto.commit", "false");

        //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
        //如果采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        final int minBatchSize = 5;  //批量提交数量
         List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 LOG.info("consumer message values is "+record.value()+" and the offset is "+ record.offset());
                 buffer.add(record);
             }
             if (buffer.size() >= minBatchSize) {
                 LOG.info("now commit offset");
                 consumer.commitSync();
                 buffer.clear();
             }
         }
    }

}

示例2:消费完一个分区后手动提交偏移量

package com.goodix.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 消费完一个分区后手动提交偏移量
 * @author lxh
 *
 */
public class ManualCommitPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualCommitPartion.class);
    public ManualCommitPartion() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        //props.put("bootstrap.servers", bootstrapServers);//"172.16.49.173:9092;172.16.49.173:9093");
        //设置brokerServer(kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id","manual_g2");

        props.put("enable.auto.commit", "false");

        //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
        //如果采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.subscribe(Arrays.asList("producer_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    LOG.info("now consumer the message it's offset is :"+record.offset() + " and the value is :" + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                LOG.info("now commit the partition[ "+partition.partition()+"] offset");
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

}

(三)、指定消费某个分区的消息

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 消费指定分区的消息
 * @author lxh
 *
 */
public class ManualPartion {
    private static Logger LOG = LoggerFactory.getLogger(ManualPartion.class);
    public ManualPartion() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        //设置brokerServer(kafka)ip地址
        props.put("bootstrap.servers", "172.16.49.173:9092");
        //设置consumer group name
        props.put("group.id", "manual_g4");
        //设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率
        props.put("enable.auto.commit", "true");
        //偏移量(offset)提交频率
        props.put("auto.commit.interval.ms", "1000");
        //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset
        //如果采用latest,消费者只能得道其启动后,生产者生产的消息
        props.put("auto.offset.reset", "earliest");
        //
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        TopicPartition partition0 = new TopicPartition("producer_test", 0);
        TopicPartition partition1 = new TopicPartition("producer_test", 1);
        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);
        consumer.assign(Arrays.asList(partition0, partition1));
        while (true) {
              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
              for (ConsumerRecord<String, String> record : records)
                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());
              try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

总结

使用newConsumer API 只需要引用kafka-clients即可 
newConsumer API 更加易懂、易用

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>

官方对于consumer与partition的建议

1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的

参考文章: 
Java线程池使用说明 
Java并发编程:线程池的使用 
如何确定Kafka的分区数、key和consumer线程数
Kafka Consumer接口

 

如何发送15MB的message到KAFKA里面,StackOverflow的答复:

Minor changes required for Kafka 0.10 and the new consumer compared to laughing_man's answer:

  • Broker: No changes, you still need to increase properties message.max.bytes and replica.fetch.max.bytesmessage.max.bytes has to be smaller than replica.fetch.max.bytes.
  • Producer: Increase max.request.size to send the larger message.
  • Consumer: Increase max.partition.fetch.bytes to receive larger messages.

5down vote

The idea is to have equal size of message being sent from Kafka Producer to Kafka Broker and then received by Kafka Consumer i.e.

Kafka producer --> Kafka Broker --> Kafka Consumer

Suppose if the requirement is to send 15MB of message, then the Producer, the Broker and the Consumer, all three, needs to be in sync.

Kafka Producer sends 15 MB --> Kafka Broker Allows/Stores 15 MB --> Kafka Consumer receives 15 MB

The setting therefore should be A.) On Broker: message.max.bytes=15728640 replica.fetch.max.bytes=15728640

B.) On Consumer: fetch.message.max.bytes=15728640

© 著作权归作者所有

共有 人打赏支持
z
粉丝 1
博文 409
码字总数 21359
作品 0
成都
玩转Kafka的生产者——分区器与多线程

上篇文章学习kafka的基本安装和基础概念,本文主要是学习kafka的常用API。其中包括生产者和消费者, 多线程生产者,多线程消费者,自定义分区等,当然还包括一些避坑指南。 首发于个人网站:...

Janti
08/09
0
0
源码分析Kafka之Producer

Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。 要使用...

MageekChiu
01/30
0
0
从源码分析如何优雅的使用 Kafka 生产者

前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 作为中间件。 其中有朋友咨询在大量消息的情况下 是如何保证消息的高效及一致性呢? 正好以这个问题结合 的源码讨论下如何...

crossoverJie
10/11
0
0
Kafka源码剖析 —— 生产者消息发送流水线上的大体流程

一、唤醒Selector 上篇文章说到,消息是怎么被生产出来,简单来说,就是消息被追加到了RecordAccumulator中,以ByteBuffer的形式存了下来。 那么在消息添加到ByteBuffer中后,后续的步骤又是...

Anur
09/14
0
0
Learning Apache Kafka 2nd Edition读书笔记

Chap 1 Kafka简介 1.Apache Kafka是一款开源的,分布式的,基于分区、日志提交和订阅推送的消息系统。设计用于: 持久化消息到硬盘,TB级别 高吞吐量,每秒数百M读写 分布式,支持弹性伸缩 ...

GunnerAha
04/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

linux使用ntfs-3g操作ntfs格式硬盘

Linux内核目前只支持对微软NTFS文件系统的读取。 NTFS-3G 是微软 NTFS 文件系统的一个开源实现,同时支持读和写。NTFS-3G 开发者使用 FUSE 文件系统来辅助开发,同时对可移植性有益。 安装 ...

linuxprobe16
今天
1
0
kubeadm部署kubernetes集群

一、环境要求 这里使用RHEL7.5 master、etcd:192.168.10.101,主机名:master node1:192.168.10.103,主机名:node1 node2:192.168.10.104,主机名:node2 所有机子能基于主机名通信,编辑...

人在艹木中
今天
7
0
Shell特殊符号总结以及cut,sort,wc,uniq,tee,tr,split命令

特殊符号总结一 * 任意个任意字符 ? 任意一个字符 # 注释字符 \ 脱义字符 | 管道符 # #号后的备注被忽略[root@centos01 ~]# ls a.txt # 备注 a.txt[root@centos01 ~]# a=1[root@centos01...

野雪球
今天
2
0
OSChina 周二乱弹 —— 程序员圣衣

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @达尔文:分享Skeeter Davis的单曲《The End of the World》 《The End of the World》- Skeeter Davis 手机党少年们想听歌,请使劲儿戳(这里...

小小编辑
今天
17
0
[ python import module ] 导入模块

import moudle_name ----> import module_name.py ---> import module_name.py文件路径 -----> sys.path (这里进行查找文件) # from app.web import Personimport app.web.Person as Pe......

_______-
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部