文档章节

使用Kafka的High Level Consumer

囚兔
 囚兔
发布于 2014/12/31 08:57
字数 660
阅读 816
收藏 2

##为什么使用High Level Consumer

  • 在某些应用场景,我们希望通过多线程读取消息,而我们并不关心从Kafka消费消息的顺序,我们仅仅关心数据能被消费就行。High Level 就是用于抽象这类消费动作的。

  • 消息消费已Consumer Group为单位,每个Consumer Group中可以有多个consumer,每个consumer是一个线程,topic的每个partition同时只能被某一个consumer读取,Consumer Group对应的每个partition都有一个最新的offset的值,存储在zookeeper上的。所以不会出现重复消费的情况。

##设计High Level Consumer High Level Consumer 可以并且应该被使用在多线程的环境,线程模型中线程的数量(也代表group中consumer的数量)和topic的partition数量有关,下面列举一些规则:

  1. 当提供的线程数量多于partition的数量,则部分线程将不会接收到消息;
  2. 当提供的线程数量少于partition的数量,则部分线程将从多个partition接收消息;
  3. 当某个线程从多个partition接收消息时,不保证接收消息的顺序;可能出现从partition3接收5条消息,从partition4接收6条消息,接着又从partition3接收10条消息;
  4. 当添加更多线程时,会引起kafka做re-balance, 可能改变partition和线程的对应关系。

##代码示例 ConsumerGroupExample

package com.test.groups;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
	private final ConsumerConnector consumer;
	private final String topic;
	private  ExecutorService executor;

	public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
				createConsumerConfig(a_zookeeper, a_groupId));
		this.topic = a_topic;
	}

	public void shutdown() {
		if (consumer != null) consumer.shutdown();
		if (executor != null) executor.shutdown();
	}

	public void run(int a_numThreads) {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(a_numThreads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		// now launch all the threads
		//
		executor = Executors.newFixedThreadPool(a_numThreads);

		// now create an object to consume the messages
		//
		int threadNumber = 0;
		for (final KafkaStream stream : streams) {
			executor.submit(new ConsumerTest(stream, threadNumber));
			threadNumber++;
		}
	}

	private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
		Properties props = new Properties();
		props.put("zookeeper.connect", a_zookeeper);
		props.put("group.id", a_groupId);
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		return new ConsumerConfig(props);
	}

	public static void main(String[] args) {
		String zooKeeper = args[0];
		String groupId = args[1];
		String topic = args[2];
		int threads = Integer.parseInt(args[3]);

		ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
		example.run(threads);

		try {
			Thread.sleep(10000);
		} catch (InterruptedException ie) {

		}
		example.shutdown();
	}
}

ConsumerTest

package com.test.groups;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {
	private KafkaStream m_stream;
	private int m_threadNumber;

	public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
		m_threadNumber = a_threadNumber;
		m_stream = a_stream;
	}

	public void run() {
		ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
		while (it.hasNext())
			System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
		System.out.println("Shutting down Thread: " + m_threadNumber);
	}
}

© 著作权归作者所有

共有 人打赏支持
囚兔

囚兔

粉丝 38
博文 82
码字总数 46066
作品 1
南京
程序员
kafka学习笔记:知识点整理(二)

三、kafka HA 3.1 replication 如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 brok...

愉快的鱼儿
2017/06/05
0
0
一篇文全面解读Kafka Consumer设计精要

作者介绍 郭俊,专注于大数据架构,熟悉Kafka和Flume源码;熟悉Hadoop和Spark原理;精通数据(仓)库模型设计和SQL调优。个人博客:http://www.jasongj.com/。 注:本文已经作者同意授权转载...

郭俊
2017/09/14
0
0
Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费...

Adel
2016/01/29
36
0
Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费...

pior
2015/12/23
76
0
消息服务百科全书——消息投递语义

消息投递语义(Message delivery semantics) 有如下几种可能的消息传递保障: 1、At most once:消息可能丢失,但是不会重复。 2、At least once:消息不会丢失,但是可能重复。系统保证每条...

PaaS小魔仙
07/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

qduoj~前端~二次开发~打包docker镜像并上传到阿里云容器镜像仓库

上一篇文章https://my.oschina.net/finchxu/blog/1930017记录了怎么在本地修改前端,现在我要把我的修改添加到部署到本地的前端的docker容器中,然后打包这个容器成为一个本地镜像,然后把这...

虚拟世界的懒猫
今天
1
0
UML中 的各种符号含义

Class Notation A class notation consists of three parts: Class Name The name of the class appears in the first partition. Class Attributes Attributes are shown in the second par......

hutaishi
今天
1
0
20180818 上课截图

小丑鱼00
今天
1
0
Springsecurity之SecurityContextHolderStrategy

注:下面分析的版本是spring-security-4.2.x,源码的github地址是: https://github.com/spring-projects/spring-security/tree/4.2.x 先上一张图: 图1 SecurityContextHolderStrategy的三个......

汉斯-冯-拉特
今天
1
0
LNMP架构(Nginx负载均衡、ssl原理、生成ssl密钥对、Nginx配置ssl)

Nginx负载均衡 网站的访问量越来越大,服务器的服务模式也得进行相应的升级,比如分离出数据库服务器、分离出图片作为单独服务,这些是简单的数据的负载均衡,将压力分散到不同的机器上。有时...

蛋黄_Yolks
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部