文档章节

Kafka Simple Consumer

散关清渭
 散关清渭
发布于 2014/11/21 12:39
字数 431
阅读 431
收藏 1

行业解决方案、产品招募中!想赚钱就来传!>>>

先贴代码  回家接着写解释 ~~

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class Examples {
	private static List<String> m_replicaBrokers = new ArrayList<String>();

	public static void main(String[] args) throws UnsupportedEncodingException {
		String topic = "test.1";
		int partition = 0;

		List<String> seeds = new ArrayList<String>();
		// seeds.add("127.0.0.1:2181");
		seeds.add("192.168.166.244:9092");
		seeds.add("192.168.166.244:8092");
		seeds.add("192.168.166.244:7092");

		PartitionMetadata metadata = findLeader(seeds, topic, partition);
		Broker broker = metadata.leader();
		List<Broker> replicas = metadata.replicas();
		System.out.println("Leader Broker Info : " + broker);
		System.out.println("Replicas Broker Info : " + replicas.size());
		for (Broker replica : replicas) {
			System.out.println("Replica Broker Info : " + replica);
		}

		String clientName = "Client_" + broker.host() + "_" + broker.port();

		String host = broker.host();
		int port = broker.port();
		SimpleConsumer consumer = new SimpleConsumer(host, port, 3 * 1000,
				64 * 1024, clientName);

		long offset = getLastOffset(consumer, topic, partition,
				kafka.api.OffsetRequest.EarliestTime(), clientName);

		System.out.println("last offset : " + offset);

		kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
				.clientId(clientName).addFetch(host, port, offset, 100000)
				.build();
		FetchResponse fetchResponse = consumer.fetch(fetchRequest);

		if (fetchResponse.hasError()) {
			// process fetch failed
		}

		for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
				topic, partition)) {
			long currentOffset = messageAndOffset.offset();
			if (currentOffset < offset) {
				System.out.println("Found an old offset: " + currentOffset
						+ " Expecting: " + offset);
				continue;
			}
			offset = messageAndOffset.nextOffset();
			ByteBuffer payload = messageAndOffset.message().payload();

			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
					+ new String(bytes, "UTF-8"));
		}

	}

	private static PartitionMetadata findLeader(List<String> a_seedBrokers,
			String a_topic, int a_partition) {

		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			String[] array = seed.split(":");
			String host = array[0];
			int port = Integer.valueOf(array[1]);

			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(host, port, 3 * 1000, 8 * 1024,
						"leaderLookup");

				List<String> topics = Collections.singletonList(a_topic);
				System.out.println("Topics : "
						+ Arrays.toString(topics.toArray()));

				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				System.out.println(req.describe(true));

				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();

				for (TopicMetadata item : metaData) {
					System.out.println("Topic & Metadata : " + item.topic());
					// for (PartitionMetadata data : item.partitionsMetadata())
					// {
					// System.out.println(data.);
					// }
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}

		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}

		return returnMetaData;
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);

		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		PartitionOffsetRequestInfo offsetRequestInfo = new PartitionOffsetRequestInfo(
				whichTime, 1);
		requestInfo.put(topicAndPartition, offsetRequestInfo);

		OffsetRequest request = new OffsetRequest(requestInfo,
				kafka.api.OffsetRequest.CurrentVersion(), clientName);

		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}

		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}
}


上一篇: IPython
散关清渭
粉丝 23
博文 238
码字总数 166498
作品 0
东城
程序员
私信 提问
加载中
请先登录后再评论。
ZBUS高可用HA介绍

请参考最新文档 http://zbus.io/guide/ha?menu=ha http://git.oschina.net/rushmore/zbus 1. ZBUS 高可用设计 Zbus高可用采用ZbusServer + TrackServer结合完成,相对于单机版本的zbus,客户...

少帮主
2015/12/21
5.9K
6
日志分析(二)jvm agent+kafka+es +kibana 的OLAP日志分析系统

jvm agent+kafka+es +kibana 一般web业务的场景都包含了分布式,web事务的多应用间跳转,各层次间实现负载均衡等。日志分析OLAP的技术要求就需要跟踪夸web应用的多层次、多应用的链式请求,针...

venuser
2015/12/14
956
4
利用java8新特性实现类似javascript callback特性

Java8的新特性之一,就是首次引入了函数式编程Lambda表达式,按oracle的说法,是为了引导java向函数式编程的方向发展。 在JDK1.8中,多了一个包,java.util.function,这里主要用到了这个包下...

Acce1erator
2015/12/02
3.4K
3
Spark Streaming使用Kafka保证数据零丢失

源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正。原文链接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/spark_streaming使用kafka保证数据零丢...

jacksu
2016/01/16
945
0
如何给Vanilla(OpenResty)添加一个路由协议

源起 QQ群经常看到有同学问(Vanilla/OpenResty开发:205773855,OpenResty技术交流2群:481213820): 如何让Vanilla支持Restful(或者Vanilla如何支持xxxx样子的URL访问)? Vanilla的路由...

iDev_周晶
2016/01/16
996
1

没有更多内容

加载失败,请刷新页面

加载更多

Azure Application Gateway(一)对后端 Web App 进行负载均衡

一,引言   今天,我们学习一个新的知识点-----Azure Application Gateway,通过Azure 应用程序网关为我么后端的服务提供负载均衡的功能。我们再文章头中大概先了解一下什么是应用程序网关...

osc_lc4icfkt
14分钟前
0
0
WoLai(我来) 注册码 ——国内版 notion 【笔记】

注册码: SQGYG23 ❤ W4T9PKP JLTHNJP KMTXK7P JDHKJEM KRJXX5P 6M7PPAP DEGLMG3 N3BZMRI 87BR22I GSIWGWP GNGBNTI QA8URIM UDUV9VM IHKJA7P MD9ZA3M 3XR67ZI TBUP9JX TI4TYMM 注册完了可以把......

osc_c05lkk3u
15分钟前
4
0
2020hdu多校第二场比赛及补题

这一场我们队只A了一题 1010 Lead of Wisdom 直接爆搜,但是T了好几发,剪了下枝 如果一个物品的a,b,c,d都比不上另外一个同种物品的a,b,c,d,那这个物品就可以直接淘汰掉了 #include<iostrea...

osc_usgpahnw
16分钟前
21
0
为什么Java有瞬态字段? - Why does Java have transient fields?

问题: 为什么Java有瞬态字段? 解决方案: 参考一: https://stackoom.com/question/3opS/为什么Java有瞬态字段 参考二: https://oldbug.net/q/3opS/Why-does-Java-have-transient-fields...

富含淀粉
17分钟前
16
0
轻松搭建CAS 5.x系列(6)-在CAS Server上增加OAuth2.0协议

概述说明 CAS Server默认搭建出来,客户端程序只能按照CAS自身的协议接入。CAS的强大在于,有官方的插件,可以支持其他的协议。本章节就让CAS Server怎么增加OAuth2.0的登录协议。 安装步骤 ...

osc_inj0cicw
18分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部