stage划分算法原理分析

[复制链接]
查看: 3554|回复: 0

34

主题

38

帖子

489

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
489
发表于 2019-11-21 21:50:38 | 显示全部楼层 |阅读模式
版权声明:本文为CSDN博主「xiaoxin_ysj」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zlx_code/article/details/90761501

DAGScheduler的stage分配算法原理分析

为了直观的描述DAGScheduler的划分算法,我们使用上一篇分析Job触发原理的wordcount程序示例来进行分析,具体代码请看上一篇博客:Spark源码分析之 — Job触发流程原理与源码分析

如下图所示,清晰的演示了stage是如何划分的:




如上所示,DAGScheduler的stage划分算法,会从触发action操作的那个RDD开始倒推,首先会为最后一个RDD创建一个Stage(finalStage),然后往前倒推的时候,如果发现某个RDD是宽依赖,那么就将宽依赖的那个RDD,创建一个Stage,如上图所示,以shuffledRDD为界,创建了一个Stage1,这个shuffledRDD,就是这个Stage1中的最后一个RDD,然后以此类推,以宽依赖为标志,创建stage,直到RDD遍历完为止。

综上所述,Stage的划分是以宽依赖为标志,进行Stage的划分。

下面是wordcount程序实际运行中stage的划分情况(4040端口查看):



其中wordcount触发Job的action操作是foreach(),这里可以看出以reduceByKey(代码中的第19行)为界,将整个Job划分为了两个stage,0和1,其中stage的ID是从0开始的。详细的stage划分如下图所示:



可以很清楚的看到,stage0里做了哪些操作,可以看出直到map操作都属于stage0,而当进行到reduceByKey操作的时候,就进入到了stage1中,这里就很直观的展示了wordcount程序的stage划分情况。

附上scala的wordcount的程序:

def main(args : Array[String]): Unit ={
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("wordcount")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("spark.txt")

    val words = lines.flatMap( line => line.split(" ") )

    val pairs = words.map( word => (word, 1) )

    val counts = pairs.reduceByKey(_ + _)

    counts.foreach( count => println(count._1 + " : " + count._2) )

  }


本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

精选推荐

返回顶部 关注微信 下载APP 返回列表