(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考

本期内容:

1、ReceiverTracker的架构设计

2、消息循环系统

3、ReceiverTracker具体实现

上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。

ReceiverTracker主要的功能:

  1. 在Executor上启动Receivers。
  2. 停止Receivers 。
  3. 更新Receiver接收数据的速度(也就是限流)
  4. 不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver,也就是Receiver的容错功能。
  5. 接受Receiver的注册。
  6. 借助ReceivedBlockTracker来管理Receiver接收数据的元数据。
  7. 汇报Receiver发送过来的错误信息

ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。

在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers。

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。

Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker。

/** Store block and report it to driver */def pushAndReportBlock(    receivedBlock: ReceivedBlock,metadataOption: Option[Any],blockIdOption: Option[StreamBlockId]  ) {val blockId = blockIdOption.getOrElse(nextBlockId)val time = System.currentTimeMillisval blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")val numRecords = blockStoreResult.numRecordsval blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))  logDebug(s"Reported block $blockId")}

当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据

case AddBlock(receivedBlockInfo) =>if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {walBatchingThreadPool.execute(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {if (active) {          context.reply(addBlock(receivedBlockInfo))         } else {throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")        }      }    })  } else {    context.reply(addBlock(receivedBlockInfo))  }

数据的元数据是交由ReceivedBlockTracker管理的

数据最终被写入到streamIdToUnallocatedBlockQueues中,一个流对应一个数据块信息的队列。

每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。

下面是简单的流程图:

参考博客:http://blog.csdn.net/hanburgud/article/details/51471074

http://lqding.blog.51cto.com/9123978/1774994

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 05-23

(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考的相关文章

第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

本篇博文的目标是: Driver的ReceiverTracker接收到数据之后,下一步对数据是如何进行管理 一:ReceiverTracker的架构设计 1. Driver在Executor启动Receiver方式,每个Receiver都封装成一个Task,此时一个Job中就一个Task,而Task中就一条数据,也就是Receiver数据.由此,多少个Job也就可以启动多少个Receiver. 2. ReceiverTracker在启动Receiver的时候他有ReceiverSuperviso

11.Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

上篇文章详细解析了Receiver不断接收数据的过程,在Receiver接收数据的过程中会将数据的元信息发送给ReceiverTracker: 本文将详细解析ReceiverTracker的的架构设计和具体实现 一.ReceiverTracker的主要功能 ReceiverTracker的主要功能有: 1.在Executor上启动Receivers 2.接受Receiver的注册 3.借助ReceivedBlockTracker来管理Receiver接收数据的元数据 4.接受Receiver发送

Spark Streaming源码解读之生成全生命周期彻底研究与思考

本期内容 : DStream与RDD关系彻底研究 Streaming中RDD的生成彻底研究 问题的提出 : 1. RDD是怎么生成的,依靠什么生成 2.执行时是否与Spark Core上的RDD执行有什么不同的 3. 运行之后我们要怎么处理 为什么有第三点 : 是因为Spark Streaming 中会随着相关触发条件,窗口Window滑动的时候都会不断的产生RDD , 从最基本的层次考虑,RDD也是基本对象,每秒会产生RDD ,内存能不能完全容纳,每个处理完成后怎么进行管理? 一. 整个Spa

Spark Streaming源码解读之Driver中ReceiverTracker架构设计以具体实现彻底研究

本期内容 : ReceiverTracker的架构设计 消息循环系统 ReceiverTracker具体实现 一. ReceiverTracker的架构设计 1. ReceiverTracker可以以Driver中的具体自己的算法来在具体的Execute中启动Receiver,启动Receiver的方式会把每个Receiver都封装成为一个Tracker, Tracker是这个Job中唯一的Tracker,实质上讲ReceiverTracker启动Receiver的方式就是封装成一个个Job ,

(版本定制)第13课:Spark Streaming源码解读之Driver容错安全性

本期内容:1. ReceiverBlockTracker容错安全性 2. DStream和JobGenerator容错安全性 一:容错安全性 1. ReceivedBlockTracker负责管理Spark Streaming运行程序的元数据.数据层面 2. DStream和JobGenerator是作业调度的核心层面,也就是具体调度到什么程度了,从运行的考虑的.DStream是逻辑层面. 3. 作业生存层面,JobGenerator是Job调度层面,具体调度到什么程度了.从运行的角度的. 谈D

第13课:Spark Streaming源码解读之Driver容错安全性

本期内容: ReceivedBlockTracker容错安全性 DStream和JobGenerator容错安全性 Driver的容错有两个层面:1. Receiver接收数据的元数据 2. Driver管理的各组件信息(调度和驱动层面) 元数据采用了WAL的容错机制 case AddBlock(receivedBlockInfo) =>   if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {     wa

Spark发行版笔记13:Spark Streaming源码解读之Driver容错安全性

本节的主要内容: 一.ReceivedBlockTracker容错安全性 二.DStreamGraph和JobGenerator容错安全性 从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息. 从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关. ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成