文档章节

Storm-kafka【接口实现】4 - KafkaSpout

止静
 止静
发布于 2014/08/08 16:25
字数 892
阅读 330
收藏 0

  

阅读前提:  请参看本空间之前的博文

博文目的: 对于Storm 如何和Kafka进行整合

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

/**
 * @author Yin Shuai
 */

public class KafkaSpout extends BaseRichSpout {

	public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

	/**
	 * 内部类,Message和Offset的偏移量对象
	 * 
	 * @author Yin Shuai
	 */

	public static class MessageAndRealOffset {
		public Message msg;
		public long offset;

		public MessageAndRealOffset(Message msg, long offset) {
			this.msg = msg;
			this.offset = offset;
		}
	}

	/**
	 * 发射的枚举类
	 * @author Yin Shuai
	 */
	static enum EmitState {
		EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
	}

	String _uuid = UUID.randomUUID().toString();
	
	SpoutConfig _spoutConfig;
	
	SpoutOutputCollector _collector;

	// 分区的协调器,getMyManagedPartitions 拿到我所管理的分区
	PartitionCoordinator _coordinator;

	// 动态的分区链接:保存到kafka各个节点的连接,以及负责的topic的partition号码
	DynamicPartitionConnections _connections;

	// 提供了从zookeeper读写kafka 消费者信息的功能
	ZkState _state;

	// 上次更新的毫秒数
	long _lastUpdateMs = 0;

	// 当前的分区
	int _currPartitionIndex = 0;

	public KafkaSpout(SpoutConfig spoutConf) {
		_spoutConfig = spoutConf;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void open(Map conf, final TopologyContext context,
			final SpoutOutputCollector collector) {
		_collector = collector;

		List<String> zkServers = _spoutConfig.zkServers;

		// 初始化的时候如果zkServers 为空,那么初始化 默认的配置Zookeeper
		if (zkServers == null) {

			zkServers = new ArrayList<String>() {

				{
					add("192.168.50.144");
					add("192.168.50.169");
					add("192.168.50.207");
				}
			};

			// zkServers =
			// (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
			System.out.println(" 使用的是Storm默认配置的Zookeeper List : " + zkServers);

		}

		Integer zkPort = _spoutConfig.zkPort;

		// 在这里我们也同时 来检查zookeeper的端口是否为空
		if (zkPort == null) {

			zkPort = 2181;
			// zkPort = ((Number)
			// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
		}

		Map stateConf = new HashMap(conf);

		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);

		// 通过保存的配置文件,我们持有了一个zookeeper的state,支持节点内容的创建和删除
		_state = new ZkState(stateConf);

		// 对于连接的维护
		_connections = new DynamicPartitionConnections(_spoutConfig,
				KafkaUtils.makeBrokerReader(conf, _spoutConfig));

		// using TransactionalState like this is a hack
		// 拿到总共的任务次数

		int totalTasks = context
				.getComponentTasks(context.getThisComponentId()).size();

		// 判断当前的主机是否是静态的statichost
		if (_spoutConfig.hosts instanceof StaticHosts) {
			_coordinator = new StaticCoordinator(_connections, conf,
					_spoutConfig, _state, context.getThisTaskIndex(),
					totalTasks, _uuid);

			// 当你拿到的spoutConfig是zkhost的时候
		} else {
			_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
					_state, context.getThisTaskIndex(), totalTasks, _uuid);
		}

		context.registerMetric("kafkaOffset", new IMetric() {
			KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
					_spoutConfig.topic, _connections);

			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Set<Partition> latestPartitions = new HashSet();
				for (PartitionManager pm : pms) {
					latestPartitions.add(pm.getPartition());
				}
				_kafkaOffsetMetric.refreshPartitions(latestPartitions);
				for (PartitionManager pm : pms) {
					_kafkaOffsetMetric.setLatestEmittedOffset(
							pm.getPartition(), pm.lastCompletedOffset());
				}
				return _kafkaOffsetMetric.getValueAndReset();
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);

		context.registerMetric("kafkaPartition", new IMetric() {
			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Map concatMetricsDataMaps = new HashMap();
				for (PartitionManager pm : pms) {
					concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
				}
				return concatMetricsDataMaps;
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);
	}

	@Override
	public void close() {
		_state.close();
	}

	@Override
	public void nextTuple() {
		// Storm-spout 是从kafka 消费数据,把 kafka 的 consumer
		// 当成是一个spout,并且向其他的bolt的发送数据

		// 拿到当前我管理的这些PartitionsManager
		List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
		for (int i = 0; i < managers.size(); i++) {

			// 对于每一个分区的 PartitionManager

			// in case the number of managers decreased
			// 当前的分区

			_currPartitionIndex = _currPartitionIndex % managers.size();

			// 拿到当前的分区,并且发送,这里把SpoutOutputCollector传递进去了,由他发射元祖
			EmitState state = managers.get(_currPartitionIndex)
					.next(_collector);

			// 如果发送状态为:发送-还有剩余
			if (state != EmitState.EMITTED_MORE_LEFT) {
				_currPartitionIndex = (_currPartitionIndex + 1)
						% managers.size();
			}

			// 如果发送的状态为: 发送-没有剩余
			if (state != EmitState.NO_EMITTED) {
				break;
			}
		}

		long now = System.currentTimeMillis();
		if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
			commit();
		}
	}

	@Override
	public void ack(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.ack(id.offset);
		}
	}

	@Override
	public void fail(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.fail(id.offset);
		}
	}

	@Override
	public void deactivate() {
		// 停止工作
		commit();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		System.out.println(_spoutConfig.scheme.getOutputFields());
		declarer.declare(_spoutConfig.scheme.getOutputFields());
	}

	private void commit() {
		_lastUpdateMs = System.currentTimeMillis();
		for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
			manager.commit();
		}
	}

}

  

       在粗浅的代码阅读之后,在这里进行详细的分析:

      1  KafkaSpout之中持有了一个 MessageAndRealOffset 的内部类

    

public static class MessageAndRealOffset
{
    public Message msg;
    
    public long offset;
    
    public MessageAndRealOffset(Message msg,long offset)
    {
        this.msg = msg;
        this.offset = offset;
    }
}


    2 在Spout之中我们还持有了一个PartitionCoordinator的分区协调器,默认的情况我们实例化的对象

是ZKCoordinator

     有关ZKCoordinator的实现,请参看本空间的另外一篇博

      Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

    

     

       




© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
storm集群 + kafka单机性能测试

storm与kafka单机功能整合很顺利,但是到了storm集群环境和数据处理性能时则出现了一些问题,现将测试过程和问题简单记录如下: 性能指标:每分钟处理至少100万的信息(csv格式,100bytes左右...

GoldenRoc
2014/09/26
4.3K
2
【Storm】- Storm集成kafka

Storm 流式处理Kafka数据 --- tips 老版本:官方文档 新版本:官方文档 Storm可集成组件: 测试代码 需求:给kafka数据添加日期 实际用途:可根据业务续期自定义,例如解析Nginx日志ip限制访...

ZeroneLove
2019/04/08
72
0
Kafka添加了ACL权限后,Storm该如何消费?

Kafka采用了SASL+ACL的权限验证后,Storm中的KafkaSpout该如何修改呢? 我查看了storm-kafka-1.0.1和storm-kafka-1.1.1的源码,KafkaConfig均是使用旧的消费模式,即传入zookeeper connect,...

酋长_lee
2017/09/04
375
2
Flume+Kafka+Storm+Redis构建大数据实时处理系统

一、大数据处理的常用方法 之前在《采集→清洗→处理:基于MapReduce的离线数据分析》中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通...

技术小能手
2018/07/09
0
0
Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

阅读背景:您需要对Zk,Kafka有基础的了解 本章主题:详尽的梳理ZkCoordinator的过程 package com.mixbox.storm.kafka; import org.slf4j.Logger;import org.slf4j.LoggerFactory; import com......

止静
2014/07/23
915
0

没有更多内容

加载失败,请刷新页面

加载更多

工作自由--2020年开篇,开启一个项目:工作自由 worksolo.cn

新年伊始,我突发奇想,也是很多人敢想而不敢做的事情,下面我以一个多年软件开发从业者的角度去思考,去设计这个项目,当然希望看到这篇文章的你可以给我更多思路: 项目名称:工作自由 域名...

_aron_
22分钟前
14
0
王道 第一章 计算机系统概述

这门课学的是逻辑实现,不是具体的机型 主要内容: 基本部件的结构和组织方式 基本运算的操作原理 基本部件和单元的设计思想 处理器+内存=计算机 存储器 存储器(高速缓存、主存储器、虚拟存...

heronos
今天
81
0
SpringBoot+Mybatis+Thymeleaf-Build Blog site_1

1、快速构建Springboot项目 (1)、 Spring Boot 项目目录结构介绍 (2)、 Spring Boot 项目启动的几种方式 2、 (1)、hello blog (2)、 DispatchServlet 配置 (3)、 静态 web 资源如何...

杨木发
今天
128
0
关于docker0: iptables: No chain/target/match by that name的问题解决

由于Docker 0默认网桥的iptables策略冲突问题,将导致一些web server启动时出现如下错误: docker: Error response from daemon: driver failed programming external connectivity on endpo......

王焱君
今天
103
0
js 下载 canvas 兼容移动端

很蛋疼的问题PC上好好的, 移动端下载不了 , 貌似前端 js 生成的时 base64 格式的 图片数据,移动端无法直接下载, 但是chrome 移动端和pc端都没问题, 国产的几个浏览器全部挂了 之前的下载方式...

阿豪boy
昨天
96
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部