(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容:

1、Spark Streaming元数据清理详解

2、Spark Streaming元数据清理源码解析

一、如何研究Spark Streaming元数据清理

  1. 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手。因为DStream是RDD的模板,DStream之间有依赖关系。 
    DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的。由此,DStream负责RDD的整个生命周期。因此研究的入口的是DStream。
  2. 基于Kafka数据来源,通过Direct的方式访问Kafka,DStream随着时间的进行,会不断的在自己的内存数据结构中维护一个HashMap,HashMap维护的就是时间窗口,以及时间窗口下的RDD.按照Batch Duration来存储RDD以及删除RDD.
  3. Spark Streaming本身是一直在运行的,在自己计算的时候会不断的产生RDD,例如每秒Batch Duration都会产生RDD,除此之外可能还有累加器,广播变量。由于不断的产生这些对象,因此Spark Streaming有自己的一套对象,元数据以及数据的清理机制。
  4. Spark Streaming对RDD的管理就相当于JVM的GC

二、源码解析

Spark Streaming是通过我们设定的Batch Durations来不断的产生RDD,Spark Streaming清理元数据跟时钟有关,因为数据是周期性的产生,所以肯定是周期性的释放,这些都跟JobGenerator有关,所以我们先从这开始研究。

1、RecurringTimer: 消息循环器将消息不断的发送给EventLoop

= RecurringTimer(...millisecondslongTime => .post((Time(longTime))))

2、eventLoop:onReceive接收到消息

(): = synchronized {
(!= ) = EventLoop[JobGeneratorEvent]() {
(event: JobGeneratorEvent): = processEvent(event)

(e: ): = {
      jobScheduler.reportError(e)
    }
  }
.start()

(.) {
    restart()
  } {
    startFirstTime()
  }
}

3、在processEvent中接收清理元数据消息

/** Processes all events */private def processEvent(event: JobGeneratorEvent) {  logDebug("Got event " + event)  event match {case GenerateJobs(time) => generateJobs(time)case ClearMetadata(time) => clearMetadata(time) //清理元数据case DoCheckpoint(time, clearCheckpointDataLater) =>      doCheckpoint(time, clearCheckpointDataLater)case ClearCheckpointData(time) => clearCheckpointData(time) //清理checkpoint  }}

具体的方法实现内容就不再这里说,我们进一步分析下这些清理动作是在什么时候被调用的,在Spark Streaming应用程序中,最终Job是交给JobHandler来执行的,所以我们分析下JobHandler

private class JobHandler(job: Job) extends Runnable with Logging {import JobScheduler._

def run() {try {val formattedTime = UIUtils.formatBatchTime(          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then        // it‘s possible that when `post` is called, `eventLoop` happens to null.var _eventLoop = eventLoopif (_eventLoop != null) {          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))// Disable checks for existing output directories in jobs launched by the streaming          // scheduler, since we may need to write output to an existing directory during checkpoint          // recovery; see SPARK-4835 for more details.PairRDDFunctions.disableOutputSpecValidation.withValue(true) {            job.run()          }          _eventLoop = eventLoopif (_eventLoop != null) {_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))          }        } else {// JobScheduler has been stopped.}      } finally {        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)      }    }  }}

当Job完成的时候,会发JobCompleted消息给onReceive,通过processEvent来执行具体的方法

private def processEvent(event: JobSchedulerEvent) {try {    event match {case JobStarted(job, startTime) => handleJobStart(job, startTime)case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)case ErrorReported(m, e) => handleError(m, e)    }  } catch {case e: Throwable =>      reportError("Error in job scheduler", e)  }}
private def handleJobCompletion(job: Job, completedTime: Long) {val jobSet = jobSets.get(job.time)  jobSet.handleJobCompletion(job)  job.setEndTime(completedTime)listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)if (jobSet.hasCompleted) {jobSets.remove(jobSet.time)jobGenerator.onBatchCompletion(jobSet.time)    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(      jobSet.totalDelay / 1000.0, jobSet.time.toString,jobSet.processingDelay / 1000.0))listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))  }  job.result match {case Failure(e) =>      reportError("Error running job " + job, e)case _ =>  }}

通过jobGenerator.onBatchCompletion来清理元数据

/** * Callback called when a batch has been completely processed. */def onBatchCompletion(time: Time) {eventLoop.post(ClearMetadata(time))}

到这里Spark Streaming清理元数据的步骤基本上完成了

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 05-31

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密的相关文章

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: Spark Streaming数据清理原因和现象 Spark Streaming数据清理代码解析 对Spark Streaming解析了这么多课之后,我们越来越能感知,Spark Streaming只是基于Spark Core的一个应用程序,因此掌握Spark Streaming对于我们怎么编写Spark应用是绝对有好处的. Spark Streaming 不像Spark Core的应用程序,Spark Core的应用的数据是存储在底层文件系统,如HDFS等别的存储系统中,而Spar

Spark 定制版:016~Spark Streaming源码解读之数据清理内幕彻底解密

本讲内容: a. Spark Streaming数据清理原因和现象 b. Spark Streaming数据清理代码解析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们之所以用一节课来讲No Receivers,是因为企业级Spark Streaming应用程序开发中在越来越多的采用No Receivers的方式.No Receiver方式有自己的优势,比如更大的控制的自由度.语义一致性等等.所以对No Receivers方

Spark Streaming源码解读之数据清理内幕彻底解密

本期内容 : Spark Streaming数据清理原理和现象 Spark Streaming数据清理代码解析 Spark Streaming一直在运行的,在计算的过程中会不断的产生RDD ,如每秒钟产生一个BachDuration同时也会产生RDD, 在这个过程中除了基本的RDD外还有累加器.广播变量等,对应Spark Streaming也有自己的对象.源数据及数据清理机制, 在运行中每个BachDuration会触发了Job ,由于会自动产生对象.数据及源数据等运行完成后肯定要自动进行回收 

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

第12课:Spark Streaming源码解读之Executor容错安全性

一.Spark Streaming 数据安全性的考虑: Spark Streaming不断的接收数据,并且不断的产生Job,不断的提交Job给集群运行.所以这就涉及到一个非常重要的问题数据安全性. Spark Streaming是基于Spark Core之上的,如果能够确保数据安全可好的话,在Spark Streaming生成Job的时候里面是基于RDD,即使运行的时候出现问题,那么Spark Streaming也可以借助Spark Core的容错机制自动容错. 对Executor容错主要是对数

第15课:Spark Streaming源码解读之No Receivers彻底思考

本期内容: Direct Access Kafka 前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读.但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receive

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成