(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

本期内容:

1、Spark Streaming资源动态分配

2、Spark Streaming动态控制消费速率

为什么需要动态? 
a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。

b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。 
Spark Streaming资源动态调整的时候会面临挑战: 
Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。

Spark Streaming资源动态申请 
1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。

// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {  logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")}

_executorAllocationManager =if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(this, listenerBus, _conf))  } else {    None  }_executorAllocationManager.foreach(_.start())

设置spark.dynamicAllocation.enabled参数为true

这里会通过实例化ExecutorAllocationManager对象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。

/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = {  listenerBus.addListener(listener)

val scheduleTask = new Runnable() {override def run(): Unit = {try {        schedule() //动态调整Executor分配数量      } catch {case ct: ControlThrowable =>throw ctcase t: Throwable =>          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)      }    }  }executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}
private def schedule(): Unit = synchronized {val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now) //更新Executor数量

removeTimes.retain { case (executorId, expireTime) =>val expired = now >= expireTimeif (expired) {initializing = falseremoveExecutor(executorId)    }    !expired  }}
/** * Updates our target number of executors and syncs the result with the cluster manager. * * Check to see whether our existing allocation and the requests we‘ve made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeeded

if (initializing) {// Do not change our target while we are still initializing,    // Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new    // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1

// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) {      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed")    }numExecutorsTarget - oldNumExecutorsTarget  } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded)    logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS * 1000delta  } else {0}}

动态控制消费速率: 
Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 06-08

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析的相关文章

第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

本期内容: Spark Streaming资源动态分配 Spark Streaming动态控制消费速率 为什么需要动态? Spark默认情况下粗粒度的,先分配好资源再计算.而Spark Streaming有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费. Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素. Spark Streaming资源动态调整的时候会面临挑战: Spark Streaming是按照Batch Dur

Spark Streaming资源动态申请和动态控制消费速率剖析

本期内容 : Spark Streaming资源动态分配 Spark Streaming动态控制消费速率 为什么需要动态处理 : Spark 属于粗粒度资源分配,也就是在默认情况下是先分配好资源然后再进行计算,粗粒度有个好处,因为资源是提前给你分配好,当有计算任务的时候直接使用就可以了, 粗粒度不好的方面就是从Spark  Streaming角度讲有高峰值.低峰值,在高与低峰值时候需要的资源是不一样的,如果资源分配按照高峰值考虑的话,在低峰值就是对资源的浪费, 随着Spark Streaming

spark版本定制十八:Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容: 1.Spark Streaming中RDD的空处理 2.StreamingContext程序的停止 一.Spark Streaming中RDD的空处理 案例代码: Scala代码: package com.dt.spark.sparkstreaming import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用Scala开发集群运行的Sp

Spark Streaming发行版笔记17:资源动态分配和动态控制消费速率

本篇从二个方面讲解: 高级特性: 1.Spark Streaming资源动态分配 2.Spark Streaming动态控制消费速率 原理剖析,动态控制消费速率其后面存在一套理论,资源动态分配也有一套理论. 先讲理论,后面讨论. 为什么要动态资源分配和动态控制速率? Spark默认是先分配资源,然后计算:粗粒度的分配方式,资源提前分配好,有计算任务提前分配好资源: 不好的地方:从Spark Streaming角度讲有高峰值和低峰值,如果资源分配从高峰值.低峰值考虑都有大量资源的浪费. 其实当年S

(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考

本期内容: 1.ReceiverTracker的架构设计 2.消息循环系统 3.ReceiverTracker具体实现 上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现. ReceiverTracker主要的功能: 在Executor上启动Receivers. 停止Receivers . 更新Receiver接收数据的速度(也就是限流) 不断的等待Receivers的运行

(版本定制)第18课:Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容: 1. Spark Streaming中RDD为空处理 2. Streaming Context程序停止方式 Spark Streaming运用程序是根据我们设定的Batch Duration来产生RDD,产生的RDD存在partitons数据为空的情况,但是还是会执行foreachPartition,会获取计算资源,然后计算一下,这种情况就会浪费 集群计算资源,所以需要在程序运行的时候进行过滤,参考如下代码: package com.dt.spark.sparkstreamingim

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1.Spark Streaming元数据清理详解 2.Spark Streaming元数据清理源码解析 一.如何研究Spark Streaming元数据清理 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手.因为DStream是RDD的模板,DStream之间有依赖关系. DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的.由此,DStream负责RDD的整个