Spark Streaming发行版笔记17:资源动态分配和动态控制消费速率



1、Spark Streaming资源动态分配

2、Spark Streaming动态控制消费速率





不好的地方:从Spark Streaming角度讲有高峰值和低峰值,如果资源分配从高峰值、低峰值考虑都有大量资源的浪费。

其实当年Spark Streaming参考了Storm的设计思想,在其基础上构建的Spark Streaming2.0x内核有

很大变化,此框架的最大好处就是和兄弟框架联手。我们考虑Spark Streaming资源分配按高峰值分配的话,就会造成预分配资源浪费,尤其


Spark Streaming本身基于Spark Core的,Spark Core的核心是SparkContext对象,从SparkContext类代码的556行开始,支持资源的动态分配,源码如下:

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  logWarning("Dynamic Allocation and num executors both set, thus
dynamic allocation disabled.")

_executorAllocationManager =
  if (dynamicAllocationEnabled)
    Some(new ExecutorAllocationManager(this, listenerBus, _conf))
  } else {

_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {


/** * Return whether dynamic allocation is enabled in the given conf * Dynamic allocation and explicitly setting the number of executors are inherently * incompatible. In environments where dynamic allocation is turned on by default, * the latter should override the former (SPARK-9092). */
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {  conf.getBoolean("spark.dynamicAllocation.enabled", false) &&    conf.getInt("spark.executor.instances", 0) == 0}
 * An agent that dynamically allocates and removes executors based on the workload.
 * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
 * synced to the cluster manager. The target starts at a configured initial value and changes with
 * the number of pending and running tasks.
 * Decreasing the target number of executors happens when the current target is more than needed to
 * handle the current load. The target number of executors is always truncated to the number of
 * executors that could run all current running and pending tasks at once.
 * Increasing the target number of executors happens in response to backlogged tasks waiting to be
 * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
 * the queue persists for another M seconds, then more executors are added and so on. The number
 * added in each round increases exponentially from the previous round until an upper bound has been
 * reached. The upper bound is based both on a configured property and on the current number of
 * running and pending tasks, as described above.
 * The rationale for the exponential increase is twofold: (1) Executors should be added slowly
 * in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
 * we may add more executors than we need just to remove them later. (2) Executors should be added
 * quickly over time in case the maximum number of executors is very high. Otherwise, it will take
 * a long time to ramp up under heavy workloads.
 * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
 * been scheduled to run any tasks, then it is removed.
 * There is no retry logic in either case because we make the assumption that the cluster manager
 * will eventually fulfill all requests it receives asynchronously.
 * The relevant Spark properties include the following:
 *   spark.dynamicAllocation.enabled - Whether this feature is enabled
 *   spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
 *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
 *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
 *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
 *     If there are backlogged tasks for this duration, add new executors
 *   spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) -
 *     If the backlog is sustained for this duration, add more executors
 *     This is used only after the initial backlog timeout is exceeded
 *   spark.dynamicAllocation.executorIdleTimeout (K) -
 *     If an executor has been idle for this duration, remove it
private[spark] class ExecutorAllocationManager(
    client: ExecutorAllocationClient,
    listenerBus: LiveListenerBus,
    conf: SparkConf)
  extends Logging {

  allocationManager =>

  import ExecutorAllocationManager._

  // Lower and upper bounds on the number of executors.
  private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",

动态控制执行的executors个数。扫描executor情况,正在运行的Stage,增加executor或减少executor个数,例如减少executor情况;例如60秒发现一个任务都没有运行就会remove executor;当前应用程序含有所有启动的executors,在driver保持对executors的引用。




 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver



 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 * This is factored out into its own method for testing.
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis


  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false



 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
def start(): Unit = {

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)



Spark Streaming本身有对rateController控制,在运行时手动控制流入的速度。如果delay,则控制速度,流入慢点,需要调整流入的数据和处理的时间比例关系。









邮箱:[email protected]


时间: 05-28

Spark Streaming发行版笔记17:资源动态分配和动态控制消费速率的相关文章

Spark Streaming发行版笔记14:updateStateByKey和mapWithState源码解密

本篇从二个方面进行源码分析: 一.updateStateByKey解密 二.mapWithState解密 通过对Spark研究角度来研究jvm.分布式.图计算.架构设计.软件工程思想,可以学到很多东西. 进行黑名单动态生成和过滤例子中会用到updateStateByKey方法,此方法在DStream类中没有定义,需要在 DStream的object区域通过隐式转换来找,如下面的代码: object DStream {   // `toPairDStreamFunctions` was in Sp

Spark Streaming发行版笔记16:数据清理内幕彻底解密

本讲从二个方面阐述: 数据清理原因和现象 数据清理代码解析 Spark Core从技术研究的角度讲 对Spark Streaming研究的彻底,没有你搞不定的Spark应用程序. Spark Streaming一直在运行,不断计算,每一秒中在不断运行都会产生大量的累加器.广播变量,所以需要对对象及 元数据需要定期清理.每个batch duration运行时不断触发job后需要清理rdd和元数据.Clinet模式 可以看到打印的日志,从文件日志也可以看到清理日志内容. 现在要看其背后的事情: Sp

Spark Streaming发行版笔记15:no receivers彻底思考

数据接入Spark Streaming的二种方式:Receiver和no receivers方式 建议企业级采用no receivers方式开发Spark Streaming应用程序,好处: 1.更优秀的自由度控制 2.语义一致性 no receivers更符合数据读取和数据操作,Spark 计算框架底层有数据来源,如果只有direct直接操作数据来源则更天然.操作数据来源封装其一定是rdd级别的. 所以Spark 推出了自定义的rdd即Kafkardd,只是数据来源不同. 进入源码区: 注释基

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

本期内容: 1.Spark Streaming资源动态分配 2.Spark Streaming动态控制消费速率 为什么需要动态? a)Spark默认情况下粗粒度的,先分配好资源再计算.对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费. b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素. Spark Streaming资源动态调整的时候会面临挑战: Spark Streaming

第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

本期内容: Spark Streaming资源动态分配 Spark Streaming动态控制消费速率 为什么需要动态? Spark默认情况下粗粒度的,先分配好资源再计算.而Spark Streaming有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费. Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素. Spark Streaming资源动态调整的时候会面临挑战: Spark Streaming是按照Batch Dur

Spark Streaming资源动态申请和动态控制消费速率剖析

本期内容 : Spark Streaming资源动态分配 Spark Streaming动态控制消费速率 为什么需要动态处理 : Spark 属于粗粒度资源分配,也就是在默认情况下是先分配好资源然后再进行计算,粗粒度有个好处,因为资源是提前给你分配好,当有计算任务的时候直接使用就可以了, 粗粒度不好的方面就是从Spark  Streaming角度讲有高峰值.低峰值,在高与低峰值时候需要的资源是不一样的,如果资源分配按照高峰值考虑的话,在低峰值就是对资源的浪费, 随着Spark Streaming

spark发行版笔记4Spark Streaming事务处理彻底掌握

Spark Streaming事务处理彻底掌握 感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制. 内容概括: 1Exactly once 2 输出不重复 1 正如银行转账业务一样,如果你给一个朋友转账一次,银行的系统必须保证此次的转账数据有且只能处理一次,不能出现另外的情况.事务的意思就是保证数据有且只能处理一次. 而Spark Streaming流处理在事务处理方面也是做得非常好的,并且这一部分内容也是非常重要的. 所谓一图胜千言,我们就来画一张图吧. 整个数

Spark发行版笔记13:Spark Streaming源码解读之Driver容错安全性

本节的主要内容: 一.ReceivedBlockTracker容错安全性 二.DStreamGraph和JobGenerator容错安全性 从数据层面,ReceivedBlockTracker为整个Spark Streaming应用程序记录元数据信息. 从调度层面,DStreamGraph和JobGenerator是Spark Streaming调度的核心,记录当前调度到哪一进度,和业务有关. ReceivedBlockTracker在接收到元数据信息后调用addBlock方法,先写入磁盘中,然

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收