文档章节

第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

葛晨鑫
 葛晨鑫
发布于 2016/05/03 00:28
字数 1113
阅读 99
收藏 0

内容:

1,解密Spark Streaming运行机制

2,解密Spark Streaming架构

DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,转过来对其内部的RDD操作。

纵轴为空间维度:代表的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的。

横轴为时间维度:按照特定的时间间隔不断地生成job对象,并在集群上运行。

随着时间的推移,基于DStream Graph 不断生成RDD Graph ,也即DAG的方式生成job,并通过Job Scheduler的线程池的方式提交给spark cluster不断的执行。

由上可知,RDD    与  DStream的关系如下

RDD是物理级别的,而 DStream 是逻辑级别的

DStream是RDD的封装类,是RDD进一步的抽象

DStream 是RDD的模板。DStream要依赖RDD进行具体的数据计算

注意:纵轴维度需要RDD,DAG的生成模板,需要TimeLine的job控制器

横轴维度(时间维度)包含batch interval,窗口长度,窗口滑动时间等。

3,Spark Streaming源码解析

StreamingContext方法中调用JobSchedulerstart方法

JobGenerator的start方法中,调用startFirstTime方法,来开启定时生成Job的定时器

startFirstTime方法,首先调用DStreamGraph的start方法,然后再调用RecurringTimer的start方法。

timer对象为一个定时器,根据batchInterval时间间隔定期向EventLoop发送GenerateJobs的消息。

接收到GenerateJobs消息后,会回调generateJobs方法。

generateJobs方法再调用DStreamGraph的generateJobs方法生成Job

DStreamGraph的generateJobs方法

DStreamGraph的实例化是在StreamingContext中的

DStreamGraph类中保存了输入流和输出流信息

回到JobGenerator的start方法中receiverTracker.start()

其中ReceiverTrackerEndpoint对象为一个消息循环体

launchReceivers方法中发送StartAllReceivers消息

接收到StartAllReceivers消息后,进行如下处理

StartReceiverFunc方法如下,实例化Receiver监控者,开启并等待退出

supervisor的start方法中调用startReceiver方法

我们以socketTextStream为例,其启动的是SocketReceiver,内部开启一个线程,来接收数据。

内部调用supervisor的pushSingle方法,将数据聚集后存放在内存中

supervisor的pushSingle方法如下,将数据放入到defaultBlockGenerator中,defaultBlockGenerator为BlockGenerator,保存Socket接收到的数据

BlockGenerator对象中有一个定时器,来更新当前的Buffer

BlockGenerator对象中有一个线程,来从阻塞队列中取出数据

调用ReceiverSupervisorImpl类中的继承BlockGeneratorListener的匿名类中的onPushBlock方法。

receivedBlockHandler对象如下

这里我们讲解BlockManagerBasedBlockHandler的方式

trackerEndpoint如下

其实是发送给ReceiverTrackerEndpoint类,

InputInfoTracker类的reportInfo方法只是对数据进行记录统计

其generateJob方法是被DStreamGraph调用

DStreamGraph的generateJobs方法是被JobGenerator类的generateJobs方法调用。

JobGenerator类中有一个定时器,batchInterval发送GenerateJobs消息

总结:

1,当调用StreamingContext的start方法时,启动了JobScheduler

2,当JobScheduler启动后会先后启动ReceiverTracker和JobGenerator

3,ReceiverTracker启动后会创建ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息

4,ReceiverTracker在启动时会给自己发送StartAllReceivers消息,自己接收到消息后,向Spark提交startReceiverFunc的Job

5,startReceiverFunc方法中在Executor上启动Receiver,并实例化ReceiverSupervisorImpl对象,来监控Receiver的运行

6,ReceiverSupervisorImpl对象会调用Receiver的onStart方法,我们以SocketReceiver为例,启动一个线程,连接Server,读取网络数据先调用ReceiverSupervisorImpl的pushSingle方法,

保存在BlockGenerator对象中,该对象内部有个定时器,放到阻塞队列blocksForPushing,等待内部线程取出数据放到BlockManager中,并发AddBlock消息给ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint为ReceiverTracker的内部类,在接收到addBlock消息后将streamId对应的数据阻塞队列streamIdToUnallocatedBlockQueues中

7,JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器

8,接收到GenerateJobs消息会先后触发ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法

9,ReceiverTracker的allocateBlocksToBatch方法会调用getReceivedBlockQueue方法从阻塞队列streamIdToUnallocatedBlockQueues中根据streamId获取数据

10,DStreamGraph的generateJobs方法,继而调用变量名为outputStreams的DStream集合的generateJob方法

11,继而调用DStream的getOrCompute来调用具体的DStream的compute方法,我们以ReceiverInputDStream为例,compute方法是从ReceiverTracker中获取数据


© 著作权归作者所有

葛晨鑫
粉丝 9
博文 25
码字总数 22470
作品 0
杭州
私信 提问
第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming

从昨天第一课的黑名单过滤的案例中,我們可以看見其實一個Spark Streaming 程序,里面會自動生成很多不同的作業,可以用以下的圖,去理解什麼是DStream,它跟RDD 之間有什麼不同。 簡單說 DS...

jcchoiling
2016/05/10
57
0
第3课:通过案例对SparkStreaming 透彻理解三板斧之三:解密SparkStreaming

第一部份 课堂的第一部份是用IMF 晚上案例实战课的程序再运行一次,把数据再次输入数据库里面,从图一你可以看出里面有很多运行细节,例如receiver.ReceiverSupervisor,receiver.BlockManag...

jcchoiling
2016/05/09
71
0
Ls 1 - Understanding the nature of Spark Streaming

What is Spark Streaming? According to the Official Apache Spark website, Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tole......

jcchoiling
2016/05/09
801
0
通过案例对SparkStreaming透彻理解-3

本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming 容错架构和运行机制   一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力...

柯里昂
2016/05/04
293
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.5K
0

没有更多内容

加载失败,请刷新页面

加载更多

Linux 运行shell文件,出现 $'\r': command not found

运行编写的shell脚本时,出现了 $'\\r': command not found 这样的错误提示。 报错的原因是我们在windows系统操作时,编辑器里的换行符是\r\n ,而Linux上为\n,两个系统之间有差异导致的。 ...

芥末无敌
今天
10
0
Java数据结构(上)

枚举(Enumeration) 位集合(BitSet) 向量(Vector) 栈(Stack) 1.Enumeration(枚举) boolean hasMoreElements( ):测试是否有更多的元素 Object nextElement( ):如果此枚举对象至少还...

Firefly-
昨天
17
0
vue 跨层组件通讯 provide inject

https://cn.vuejs.org/v2/api/#provide-inject 类型: provide:Object | () => Object inject:Array<string> | { [key: string]: string | Symbol | Object } 详细: provide 和 inject 主......

阿豪boy
昨天
14
0
黑马程序员面试宝典(Java)Beta6.0免费下载

场景 JavaSE基础 面向对象特征以及理解 访问权限修饰符区别 理解clone对象 JavaSE语法 java有没有goto语句 &和&&的区别 如何跳出当前的多重嵌套循环? 是否可以继承String? 重载与重写的区别...

badaoliumang
昨天
14
0
监控linux系统状态

查看系统负载: w/uptime 最后面三个数字表示1分钟,5分钟,15分钟平均有多少个进程占用CPU 占用CPU的进程可以是Running,也可以是Waiting 某一时刻1颗CPU只能有一个进程在使用其资源 #查看c...

asnfuy
昨天
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部