文档章节

Storm中的Worker

writeademo
 writeademo
发布于 2017/01/09 17:52
字数 2630
阅读 46
收藏 0

    Storm中worker是实际执行Topology的进程,它由supervisor启动,从zookeeper中获取分配到自身的所有Executor并启动这些Executor来执行。

 

worker中的数据

通过worker-data方法定义了一个包含很多共享数据的映射集合,worker中的很多方法都依赖它;

worker中的计时器

每个计时器都对应着一个java线程,worker中使用计时器进行心跳保持以及获取元数据的更新信息。

worker的心跳

do-heartbeat函数用于产生worker的心跳信息,这些信息被写入本地文件系统中,supervisor会读取这些心跳信息以判断worker的状态,然后决定是否要重启worker.

worker-state方法会创建一个LocalState对象,并调用该对象的put方法将worker的心跳信息存储到本地的文件系统,路径:STORM-LOCAL-DIR/workers/<workerId>/heartbeats,

worker的heartbeat目录下的文件,文件名为当前的时间戳。

Executor的心跳

与worker的心跳不同,它的心跳信息直接发送到zookeeper中保存,主要保存了Executor中Task的运行统计。

 do-executor-heartbeats函数用来发送一次心跳信息

构建Executor的心跳对象,包含如下信息:

storm-id:topologyId

executor-stats:该worker中executor的运行统计,具体为对每一个Task的统计

uptime:worker的启动时间

time-secs:当前时间

调用storm-cluster-state的worker-heartbeat()方法存储心跳信息,在zookeeper中的默认路径为:/storm/workerbeats/<storm-id>/<node-port>

worker使用executor-heartbeat-timer计时器线程来发送Executor的心跳信息,默认为3秒发送一次;

worker中对ZMQ连接的维护

     在进程期间,Storm利用ZMQ来发送和接收信息,并且采用端到端的方法完成消息传输,Worker会根据Topology的定义以及分配到自身的任务情况,计算出自己发出的消息将被哪些Task接收,基于Topology的这一任务分配信息,worker可以熟悉目标Task所在的机器和端口号

    可以看出,Worker通过两种机制来保证连接的可靠性,一是在zookeeper中注册watcher回调通知的方法,这种方式并不一定可靠,例如与zookeeper的连接丢失,则注册的watcher回调方法将失效。二是采用定时器的方法定期执行该函数。

 

从ZooKeeper获取Topology的活跃情况

worker需要获得其执行的Topology的状态,refresh-storm-active函数用于获取topology的状态信息。

mk-halting-timer函数用于调用mk-timer函数来创建一个计时器,该计时器会在遇到错误的时候将错误信息记录到日志中并退出JVM

 

创建Worker

mk-worker函数用于创建Worker进程,主要工作包括启动相应的计时器,创建Worker中对应的Executor,以及启动接收线程来接收消息

过程:

    若为分布式模式,则将打印到控制台的信息打到日志里面;

    若为分布式模式,则将与Worker对应的进程ID放到pids目录下,并创建以进程ID为ID作为文件名的空文件。Supervisor在关闭Worker时会尝试关闭pids目录下面所有与进程ID相对应的进程,Worker创建的子进程也应遵循这样的规则,在Storm中,由于任务会被重新调度,因此正在执行的worker也可能被关闭;

    分别启动Worker以及Executor的心跳计时器线程,这里是预先调用一次,以确保第一次的信条信息可以被快速发送出去,然后启动计时器线程来完成周期性的心跳更新;

创建用于完成ZMQ连接更新的计时器线程;

启动消息的接收线程,receive-thread-shutdown为该线程的关闭函数;

启动消息队列的发送线程;

 

关闭worker

理解Worker关闭函数有利于进一步理解Worker中启动的线程及资源

过程:

1关闭缓存的ZMQ的Socket连接

2关闭消息接收线程

3关闭worker中的所有Executor线程

4关闭ZMQ的上下文,释放已经创建的Socket连接

5关闭消息发送队列和线程

6关闭所有的计时器线程

7关闭资源

8从ZooKeeper中清除该Worker的心跳信息

9断开与ZooKeeper的连接

 

重要的辅助方法介绍

 

   创建worker中的数据结构,启动Worker,关闭Worker的过程中会用到很多辅助方法,这些方法有助于我们更好地理解Worker的工作原理:

worker中的接收函数

Worker中的mk-transfer-local-fn函数用于生产并发送消息到Executor的接收队列,同一个worker内部的Executor之间会通过该函数传递消息。

short-executor-receive-queue-map存储Executor中第一个Task的taskid到该Executor对应的接收队列(Distuptor Queue)的映射关系。

task-getter函数以ZMQ 发来的消息为传入参数,这里的消息为一个含有两个元素的数组,第一个元素为TaskId,task-getter函数的目标是通过消息的taskId获得与其对应的Executor中第一个Task的TaskId,第二个元素为消息的实际内容。

定义函数体,函数的输入为ZMQ收到的一组消息tuple-batch,按照与消息Taskid对应的Executor中第一个Task的TaskId对消息进行分组,其变量grouped对应的键为Executor中第一个Task的Taskid,值为属于该executor的一组消息;

通过executor中第一个task的taskid获得与Executor相对应的接收消息队列q,调用disruptor/publish方法将收到的消息发送至队列q中

fast-group-by方法:

该方法传入一个函数afn和一个列表alist,当前情况下afn为task-getter函数,alist对应于接收的消息。

Worker中的发送函数

 worker中的mk-transfer-fn函数与mk-transfer-local-fn类似,主要有用于Executor的数据发送:这里分两种情况:

1.消息的目标TaskId跟发送TaskId属于同一个Worker,此时不需要跨进程传输消息,因此可以将消息直接发送至接收端Executor的接受队列。

2消息的目标Taskid跟发送TaskId属于不同的worker,此时则将消息发送至worker的发送队列,由worker负责将队列中的消息通过ZMQ发送出去。

Worker消息队列是如何接收消息的呢?Worker中会有一个额外的线程对transfer-queue进行监听,函数mk-transfer-tuples-handler用于创建于Disruptor queue对应的消息处理器。

 

过程:

    drainer列表用于缓存要发送的消息,Disruptor Queue的Onevent回调会调用本函数定义的方法,该方法的最后一个参数标识Queue是否为一个Batch结束,并会在Batch结束之前将消息缓存到drainer列表中。

node+port->socket保存了Worker中与目标node+port相对应的ZMQ Socket连接,node+port代表nimbus的资源分配单位,node则表示一台运行Supervisor的机器,port为该Supervisor上某一个运行Worker的端口号.

task->node+port为从taskId到node+port的映射关系。

endpoint-socket-lock为Worker中定义的ReentrantReadWriteLick类型的锁,Worker中存在一个专门的线程,会对缓存的ZMQ连接进行更新。

 

 

clojure-handler,函数第一个传入参数为一组消息packets,第三个表明Batch是否结束。

将消息packets放入drainer变量中,若batch-end为true,则为了避免跟ZMQ连接更新线程相冲突,这里需要申请读取endpoint-socket-lock锁,然后遍历drainer中缓存的所有消息,根据消息的taskId找到node+port,然后通过从node+port到ZMQSocket的映射关系找到对应Socket连接

调用msg/send函数将消息发送出去

清理drainer缓存

Worker中,使用transfer-tuples,transfer-thread来启动发送监听线程

获取属于Worker的Executor

    read-worker-executors函数用来计算分配到该Worker的Executor,它通过调用Storm-cluster-state的assignment-info函数获得所有Topology的分配信息,然后利用worker的assignemtn-id以及port进行过滤,得到某个worker所属的Executor,这里的assignment-id对应于node,Worker启动后,其执行的Executor集合将不再发生变化,但当任务分配情况发生变化时,Supervisor就会重启worker来处理任务。其中,Nimbus在计算分吴分配时会尽量不改变Worker中已执行的Executor。当前Worker中任何一个Executor处理失败都会导致Worker重启。

 

创建Executor中接收消息队列和查找表

    mk-receive-queue-map函数用于为Worker中的每一个Executor创建接收队列,并将其存入hash表,其中键为ExecutorId,值为Disruptor Queue的对象;

    ExecutorId实际上为含有两个元素的数据,即[startTaskId,endTaskId],表示该Executor执行的任务区间。

worker中映射关系的创建:

用mk-receive-queue-map创建Disruptor Queue

调用executor-id->tasks函数获得Executor中包含的Taskid集合,并创建hash,键为taskId,值为Tasidk所属的Executor的接收队列;

获得Executor中包含的TaskID集合,即为receive-queue-map的键集合;

构建一个新的hash,存储从Executor的看是TaskId到该Executor的接收队列的映射关系;

构建Executor中从TaksId到Executor的起始TaskId的映射关系;

 

对于接收到的一组消息,根据其taskId找到Executor的起始Taskid并根据其进行消息分组,然后根据从起始TaskId到Executor接收队列的哈希表short-executor-receive-queue-map来进行消息的分发。

 

在Worker中,TaskId,Executor以及Executor的收发队列的哈希表的关系如下:

:executor-receive-queue-map:[startTaskId,endTaskId]->Executor接收队列;

:short-executor-receive-queue-map:[startTaskId]->Executor接收队列

:receive-queue-map:[taskId]->Executor接收队列

:task->short-executor:[taskId]->[startTaskId]

 

下载Topology的配置项以及代码

   在执行一个topology时,Supervisor将从Nimbus下载3个文件,他们分别为stormconf.ser,stomrcode.ser,stormjar.jar:

stormconf.ser为Topology配置项的序列化文件,read-supervisor-storm-conf函数用于读取该文件并将其反序列化,

stormcode.ser为Topology的定义文件,可通过read-supervisor-topology函数来读取该文件,并将其反序列化解析

从nimbus上下载这些文件在Supervisor所在机器的路径为:

$StormRoot/stormData/supervisor/stormdist/<stormid>/

stormjar.jar文件包含了用户的资源文件,第三方库等,会解压,并放置于运行目录的resources文件夹下$StormRoot/stormData/supervisor/stormdist/<stormid>/resources

 

worker中的线程及其通信关系:

over

 

© 著作权归作者所有

上一篇: Storm中的Executor
下一篇: Timer类的使用
writeademo
粉丝 25
博文 692
码字总数 264466
作品 0
东城
私信 提问
Apache Storm简介及安装部署

Apache Storm是一个分布式的、可靠的、容错的实时数据流处理框架。它与Spark Streaming的最大区别在于它是逐个处理流式数据事件,而Spark Streaming是微批次处理,因此,它比Spark Streaming...

风火数据
2018/07/20
0
0
【Storm】Storm简介及Storm集群的安装部署

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/86557602 1、Storm概述 (1)Storm简介 Storm最早是由BackType公司开发的实时...

魏晓蕾
01/20
0
0
Storm概念讲解和工作原理介绍

Strom的结构 Storm与传统关系型数据库 传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存 传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据 关系型数据库重...

张超
2015/04/26
2.8K
0
Apache Storm 1.0.3 发布,分布式实时计算

Apache Storm 1.0.3 发布了,Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像 Hadoop 一样实现实时批处理。Storm 很简单,可用于任意编程语言。Apache Storm ...

王练
2017/02/05
1K
0
提交的storm任务,在storm ui上可以查看,没有问题。但是去找具体的worker日志,发现没有生成日志?(目录下没有内容)

因为,之前storm非正常关闭,我把生成worker日志给删除了,再次提交storm任务,storm ui显示正常,也没发现错误。但是没有worker日志生成,不知道是怎么回事。(Storm 版本 0.96)...

威化
03/15
441
0

没有更多内容

加载失败,请刷新页面

加载更多

PHP计算两个经纬度地点之间的距离

/** * 求两个已知经纬度之间的距离,单位为米 * * @param lng1 $ ,lng2 经度 * @param lat1 $ ,lat2 纬度 * @return float 距离,单位米 * @author www.Alixixi.com */function get...

子枫Eric
29分钟前
14
0
Linux—day 4

ch2 需要掌握的命令 (1)cat -n 1.txt (2)more 1.txt (3)head -n 15 initial-setup-ks.cfg (4)tail -n 17 initial-setup-ks.cfg;tail -f initial-setup-ks.cfg (5)cat -n anaconda-ks.c......

呵呵暖茶
42分钟前
23
0
【Kubernetes社区之路】我的PR被抢了

2019年11月的某天,我无意间发现一个PR作者在自己的PR中抱怨自己的PR没被合入,而另一个比自己提交晚且内容几乎一样的PR则被合入了。 字里行间透露些许伤感外加无奈,原文如下: 作为一名开源...

恋恋美食
49分钟前
28
0
阻塞队列

对于许多线程问题, 可以通过使用一个或多个队列以优雅且安全的方式将其形式化。生产者线程向队列插人元素, 消费者线程则取出它们。 使用队列, 可以安全地从一个线程向另 一个线程传递数据...

ytuan996
50分钟前
27
0
mysql docker 配置

安装   主机上的mysql服务是基于docker安装的,具体安装脚本如下: docker run --detach \--restart always \--publish 3306:3306 --name mysql \--volume /data/mysql/logs:/logs \-...

qwfys
53分钟前
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部