文档章节

Kafka Simple Consumer

散关清渭
 散关清渭
发布于 2014/11/21 12:39
字数 431
阅读 300
收藏 1

先贴代码  回家接着写解释 ~~

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class Examples {
	private static List<String> m_replicaBrokers = new ArrayList<String>();

	public static void main(String[] args) throws UnsupportedEncodingException {
		String topic = "test.1";
		int partition = 0;

		List<String> seeds = new ArrayList<String>();
		// seeds.add("127.0.0.1:2181");
		seeds.add("192.168.166.244:9092");
		seeds.add("192.168.166.244:8092");
		seeds.add("192.168.166.244:7092");

		PartitionMetadata metadata = findLeader(seeds, topic, partition);
		Broker broker = metadata.leader();
		List<Broker> replicas = metadata.replicas();
		System.out.println("Leader Broker Info : " + broker);
		System.out.println("Replicas Broker Info : " + replicas.size());
		for (Broker replica : replicas) {
			System.out.println("Replica Broker Info : " + replica);
		}

		String clientName = "Client_" + broker.host() + "_" + broker.port();

		String host = broker.host();
		int port = broker.port();
		SimpleConsumer consumer = new SimpleConsumer(host, port, 3 * 1000,
				64 * 1024, clientName);

		long offset = getLastOffset(consumer, topic, partition,
				kafka.api.OffsetRequest.EarliestTime(), clientName);

		System.out.println("last offset : " + offset);

		kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
				.clientId(clientName).addFetch(host, port, offset, 100000)
				.build();
		FetchResponse fetchResponse = consumer.fetch(fetchRequest);

		if (fetchResponse.hasError()) {
			// process fetch failed
		}

		for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
				topic, partition)) {
			long currentOffset = messageAndOffset.offset();
			if (currentOffset < offset) {
				System.out.println("Found an old offset: " + currentOffset
						+ " Expecting: " + offset);
				continue;
			}
			offset = messageAndOffset.nextOffset();
			ByteBuffer payload = messageAndOffset.message().payload();

			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
					+ new String(bytes, "UTF-8"));
		}

	}

	private static PartitionMetadata findLeader(List<String> a_seedBrokers,
			String a_topic, int a_partition) {

		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			String[] array = seed.split(":");
			String host = array[0];
			int port = Integer.valueOf(array[1]);

			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(host, port, 3 * 1000, 8 * 1024,
						"leaderLookup");

				List<String> topics = Collections.singletonList(a_topic);
				System.out.println("Topics : "
						+ Arrays.toString(topics.toArray()));

				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				System.out.println(req.describe(true));

				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();

				for (TopicMetadata item : metaData) {
					System.out.println("Topic & Metadata : " + item.topic());
					// for (PartitionMetadata data : item.partitionsMetadata())
					// {
					// System.out.println(data.);
					// }
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}

		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}

		return returnMetaData;
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);

		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		PartitionOffsetRequestInfo offsetRequestInfo = new PartitionOffsetRequestInfo(
				whichTime, 1);
		requestInfo.put(topicAndPartition, offsetRequestInfo);

		OffsetRequest request = new OffsetRequest(requestInfo,
				kafka.api.OffsetRequest.CurrentVersion(), clientName);

		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}

		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}
}


© 著作权归作者所有

共有 人打赏支持
散关清渭
粉丝 24
博文 238
码字总数 166498
作品 0
东城
程序员
4-kafka0.10 新消费者使用

Consumer Client 本节主要介绍Kafka从一些topic消费数据的示例。 配置 使用新版的Consumer,需要先在工程中添加kafka-clients依赖,添加的配置信息如下: 初始化与配置 Consumer的创建过程与...

李矮矮
2016/09/12
83
0
Spark Streaming + Kafka Integration Guide

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partition......

刺猬一号
07/18
0
0
喵了个咪/See-KafKa

#See-KafKa 简单舒适的PHP-KafKa拓展 ##前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通...

喵了个咪
2016/09/27
0
0
简单舒适的 PHP-KafKa 拓展--See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪
2016/09/27
2.1K
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

困扰当前数据中心管理的三大难题

导读 当企业发展到一定程度,或者之前的机房不能满足现在的数据中心使用时,企业会对数据中心进行迁移。那么在数据中心进行迁移的时候会遇到哪些风险呢?针对这些风险我们应该做出怎样的措施来...

问题终结者
7分钟前
0
0
设计模式:工厂方法模式(工厂模式)

工厂方法模式才是真正的工厂模式,前面讲到的静态工厂模式实际上不能说是一种真正意义上的设计模式,只是一种变成习惯。 工厂方法的类图: 这里面涉及到四个种类: 1、抽象产品: Product 2、...

京一
24分钟前
0
0
区块链和数据库,技术到底有何区别?

关于数据库和区块链,总会有很多的困惑。区块链其实是一种数据库,因为他是数字账本,并且在区块的数据结构上存储信息。数据库中存储信息的结构被称为表格。但是,区块链是数据库,数据库可不...

HiBlock
31分钟前
0
0
react native 开发碰到的问题

react-navigation v2 问题 问题: static navigationOptions = ({navigation, navigationOptions}) => ({ headerTitle: ( <Text style={{color:"#fff"}}>我的</Text> ), headerRight: ( <View......

罗培海
38分钟前
0
0
Mac Docker安装流程

久仰Docker大名已久,于是今天趁着有空,尝试了一下Docker 先是从docker的官网上下载下来mac版本的docker安装包,安装很简易,就直接拖图标就好了。 https://www.docker.com/products/docker...

writeademo
46分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部