文档章节

Storm-kafka【接口实现】-2 ZkBrokerReader

止静
 止静
发布于 2014/07/22 16:12
字数 497
阅读 220
收藏 0


阅读背景:  您可能需要先阅读本空间之中有关博文 Storm-kafka【接口实现】-2 DynamicBrokerReader

阅读目的:再封装 DynamicBrokerReader

本章主题:在细节上把握 DynamicBrokerReder的封装类 - ZkBrokerReader

package com.mixbox.storm.kafka.trident;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.DynamicBrokersReader;
import com.mixbox.storm.kafka.ZkHosts;

import java.util.Map;

/**
 * 2014/07/22
 * 在ZK中间拿到 GlobalPartitionInformation
 * 
 * ZkBrokerReader 是对于DynamicBrokersReader的一个简单的封装
 * @author Yin Shuai
 */
public class ZkBrokerReader implements IBrokerReader {

	public static final Logger LOG = LoggerFactory
			.getLogger(ZkBrokerReader.class);

	GlobalPartitionInformation cachedBrokers;
	
	
	DynamicBrokersReader reader;
	
	
	long lastRefreshTimeMs;

	
	long refreshMillis;

	/**
	 * 
	 * @param conf
	 * @param topic
	 *            指定topic的zkBrokerReader
	 * @param hosts
	 */

	public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {

		reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
				hosts.brokerZkPath, topic);

		cachedBrokers = reader.getBrokerInfo();
		lastRefreshTimeMs = System.currentTimeMillis();
		refreshMillis = hosts.refreshFreqSecs * 1000L;
	}

	@Override
	public GlobalPartitionInformation getCurrentBrokers() {
		long currTime = System.currentTimeMillis();

		// 很简单, 指定了你多长时间开始去刷新Brokerlibiao
		if (currTime > lastRefreshTimeMs + refreshMillis) {
			LOG.info("brokers need refreshing because " + refreshMillis
					+ "ms have expired");
			cachedBrokers = reader.getBrokerInfo();
			lastRefreshTimeMs = currTime;
		}
		return cachedBrokers;
	}

	@Override
	public void close() {
		reader.close();
	}
}


      总览我们的Code :         

            ZkBrokerReader  是对于  DynamicBrokersReader的一个简单封装,ZkBrokerReader之中持有2个主要的Class

 

    1 GlobalPartitionInformatio  cachedBroker;

    2 DynamicBrokersReader  reader;


    3 long lastRefreshTimeMs;   最新的刷新时间

lastRefreshTimeMs = System.currentTimeMillis();    最新的刷新时间为系统的当前时间

    4 long refreshMillis 

refreshMillis = host.refreshFreqSecs * 1000L  设定刷新的毫秒数为

    5 

public GlobalPartitionInformation getCurrentBrokers() {
		long currTime = System.currentTimeMillis();

		// 很简单, 指定了你多长时间开始去刷新Brokerlibiao
		if (currTime > lastRefreshTimeMs + refreshMillis) {
			LOG.info("brokers need refreshing because " + refreshMillis
					+ "ms have expired");
			cachedBrokers = reader.getBrokerInfo();
			lastRefreshTimeMs = currTime;
		}
		return cachedBrokers;
	}

   每一次调用getCurrentBrokers,首先会取System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间+刷新

的间隔,就会再次的去跟新:

        cachedBrokers = reader.getBrokerInfo(); getBrokerInfo()方法每调用一次,也就重新在zk之中重新去取

一次。

总结:

                ZkBrokerReader是对于DynamicBrokerReader的一个封装,DynamicBrokerReader的Dynamic性质并不程序动态的因数,而只是简单在读取ZK数据的过程之中,Zk数据已经动态的发生变化?


          QA:  到目前为止,ZKBrokerReader 为什么需要存在,为什么会成为整体的一个部分?

           请参看本空间的另外一篇博文:

              Storm-kafka【接口实现】-2 DynamicPartitionConnections

© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

阅读背景:您需要对Zk,Kafka有基础的了解 本章主题:详尽的梳理ZkCoordinator的过程 package com.mixbox.storm.kafka; import org.slf4j.Logger;import org.slf4j.LoggerFactory; import com......

止静
2014/07/23
915
0
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
Apache Storm 2.0.0 发布,基于 Java ​​​​​​​的新架构

Apache Storm 2.0.0 发布了,距离它上次更新已过去一年,新版本在性能、新功能和与外部系统的集成方面进行了重大改进,下面是一些主要功能及改进: 用 Java 实现的新架构 在之前的版本中,S...

xplanet
2019/06/03
3.1K
5
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
2018/07/09
0
0
业务系统-kafka-Storm【日志本地化】 - 1 将日志文件打印到local

阅读前提: 1 : 您可能需要对 logback 日志系统有所了解 2 :您可能需要对于 kafka 有初步的了解 3:请代码查看之前,请您仔细参考系统的业务图解 由于kafka本身自带了和『Hadoop』的接口,...

止静
2014/09/10
2.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

node.js中__dirname和./有什么区别?

在Node.js中编程并引用位于与当前目录相关的文件时,是否有任何理由使用__dirname变量而不仅仅是常规./ ? 到目前为止,我一直在使用./并且刚刚发现了__dirname的存在,并且基本上想知道将....

javail
16分钟前
35
0
Hive安装

下载上传apache-hive-2.1.1-bin.tar.gz文件并解压 tar -zxvf apache-hive-2.1.1-bin.tar.gz -C /export/servers 配置环境变量 vi /etc/profileexport HIVE_HOME=/export/servers/apache......

长臂猿猴
22分钟前
39
0
OS X:相当于Linux的wget

如何从库存OS X系统上的Un * x shell脚本执行HTTP GET? (安装第三方软件不是一种选择,因为这必须在我无法控制的许多不同系统上运行)。 例如,如果我在本地启动Mercurial服务器执行hg服务...

技术盛宴
32分钟前
39
0
Fedora 31 - 荣耀魔法本问题记录

最近荣耀魔法本装了Fedora31系统,没有像Deepin 15.11那样出现亮度不能调节和网卡不能用的问题。但是也有不少小问题: 触摸板有时启动后失效,需要重启。 触摸板的右键失效。 待机后进入桌面...

wffger
35分钟前
34
0
ElasticSearch深入:内部机制浅析(三)@

前言 上篇从分布式的角度阐述了 ES 的分布式设计和思想,这一篇打算与 Lucene 结合起来,摸透一些 ES 的常遇到的概念,我们可以将了解到的这些东西应用到优化实践中去。 一、Shard Shard 实际...

HLee
今天
30
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部