|
版权声明:本文为CSDN博主「xiaoxin_ysj」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/zlx_code/article/details/90761501
DAGScheduler的stage分配算法原理分析
为了直观的描述DAGScheduler的划分算法,我们使用上一篇分析Job触发原理的wordcount程序示例来进行分析,具体代码请看上一篇博客:Spark源码分析之 — Job触发流程原理与源码分析
如下图所示,清晰的演示了stage是如何划分的:
如上所示,DAGScheduler的stage划分算法,会从触发action操作的那个RDD开始倒推,首先会为最后一个RDD创建一个Stage(finalStage),然后往前倒推的时候,如果发现某个RDD是宽依赖,那么就将宽依赖的那个RDD,创建一个Stage,如上图所示,以shuffledRDD为界,创建了一个Stage1,这个shuffledRDD,就是这个Stage1中的最后一个RDD,然后以此类推,以宽依赖为标志,创建stage,直到RDD遍历完为止。
综上所述,Stage的划分是以宽依赖为标志,进行Stage的划分。
下面是wordcount程序实际运行中stage的划分情况(4040端口查看):
其中wordcount触发Job的action操作是foreach(),这里可以看出以reduceByKey(代码中的第19行)为界,将整个Job划分为了两个stage,0和1,其中stage的ID是从0开始的。详细的stage划分如下图所示:
可以很清楚的看到,stage0里做了哪些操作,可以看出直到map操作都属于stage0,而当进行到reduceByKey操作的时候,就进入到了stage1中,这里就很直观的展示了wordcount程序的stage划分情况。
附上scala的wordcount的程序:
def main(args : Array[String]): Unit ={
val conf = new SparkConf()
.setMaster("local")
.setAppName("wordcount")
val sc = new SparkContext(conf)
val lines = sc.textFile("spark.txt")
val words = lines.flatMap( line => line.split(" ") )
val pairs = words.map( word => (word, 1) )
val counts = pairs.reduceByKey(_ + _)
counts.foreach( count => println(count._1 + " : " + count._2) )
}
|
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
|