文档章节

Kafka Simple Consumer

散关清渭
 散关清渭
发布于 2014/11/21 12:39
字数 431
阅读 300
收藏 1
点赞 0
评论 0

先贴代码  回家接着写解释 ~~

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class Examples {
	private static List<String> m_replicaBrokers = new ArrayList<String>();

	public static void main(String[] args) throws UnsupportedEncodingException {
		String topic = "test.1";
		int partition = 0;

		List<String> seeds = new ArrayList<String>();
		// seeds.add("127.0.0.1:2181");
		seeds.add("192.168.166.244:9092");
		seeds.add("192.168.166.244:8092");
		seeds.add("192.168.166.244:7092");

		PartitionMetadata metadata = findLeader(seeds, topic, partition);
		Broker broker = metadata.leader();
		List<Broker> replicas = metadata.replicas();
		System.out.println("Leader Broker Info : " + broker);
		System.out.println("Replicas Broker Info : " + replicas.size());
		for (Broker replica : replicas) {
			System.out.println("Replica Broker Info : " + replica);
		}

		String clientName = "Client_" + broker.host() + "_" + broker.port();

		String host = broker.host();
		int port = broker.port();
		SimpleConsumer consumer = new SimpleConsumer(host, port, 3 * 1000,
				64 * 1024, clientName);

		long offset = getLastOffset(consumer, topic, partition,
				kafka.api.OffsetRequest.EarliestTime(), clientName);

		System.out.println("last offset : " + offset);

		kafka.api.FetchRequest fetchRequest = new FetchRequestBuilder()
				.clientId(clientName).addFetch(host, port, offset, 100000)
				.build();
		FetchResponse fetchResponse = consumer.fetch(fetchRequest);

		if (fetchResponse.hasError()) {
			// process fetch failed
		}

		for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
				topic, partition)) {
			long currentOffset = messageAndOffset.offset();
			if (currentOffset < offset) {
				System.out.println("Found an old offset: " + currentOffset
						+ " Expecting: " + offset);
				continue;
			}
			offset = messageAndOffset.nextOffset();
			ByteBuffer payload = messageAndOffset.message().payload();

			byte[] bytes = new byte[payload.limit()];
			payload.get(bytes);
			System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
					+ new String(bytes, "UTF-8"));
		}

	}

	private static PartitionMetadata findLeader(List<String> a_seedBrokers,
			String a_topic, int a_partition) {

		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			String[] array = seed.split(":");
			String host = array[0];
			int port = Integer.valueOf(array[1]);

			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(host, port, 3 * 1000, 8 * 1024,
						"leaderLookup");

				List<String> topics = Collections.singletonList(a_topic);
				System.out.println("Topics : "
						+ Arrays.toString(topics.toArray()));

				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				System.out.println(req.describe(true));

				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();

				for (TopicMetadata item : metaData) {
					System.out.println("Topic & Metadata : " + item.topic());
					// for (PartitionMetadata data : item.partitionsMetadata())
					// {
					// System.out.println(data.);
					// }
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}

		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}

		return returnMetaData;
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);

		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		PartitionOffsetRequestInfo offsetRequestInfo = new PartitionOffsetRequestInfo(
				whichTime, 1);
		requestInfo.put(topicAndPartition, offsetRequestInfo);

		OffsetRequest request = new OffsetRequest(requestInfo,
				kafka.api.OffsetRequest.CurrentVersion(), clientName);

		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}

		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}
}


© 著作权归作者所有

共有 人打赏支持
散关清渭
粉丝 24
博文 159
码字总数 166498
作品 0
东城
程序员
4-kafka0.10 新消费者使用

Consumer Client 本节主要介绍Kafka从一些topic消费数据的示例。 配置 使用新版的Consumer,需要先在工程中添加kafka-clients依赖,添加的配置信息如下: 初始化与配置 Consumer的创建过程与...

李矮矮 ⋅ 2016/09/12 ⋅ 0

喵了个咪/See-KafKa

#See-KafKa 简单舒适的PHP-KafKa拓展 ##前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通...

喵了个咪 ⋅ 2016/09/27 ⋅ 0

简单舒适的 PHP-KafKa 拓展--See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪 ⋅ 2016/09/27 ⋅ 1

[喵咪KafKa(3)]PHP拓展See-KafKa

[喵咪KafKa(3)]PHP拓展See-KafKa 前言 (Simple 简单 easy 容易 expand 的拓展) KafKa是由Apache基金会维护的一个分布式订阅分发系统,KafKa它最初的目的是为了解决,统一,高效低延时,高通量(同...

喵了_个咪 ⋅ 2016/09/27 ⋅ 1

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较

Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲、异步通信、汇集日志、系统解耦等方面。相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一...

xumaojun ⋅ 05/06 ⋅ 0

一篇文全面解读Kafka Consumer设计精要

作者介绍 郭俊,专注于大数据架构,熟悉Kafka和Flume源码;熟悉Hadoop和Spark原理;精通数据(仓)库模型设计和SQL调优。个人博客:http://www.jasongj.com/。 注:本文已经作者同意授权转载...

郭俊 ⋅ 2017/09/14 ⋅ 0

Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费...

Adel ⋅ 2016/01/29 ⋅ 0

Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费...

pior ⋅ 2015/12/23 ⋅ 0

Apache Camel 2.17.1 发布,路由以及媒介引擎

Apache Camel 2.17.1 发布了,一些提升及新特性: [CAMEL-9574] - Be able to force one-way operation when using camel-cxf transport [CAMEL-9883] - Add a SpringCache based idempotent......

淡漠悠然 ⋅ 2016/05/09 ⋅ 2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Docker Swarm的前世今生

概述 在我的《Docker Swarm集群初探》一文中,我们实际体验了Docker Swarm容器集群技术的魅力,与《Kubernetes实践录》一文中提到的Kubernetes集群技术相比,Docker Swarm没有Kubernetes显得...

CodeSheep ⋅ 今天 ⋅ 0

骰子游戏代码开源地址

因为阿里云现在服务器已经停用了,所以上面的配置已经失效。 服务端开源地址:https://gitee.com/goalya/chat4.git 客户端开源地址:https://gitee.com/goalya/client4.git 具体运行界面请参考...

算法之名 ⋅ 今天 ⋅ 0

设计模式--装饰者模式

装饰者模式 定义 动态地给一个对象添加一些额外的职责。就增加功能来说,装饰模式相比生成子类更为灵活。 通用类图 意图 动态地给一个对象添加一些额外的职责。就增加功能来说,装饰模式相比...

gaob2001 ⋅ 今天 ⋅ 0

JavaScript零基础入门——(八)JavaScript的数组

JavaScript零基础入门——(八)JavaScript的数组 欢迎大家回到我们的JavaScript零基础入门,上一节课我们讲了有关JavaScript正则表达式的相关知识点,便于大家更好的对字符串进行处理。这一...

JandenMa ⋅ 今天 ⋅ 0

sbt网络问题解决方案

转自:http://dblab.xmu.edu.cn/blog/maven-network-problem/ cd ~/.sbt/launchers/0.13.9unzip -q ./sbt-launch.jar 修改 vi sbt/sbt.boot.properties 增加一个oschina库地址: [reposit......

狐狸老侠 ⋅ 今天 ⋅ 0

大数据,必须掌握的10项顶级安全技术

我们看到越来越多的数据泄漏事故、勒索软件和其他类型的网络攻击,这使得安全成为一个热门话题。 去年,企业IT面临的威胁仍然处于非常高的水平,每天都会看到媒体报道大量数据泄漏事故和攻击...

p柯西 ⋅ 今天 ⋅ 0

Linux下安装配置Hadoop2.7.6

前提 安装jdk 下载 wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.7.6/hadoop-2.7.6.tar.gz 解压 配置 vim /etc/profile # 配置java环境变量 export JAVA_HOME=/opt/jdk1......

晨猫 ⋅ 今天 ⋅ 0

crontab工具介绍

crontab crontab 是一个用于设置周期性被执行的任务工具。 周期性执行的任务列表称为Cron Table crontab(选项)(参数) -e:编辑该用户的计时器设置; -l:列出该用户的计时器设置; -r:删除该...

Linux学习笔记 ⋅ 今天 ⋅ 0

深入Java多线程——Java内存模型深入(2)

5. final域的内存语义 5.1 final域的重排序规则 1.对于final域,编译器和处理器要遵守两个重排序规则: (1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用...

江左煤郎 ⋅ 今天 ⋅ 0

面试-正向代理和反向代理

面试-正向代理和反向代理 Nginx 是一个高性能的反向代理服务器,但同时也支持正向代理方式的配置。

秋日芒草 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部