文档章节

Storm-kafka【接口实现】3 - DynamicPartitionConnections

止静
 止静
发布于 2014/07/22 17:01
字数 875
阅读 158
收藏 0


阅读背景: 如有需要,尽情参看本空间的另外一篇文档

阅读目的:了解Storm 如何来封装kafka接口,如何处理Connection连接的封装性问题


参看 DynamicPartitionConnections class

package com.mixbox.storm.kafka;

import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.trident.IBrokerReader;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * 2014/07/22
 * 动态的【分区连接】
 * @author Yin Shuai
 */

public class DynamicPartitionConnections {

	public static final Logger LOG = LoggerFactory
			.getLogger(DynamicPartitionConnections.class);

	/**
	 * 持有了一个 kafka底层的SimpleConsumer对象
	 * 持有了  具体的分区
	 * 
	 * @author Yin Shuai
	 */
	
	
	static class ConnectionInfo {

		//内部维持了一个SimpleConsumer
		SimpleConsumer consumer;
		
		//分区
		Set<Integer> partitions = new HashSet();

		public ConnectionInfo(SimpleConsumer consumer) {
			this.consumer = consumer;
		}
	}

	/**
	 * 也就是kafka的每一个节点都维持了一个COnnectionInfo,ConnectionInfo
	 */
	Map<Broker, ConnectionInfo> _connections = new HashMap();

	// kafkaConfig
	KafkaConfig _config;

	/**
	 * IBrokerReader 基本上 IbroerReader这里初始化的是ZkBrokerReader
	 */

	IBrokerReader _reader;

	/**
	 * @param config
	 *            kafka配置
	 * @param brokerReader
	 *            IBrokerReader-用于拿到当前的接口
	 */
	public DynamicPartitionConnections(KafkaConfig config,
			IBrokerReader brokerReader) {
		_config = config;
		_reader = brokerReader;
	}

	/**
	 * @param partition  分区
	 * @return
	 */
	public SimpleConsumer register(Partition partition) {

		/**
		 * 依据你所拥有的partition号,拿到你所对应的Broker
		 * GlobalPartitionInformation中有Map<Integer, Broker>
		 * partitionMap,记录了分区号与Broker所对应的关系
		 */

		Broker broker = _reader.getCurrentBrokers().getBrokerFor(
				partition.partition);
		return register(broker, partition.partition);
	}

	/**
	 * @param host
	 *            主机
	 * @param partition
	 *            分区
	 * @return 底层的SimpleConsumer 对象,这里存在一个注册的行为,将主机和端口【broker】,和分区【partition】 注册到 connections连接之中
	 */
	public SimpleConsumer register(Broker host, int partition) {

		// Map<Broker, ConnectionInfo> _connections = new HashMap();


		//如果连接之中没有包含了Broker,那么建立一个新的连接,并且将这个  主机和连接注册到  _connections之中
		if (!_connections.containsKey(host)) {
			_connections.put(host, new ConnectionInfo(new SimpleConsumer(
					host.host, host.port, _config.socketTimeoutMs,
					_config.bufferSizeBytes, _config.clientId)));
		}
		
		// ---------   在这里,不管之前有没有都只取一次 -------------
		
		//当包含了,那就直接取出
		ConnectionInfo info = _connections.get(host);
		info.partitions.add(partition);
		return info.consumer;
	}

	public SimpleConsumer getConnection(Partition partition) {

		// ConnectionInfo 之中封装了一个simpleConsumer
		ConnectionInfo info = _connections.get(partition.host);
		if (info != null) {
			return info.consumer;
		}
		return null;
	}

	/**
	 * @param port    固定的Broker
	 * @param partition  固定的分区
	 */
	public void unregister(Broker port, int partition) {
		ConnectionInfo info = _connections.get(port);
		info.partitions.remove(partition);
		if (info.partitions.isEmpty()) {
			info.consumer.close();
			_connections.remove(port);
		}
	}

	public void unregister(Partition partition) {
		unregister(partition.host, partition.partition);
	}

	public void clear() {
		for (ConnectionInfo info : _connections.values()) {
			info.consumer.close();
		}
		_connections.clear();
	}
}


     与前文有关

                1:       在DynamicPartitionConnections之中,我们持有了一个 IBrokerReader的接口对象。

                2 :       由于IBrokerReader 派生出了  

                                                    2.1 StaticBrokerReader

                                                    2.2 ZBrokerReader

       在这个序列的一系列博文之中,ZBrokerReader已经进行了详尽的分析,并且在赋值的过程之中,IBrokerReader也是实例化为ZBrokerReader了。

     内部类:

                DynamicPartitionConnections 持有了一个 CinnectionInfo的内部类

      

static class ConnectionInfo {

		//内部维持了一个SimpleConsumer
		SimpleConsumer consumer;
		
		//分区
		Set<Integer> partitions = new HashSet();

		public ConnectionInfo(SimpleConsumer consumer) {
			this.consumer = consumer;
		}
	}

      1:  对于每一个Connection内部都维持了一个SimpleConsumer ,以及一个 Set集合 partitions

      

      2 :在DynamicPartitionConnections里面我们维持了一个_connections的对象

Map<Broker, ConnectionInfo> _connections = new HashMap();

      3 :在连接维护之中,关键的地方是维护一个 register注册的行为:

     

public SimpleConsumer register(Broker host, int partition) {

     4: 如果_connections之中没有包含Broker,那么将会再建立一个新的连接,并且将Broker和Connection 注册到_connections之中

    

    5:在注册的过程之中,不包含就注册,最后都直接取出SimpleConsumer,这个SimpleConsumer

封装了

new ConnectionInfo(new SimpleConsumer(

host.host, host.port, _config.socketTimeoutMs,

_config.bufferSizeBytes, _config.clientId)):

 


最后总结:在DynamicPartitionConn之中 ZBrokerReader的实例变量_reader 只是用于处理当前的Broker没有指定的时候,我们在构建simpleConsumer的过程之中,动态的依据partition号去load对应的Broker而已。


    那么?如何进行下一步的抽象过程,如何使用DynamicPartitionConnections?

    请参考本空间的另外一篇博文:

Storm-kafka【接口实现】4 - KafkaSpout


© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

阅读背景:您需要对Zk,Kafka有基础的了解 本章主题:详尽的梳理ZkCoordinator的过程 package com.mixbox.storm.kafka; import org.slf4j.Logger;import org.slf4j.LoggerFactory; import com......

止静
2014/07/23
915
0
基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats)

基于Storm流计算天猫双十一作战室项目实战(Storm Kafka HBase Highchats) 网盘地址:https://pan.baidu.com/s/1-59o76H32Jfp0Vp-o8zLYg 提取码: im8f 备用地址(腾讯微云):https://share...

小小搞笑弟
2018/11/16
0
0
Apache Storm 2.0.0 发布,基于 Java ​​​​​​​的新架构

Apache Storm 2.0.0 发布了,距离它上次更新已过去一年,新版本在性能、新功能和与外部系统的集成方面进行了重大改进,下面是一些主要功能及改进: 用 Java 实现的新架构 在之前的版本中,S...

xplanet
2019/06/03
3.1K
5
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
2018/07/09
0
0
业务系统-kafka-Storm【日志本地化】 - 1 将日志文件打印到local

阅读前提: 1 : 您可能需要对 logback 日志系统有所了解 2 :您可能需要对于 kafka 有初步的了解 3:请代码查看之前,请您仔细参考系统的业务图解 由于kafka本身自带了和『Hadoop』的接口,...

止静
2014/09/10
2.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

应急广播户户通平台

一、平台概述 应急广播户户通平台为软硬一体化广播服务解决方案。实现了应急广播、视音频及图片文字信息、调频及数字广播FM、天气预报信息接收功能,以及视音频播放、智能机器人、电子日历等...

neocean
30分钟前
47
0
如何为Apache 2.2启用mod_rewrite

我已经在我的Vista机器上安装了新的Apache 2.2,一切正常,除了mod重写。 我没有注释 LoadModule rewrite_module modules/mod_rewrite.s 但是我的重写规则都没有,即使是简单的重写规则 Re...

javail
36分钟前
23
0
移除Python unicode字符串中的重音符号的最佳方法是什么?

我在Python中有一个Unicode字符串,我想删除所有的重音符号(变音符号)。 我在网上发现了一种用Java实现此目的的优雅方法: 将Unicode字符串转换为长规范化格式(带有单独的字母和变音符号)...

技术盛宴
51分钟前
48
0
ActiveMQ学习之SpringBoot整合ActiveMQ------>主题生产者和消费者

一、pom <!--聚合工程集成关系--> <!--统一整合第三方框架依赖信息--> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</a......

冥焱
今天
89
0
两周自制脚本语言-第11天 优化变量读写性能

第11天 优化变量读写性能 以变量值的读写为例,向读者介绍基于这种理念的语言处理器性能优化方式。 11.1 通过简单数组来实现环境 假如函数包含局部变量x与y,程序可以事先将x设为数组的第0个...

果汁分你一半
今天
58
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部