(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容:

1、在线动态计算分类最热门商品案例回顾与演示

2、基于案例分析Spark Streaming的运行源码

第一部分案例:

package com.dt.spark.sparkstreaming

import com.robinspark.utils.ConnectionPool
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用Spark Streaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机、电视这个类别
  下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义;
  * 实现技术:Spark Streaming+Spark SQL,之所以Spark Streaming能够使用MLsqlgraphx等功能是因为有foreachRDDTransform
  * 等接口,这些接口中其实是基于RDD进行操作,所以以RDD为基石,就可以直接使用Spark其它所有的功能,就像直接调用API一样简单。
  假设说这里的数据的格式:user item category,例如Rocky Samsung Android
  */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]){
    /**
      * 1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
      例如说通过setMaster来设置程序要链接的Spark集群的MasterURL,如果设置
      local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
      只有1G的内存)的初学者       *
      */
    val conf = new SparkConf() //创建SparkConf对象
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
    //conf.setMaster("local[2]")
    //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
    val ssc = new StreamingContext(conf, Seconds(5))

ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

//格式为(category_item,1) =>(K,V)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
        (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))

val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_, Seconds(60), Seconds(20))

categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })

val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))

val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

categoryItemDF.registerTempTable("categoryItemTable")
        val reseltDataFram = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
          " WHERE rank <= 3")
        reseltDataFram.show()

val resultRowRDD = reseltDataFram.rdd

        resultRowRDD.foreachPartition { partitionOfRecords => {

if (partitionOfRecords.isEmpty){
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,click_count) values(‘" + record.getAs("category") + "‘,‘" +
                record.getAs("item") + "‘," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement();
              stmt.executeUpdate(sql);

})
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
        }
      }
    }
    }
    /**
      * StreamingContext调用start方法的内部其实是会启动JobSchedulerStart方法,进行消息循环,在JobScheduler
      * start内部会构造JobGeneratorReceiverTacker,并且调用JobGeneratorReceiverTackerstart方法:
      *   1JobGenerator启动后会不断的根据batchDuration生成一个个的Job
      *   2ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到
      *   数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker
      *   内部会通过ReceivedBlockTracker来管理接受到的元数据信息
      每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD
      * DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个
      单独的线程来提交Job到集群运行(其实是在线程中基于RDDAction触发真正的作业的运行),为什么使用线程池呢?
      *   1,作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;
      *   2,有可能设置了JobFAIR公平调度的方式,这个时候也需要多线程的支持;
      *
      */
    ssc.start()
    ssc.awaitTermination()
  }
}

第二部分源码解析:

1.根据传递的SparkConf参数创建StreamingContext对象,在内部创建SparkContext

/**

* Create a StreamingContext by providing the configuration necessary for a new SparkContext.

* @param conf a org.apache.spark.SparkConf object specifying Spark parameters

* @param batchDuration the time interval at which streaming data will be divided into batches

*/

def this(conf: SparkConf, batchDuration: Duration) = {

this(StreamingContext.createNewSparkContext(conf), null, batchDuration)

}

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {

new SparkContext(conf)

}

这说明Spark Streaming也是Spark上的一个应用程序

2.创建Socket输入流,socketTextStream方法定义如下:

/**

* Create a input stream from TCP source hostname:port. Data is received using

* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\\n` delimited

* lines.

* @param hostname      Hostname to connect to for receiving data

* @param port          Port to connect to for receiving data

* @param storageLevel  Storage level to use for storing the received objects

*                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)

*/

def socketTextStream(

hostname: String,

port: Int,

storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

): ReceiverInputDStream[String] = withNamedScope("socket text stream") {

socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)

}

可看到代码最后面调用socketStream,socketStream定义如下:

/**

* Create a input stream from TCP source hostname:port. Data is received using

* a TCP socket and the receive bytes it interepreted as object using the given

* converter.

* @param hostname      Hostname to connect to for receiving data

* @param port          Port to connect to for receiving data

* @param converter     Function to convert the byte stream to objects

* @param storageLevel  Storage level to use for storing the received objects

* @tparam T            Type of the objects received (after converting bytes to objects)

*/

def socketStream[T: ClassTag](

hostname: String,

port: Int,

converter: (InputStream) => Iterator[T],

storageLevel: StorageLevel

): ReceiverInputDStream[T] = {

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

}

实际上生成SocketInputDStream,SocketInputDStream类如下:

private[streaming]

class SocketInputDStream[T: ClassTag](

ssc_ : StreamingContext,

host: String,

port: Int,

bytesToObjects: InputStream => Iterator[T],

storageLevel: StorageLevel

) extends ReceiverInputDStream[T](ssc_) {

def getReceiver(): Receiver[T] = {

new SocketReceiver(host, port, bytesToObjects, storageLevel)

}

}

SocketInputDStream继承ReceiverInputDStream。

其中实现getReceiver方法,返回SocketReceiver对象。

总结一下SocketInputDStream的继承关系:

SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream。

DStream是生成RDD的模板,是逻辑级别,当达到Interval的时候这些模板会被BatchData实例化成为RDD和DAG。

看看DStream的源码片段:

// RDDs generated, marked as private[streaming] so that testsuites can access it

@transient

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

看看DStream的getOrCompute方法:

/**

* Get the RDD corresponding to the given time; either retrieve it from cache

* or compute-and-cache it.

*/

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {

// If RDD was already generated, then retrieve it from HashMap,

// or else compute the RDD

generatedRDDs.get(time).orElse {

// Compute the RDD if time is valid (e.g. correct time in a sliding window)

// of RDD generation, else generate nothing.

if (isTimeValid(time)) {

val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details. We need to have this call here because

// compute() might cause Spark jobs to be launched.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

compute(time)

}

}

rddOption.foreach { case newRDD =>

// Register the generated RDD for caching and checkpointing

if (storageLevel != StorageLevel.NONE) {

newRDD.persist(storageLevel)

logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")

}

if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {

newRDD.checkpoint()

logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")

}

generatedRDDs.put(time, newRDD)

}

rddOption

} else {

None

}

}

}

主要是生成RDD,再将生成的RDD放在HashMap中。具体生成RDD过程以后剖析。

目前大致讲了DStream和RDD这些核心概念在Spark Streaming中的使用。

先看看ScreamingContext的start()。start()方法启动StreamContext,由于Spark应用程序不能有多个SparkContext对象实例,所以Spark Streaming框架在启动时对状态进行判断。代码如下:

/**

* Start the execution of the streams.

*

* @throws IllegalStateException if the StreamingContext is already stopped.

*/

def start(): Unit = synchronized {

state match {

case INITIALIZED =>

startSite.set(DStream.getCreationSite())

StreamingContext.ACTIVATION_LOCK.synchronized {

StreamingContext.assertNoOtherContextIsActive()

try {

validate()

// Start the streaming scheduler in a new thread, so that thread local properties

// like call sites and job groups can be reset without affecting those of the

// current thread.

ThreadUtils.runInNewThread("streaming-start") {

sparkContext.setCallSite(startSite.get)

sparkContext.clearJobGroup()

sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")

//启动JobScheduler

scheduler.start()

}

state = StreamingContextState.ACTIVE

} catch {

case NonFatal(e) =>

logError("Error starting the context, marking it as stopped", e)

scheduler.stop(false)

state = StreamingContextState.STOPPED

throw e

}

StreamingContext.setActiveContext(this)

}

shutdownHookRef = ShutdownHookManager.addShutdownHook(

StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

// Registering Streaming Metrics at the start of the StreamingContext

assert(env.metricsSystem != null)

env.metricsSystem.registerSource(streamingSource)

uiTab.foreach(_.attach())

logInfo("StreamingContext started")

case ACTIVE =>

logWarning("StreamingContext has already been started")

case STOPPED =>

throw new IllegalStateException("StreamingContext has already been stopped")

}

}

初始状态时,会启动JobScheduler。

来看下JobScheduler的启动过程start()。其中启动了EventLoop、StreamListenerBus、ReceiverTracker和jobGenerator等多项工作。

def start(): Unit = synchronized {

if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")

eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {

override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

// 启动消息循环处理线程。用于处理JobScheduler的各种事件。

eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates

for {

inputDStream <- ssc.graph.getInputStreams

rateController <- inputDStream.rateController

} ssc.addStreamingListener(rateController)

// 启动监听器。用于更新Spark UI中StreamTab的内容。

listenerBus.start(ssc.sparkContext)

receiverTracker = new ReceiverTracker(ssc)

// 生成InputInfoTracker。用于管理所有的输入的流,以及他们输入的数据统计。这些信息将通过 StreamingListener监听。

inputInfoTracker = new InputInfoTracker(ssc)

// 启动ReceiverTracker。用于处理数据接收、数据缓存、Block生成。

receiverTracker.start()

// 启动JobGenerator。用于DStreamGraph初始化、DStream与RDD的转换、生成Job、提交执行等工作。

jobGenerator.start()

logInfo("Started JobScheduler")

}

JobScheduler中的消息处理函数processEvent,处理三类消息:Job已开始,Job已完成,错误报告。

private def processEvent(event: JobSchedulerEvent) {

try {

event match {

case JobStarted(job, startTime) => handleJobStart(job, startTime)

case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)

case ErrorReported(m, e) => handleError(m, e)

}

} catch {

case e: Throwable =>

reportError("Error in job scheduler", e)

}

}

我们再粗略地分析一下JobScheduler.start()中启动的工作。

先看JobScheduler.start()启动的第一项工作EventLoop。EventLoop用于处理JobScheduler的各种事件。

EventLoop中有事件队列:

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

还有一个线程处理队列中的事件:

private val eventThread = new Thread(name) {

setDaemon(true)

override def run(): Unit = {

try {

while (!stopped.get) {

val event = eventQueue.take()

try {

onReceive(event)

} catch {

case NonFatal(e) => {

try {

onError(e)

} catch {

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

}

} catch {

case ie: InterruptedException => // exit even if eventQueue is not empty

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

这个线程中的onReceive、onError,在JobScheduler中的EventLoop实例化时已定义。
JobScheduler.start()启动的第二项工作StreamListenerBus。用于异步传递StreamingListenerEvents到注册的StreamingListeners。用于更新Spark UI中StreamTab的内容。

  以下代码用于传递各种事件:

override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {

event match {

case receiverStarted: StreamingListenerReceiverStarted =>

listener.onReceiverStarted(receiverStarted)

case receiverError: StreamingListenerReceiverError =>

listener.onReceiverError(receiverError)

case receiverStopped: StreamingListenerReceiverStopped =>

listener.onReceiverStopped(receiverStopped)

case batchSubmitted: StreamingListenerBatchSubmitted =>

listener.onBatchSubmitted(batchSubmitted)

case batchStarted: StreamingListenerBatchStarted =>

listener.onBatchStarted(batchStarted)

case batchCompleted: StreamingListenerBatchCompleted =>

listener.onBatchCompleted(batchCompleted)

case outputOperationStarted: StreamingListenerOutputOperationStarted =>

listener.onOutputOperationStarted(outputOperationStarted)

case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>

listener.onOutputOperationCompleted(outputOperationCompleted)

case _ =>

}

}

看JobScheduler.start()启动的第三项工作ReceiverTracker。ReceiverTracker的start()中,内部实例化ReceiverTrackerEndpoint这个Rpc消息通信体。

def start(): Unit = synchronized {

if (isTrackerStarted) {

throw new SparkException("ReceiverTracker already started")

}

if (!receiverInputStreams.isEmpty) {

endpoint = ssc.env.rpcEnv.setupEndpoint(

"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

if (!skipReceiverLaunch) launchReceivers()

logInfo("ReceiverTracker started")

trackerState = Started

}

}

在ReceiverTracker启动的过程中会调用其launchReceivers方法:

/**

* Get the receivers from the ReceiverInputDStreams, distributes them to the

* worker nodes as a parallel collection, and runs them.

*/

private def launchReceivers(): Unit = {

val receivers = receiverInputStreams.map(nis => {

val rcvr = nis.getReceiver()

rcvr.setReceiverId(nis.id)

rcvr

})

runDummySparkJob()

logInfo("Starting " + receivers.length + " receivers")

endpoint.send(StartAllReceivers(receivers))

}

其中调用了runDummySparkJob方法来启动Spark Streaming的框架第一个Job,其中collect这个action操作会触发Spark Job的执行。这个方法是为了确保每个Slave都注册上,避免所有Receiver都在一个节点,使后面的计算能负载均衡。

/**

* Run the dummy Spark job to ensure that all slaves have registered. This avoids all the

* receivers to be scheduled on the same node.

*

* TODO Should poll the executor number and wait for executors according to

* "spark.scheduler.minRegisteredResourcesRatio" and

* "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.

*/

private def runDummySparkJob(): Unit = {

if (!ssc.sparkContext.isLocal) {

ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()

}

assert(getExecutors.nonEmpty)

}

ReceiverTracker.launchReceivers()还调用了endpoint.send(StartAllReceivers(receivers))方法,Rpc消息通信体发送StartAllReceivers消息。

ReceiverTrackerEndpoint它自己接收到消息后,先根据调度策略获得Recevier在哪个Executor上运行,然后在调用startReceiver(receiver, executors)方法,来启动Receiver。

override def receive: PartialFunction[Any, Unit] = {

// Local messages

case StartAllReceivers(receivers) =>

val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

for (receiver <- receivers) {

val executors = scheduledLocations(receiver.streamId)

updateReceiverScheduledExecutors(receiver.streamId, executors)

receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation

startReceiver(receiver, executors)

}

在startReceiver方法中,ssc.sparkContext.submitJob提交Job的时候传入startReceiverFunc这个方法,因为startReceiverFunc该方法是在Executor上执行的。而在startReceiverFunc方法中是实例化ReceiverSupervisorImpl对象,该对象是对Receiver进行管理和监控。这个Job是Spark Streaming框架为我们启动的第二个Job,且一直运行。因为supervisor.awaitTermination()该方法会阻塞等待退出。

/**

* Start a receiver along with its scheduled executors

*/

private def startReceiver(

receiver: Receiver[_],

scheduledLocations: Seq[TaskLocation]): Unit = {

def shouldStartReceiver: Boolean = {

// It‘s okay to start when trackerState is Initialized or Started

!(isTrackerStopping || isTrackerStopped)

}

val receiverId = receiver.streamId

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

return

}

val checkpointDirOption = Option(ssc.checkpointDir)

val serializableHadoopConf =

new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

// Function to start the receiver on the worker node

val startReceiverFunc: Iterator[Receiver[_]] => Unit =

(iterator: Iterator[Receiver[_]]) => {

if (!iterator.hasNext) {

throw new SparkException(

"Could not start receiver as object not found.")

}

if (TaskContext.get().attemptNumber() == 0) {

val receiver = iterator.next()

assert(iterator.hasNext == false)

//实例化Receiver监控者

val supervisor = new ReceiverSupervisorImpl(

receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

supervisor.start()

supervisor.awaitTermination()

} else {

// It‘s restarted by TaskScheduler, but we want to reschedule it again. So exit it.

}

}

// Create the RDD using the scheduledLocations to run the receiver in a Spark job

val receiverRDD: RDD[Receiver[_]] =

if (scheduledLocations.isEmpty) {

ssc.sc.makeRDD(Seq(receiver), 1)

} else {

val preferredLocations = scheduledLocations.map(_.toString).distinct

ssc.sc.makeRDD(Seq(receiver -> preferredLocations))

}

receiverRDD.setName(s"Receiver $receiverId")

ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")

ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](

receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

// We will keep restarting the receiver job until ReceiverTracker is stopped

future.onComplete {

case Success(_) =>

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

} else {

logInfo(s"Restarting Receiver $receiverId")

self.send(RestartReceiver(receiver))

}

case Failure(e) =>

if (!shouldStartReceiver) {

onReceiverJobFinish(receiverId)

} else {

logError("Receiver has been stopped. Try to restart it.", e)

logInfo(s"Restarting Receiver $receiverId")

self.send(RestartReceiver(receiver))

}

}(submitJobThreadPool)

logInfo(s"Receiver ${receiver.streamId} started")

}

接下来看下ReceiverSupervisorImpl的启动过程,先启动所有注册上的BlockGenerator对象,然后向ReceiverTrackerEndpoint发送RegisterReceiver消息,再调用receiver的onStart方法。

/** Start the supervisor */

def start() {

onStart()

startReceiver()

}

其中的onStart():

override protected def onStart() {

registeredBlockGenerators.foreach { _.start() }

}

其中的startReceiver():

/** Start receiver */

def startReceiver(): Unit = synchronized {

try {

if (onReceiverStart()) {

logInfo("Starting receiver")

receiverState = Started

receiver.onStart()

logInfo("Called receiver onStart")

} else {

// The driver refused us

stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)

}

} catch {

case NonFatal(t) =>

stop("Error starting receiver " + streamId, Some(t))

}

}

override protected def onReceiverStart(): Boolean = {

val msg = RegisterReceiver(

streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)

trackerEndpoint.askWithRetry[Boolean](msg)

}

其中在Driver运行的ReceiverTrackerEndpoint对象接收到RegisterReceiver消息后,将streamId, typ, host, executorId, receiverEndpoint封装为ReceiverTrackingInfo保存到内存对象receiverTrackingInfos这个HashMap中。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

// Remote messages

case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>

val successful =

registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)

context.reply(successful)

case AddBlock(receivedBlockInfo) =>

if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {

walBatchingThreadPool.execute(new Runnable {

override def run(): Unit = Utils.tryLogNonFatalError {

if (active) {

context.reply(addBlock(receivedBlockInfo))

} else {

throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")

}

}

})

} else {

context.reply(addBlock(receivedBlockInfo))

}

/** Register a receiver */

private def registerReceiver(

streamId: Int,

typ: String,

host: String,

executorId: String,

receiverEndpoint: RpcEndpointRef,

senderAddress: RpcAddress

): Boolean = {

if (!receiverInputStreamIds.contains(streamId)) {

throw new SparkException("Register received for unexpected id " + streamId)

}

if (isTrackerStopping || isTrackerStopped) {

return false

}

val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations

val acceptableExecutors = if (scheduledLocations.nonEmpty) {

// This receiver is registering and it‘s scheduled by

// ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.

scheduledLocations.get

} else {

// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling

// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.

scheduleReceiver(streamId)

}

def isAcceptable: Boolean = acceptableExecutors.exists {

case loc: ExecutorCacheTaskLocation => loc.executorId == executorId

case loc: TaskLocation => loc.host == host

}

if (!isAcceptable) {

// Refuse it since it‘s scheduled to a wrong executor

false

} else {

val name = s"${typ}-${streamId}"

val receiverTrackingInfo = ReceiverTrackingInfo(

streamId,

ReceiverState.ACTIVE,

scheduledLocations = None,

runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),

name = Some(name),

endpoint = Some(receiverEndpoint))

receiverTrackingInfos.put(streamId, receiverTrackingInfo)

listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)

true

}

}

Receiver的启动,以ssc.socketTextStream("localhost", 9999)为例,创建的是SocketReceiver对象。内部启动一个线程来连接Socket Server,读取socket数据并存储。

private[streaming]

class SocketReceiver[T: ClassTag](

host: String,

port: Int,

bytesToObjects: InputStream => Iterator[T],

storageLevel: StorageLevel

) extends Receiver[T](storageLevel) with Logging {

def onStart() {

// Start the thread that receives data over a connection

new Thread("Socket Receiver") {

setDaemon(true)

override def run() { receive() }

}.start()

}

def onStop() {

// There is nothing much to do as the thread calling receive()

// is designed to stop by itself isStopped() returns false

}

/** Create a socket connection and receive data until receiver is stopped */

def receive() {

var socket: Socket = null

try {

logInfo("Connecting to " + host + ":" + port)

socket = new Socket(host, port)

logInfo("Connected to " + host + ":" + port)

val iterator = bytesToObjects(socket.getInputStream())

while(!isStopped && iterator.hasNext) {

store(iterator.next)

}

if (!isStopped()) {

restart("Socket data stream had no more data")

} else {

logInfo("Stopped receiving")

}

} catch {

case e: java.net.ConnectException =>

restart("Error connecting to " + host + ":" + port, e)

case NonFatal(e) =>

logWarning("Error receiving data", e)

restart("Error receiving data", e)

} finally {

if (socket != null) {

socket.close()

logInfo("Closed socket to " + host + ":" + port)

}

}

}

}

接下来看JobScheduler.start()中启动的第四项工作JobGenerator。

JobGenerator有成员RecurringTimer,用于启动消息系统和定时器。按照batchInterval时间间隔定期发送GenerateJobs消息。

//根据创建StreamContext时传入的batchInterval,定时发送GenerateJobs消息

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,

longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

JobGenerator的start()方法:

/** Start generation of jobs */

def start(): Unit = synchronized {

if (eventLoop != null) return // generator has already been started

// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.

// See SPARK-10125

checkpointWriter

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {

override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = {

jobScheduler.reportError("Error in job generator", e)

}

}

// 启动消息循环处理线程

eventLoop.start()

if (ssc.isCheckpointPresent) {

restart()

} else {

// 开启定时生成Job的定时器

startFirstTime()

}

}

JobGenerator.start()中的startFirstTime()的定义:

/** Starts the generator for the first time */

private def startFirstTime() {

val startTime = new Time(timer.getStartTime())

graph.start(startTime - graph.batchDuration)

timer.start(startTime.milliseconds)

logInfo("Started JobGenerator at " + startTime)

}

JobGenerator.start()中的processEvent()的定义:

/** Processes all events */

private def processEvent(event: JobGeneratorEvent) {

logDebug("Got event " + event)

event match {

case GenerateJobs(time) => generateJobs(time)

case ClearMetadata(time) => clearMetadata(time)

case DoCheckpoint(time, clearCheckpointDataLater) =>

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}

其中generateJobs的定义:

/** Generate jobs and perform checkpoint for the given `time`.  */

private def generateJobs(time: Time) {

// Set the SparkEnv in this thread, so that job generation code can access the environment

// Example: BlockRDDs are created in this thread, and it needs to access BlockManager

// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.

SparkEnv.set(ssc.env)

Try {

// 根据特定的时间获取具体的数据

jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch

//调用DStreamGraph的generateJobs生成Job

graph.generateJobs(time) // generate jobs using allocated block

} match {

case Success(jobs) =>

val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

case Failure(e) =>

jobScheduler.reportError("Error generating jobs for time " + time, e)

}

eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

}

/** Perform checkpoint for the give `time`. */

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

logInfo("Checkpointing graph for time " + time)

ssc.graph.updateCheckpointData(time)

checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

}

}

DStreamGraph的generateJobs方法,调用输出流的generateJob方法来生成Jobs集合。

// 输出流:具体Action的输出操作

private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {

logDebug("Generating jobs for time " + time)

val jobs = this.synchronized {

outputStreams.flatMap { outputStream =>

val jobOption = outputStream.generateJob(time)

jobOption.foreach(_.setCallSite(outputStream.creationSite))

jobOption

}

}

logDebug("Generated " + jobs.length + " jobs for time " + time)

jobs

}

来看下DStream的generateJob方法,调用getOrCompute方法来获取当Interval的时候,DStreamGraph会被BatchData实例化成为RDD,如果有RDD则封装jobFunc方法,里面包含context.sparkContext.runJob(rdd, emptyFunc),然后返回封装后的Job。

/**

* Generate a SparkStreaming job for the given time. This is an internal method that

* should not be called directly. This default implementation creates a job

* that materializes the corresponding RDD. Subclasses of DStream may override this

* to generate their own jobs.

*/

private[streaming] def generateJob(time: Time): Option[Job] = {

getOrCompute(time) match {

case Some(rdd) => {

val jobFunc = () => {

val emptyFunc = { (iterator: Iterator[T]) => {} }

context.sparkContext.runJob(rdd, emptyFunc)

}

Some(new Job(time, jobFunc))

}

case None => None

}

}

接下来看JobScheduler的submitJobSet方法,向线程池中提交JobHandler。而JobHandler实现了Runnable 接口,最终调用了job.run()这个方法。看一下Job类的定义,其中run方法调用的func为构造Job时传入的jobFunc,其包含了context.sparkContext.runJob(rdd, emptyFunc)操作,最终导致Job的提交。

def submitJobSet(jobSet: JobSet) {

if (jobSet.jobs.isEmpty) {

logInfo("No jobs added for time " + jobSet.time)

} else {

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

logInfo("Added jobs for time " + jobSet.time)

}

}

private class JobHandler(job: Job) extends Runnable with Logging {

import JobScheduler._

def run() {

try {

val formattedTime = UIUtils.formatBatchTime(

job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(

s"""Streaming job from $batchLinkText""")

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because

// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

// it‘s possible that when `post` is called, `eventLoop` happens to null.

var _eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobStarted(job, clock.getTimeMillis()))

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

}

} else {

// JobScheduler has been stopped.

}

} finally {

ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)

ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)

}

}

}

}

参考博客:http://blog.csdn.net/andyshar/article/details/51339746

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 05-06

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码的相关文章

基于案例贯通 Spark Streaming 流计算框架的运行源码

本期内容 : Spark Streaming+Spark SQL案例展示 基于案例贯穿Spark Streaming的运行源码 一. 案例代码阐述 : 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等. 1.案例运行代码 : import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveCont

版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveCo

spark版本定制五:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 一.在线动态计算分类最热门商品案例回顾与演示 案例回顾: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

5.Spark Streaming流计算框架的运行流程源码分析2

1 spark streaming 程序代码实例 代码如下: [html] view plain copy object OnlineTheTop3ItemForEachCategory2DB { def main(args: Array[String]){ val conf = new SparkConf() //创建SparkConf对象 //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setAppName("OnlineTheTop3ItemForEachCategor

Spark版本定制第5天:案列解析Spark Streaming运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序.如果可以掌

通过案例对 spark streaming 透彻理解三板斧之二:spark streaming运行机制

本期内容: 1. Spark Streaming架构 2. Spark Streaming运行机制 Spark大数据分析框架的核心部件: spark Core.spark  Streaming流计算.GraphX图计算.MLlib机器学习.Spark SQL.Tachyon文件系统.SparkR计算引擎等主要部件. Spark Streaming 其实是构建在spark core之上的一个应用程序,要构建一个强大的Spark应用程序 ,spark  Streaming是一个值得借鉴的参考,spa