INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD[86] at reduceByKey at <console>:24)
-
spark-stagecomputer/spark 2016. 10. 6. 20:14
원문 : https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-dagscheduler-stages.html
[ 소개 ]
stage는 물리적 실행 단위이다. 물리적 실행 계획안의 단계이다.
stage는 RDD 파티션당 하나인, 병렬 task들의 집합이다.
stage는 스파크 job의 부분으로써 실행된 function의 부분적인 결과들을 계산한다.
그림1. stage, tasks 그리고 보내진 job
바꿔 말하면, spark job은 stages들을 분할한 계산들을 계산하는 것이다.
stage는 유일하게 id에 의해 식별된다. stage가 생성될때, DAGScheduler는 stage 제출(submission)의 수를 추적하기 위해, 내부카운터 nextStageId를 증가시킨다.
Stage는 싱글 RDD(RDD에 구별된)의 파티션들에서 동작한다. 하지만, shuffle dependencies에 의해 표시된 stage의 경계선 내에서 많은 다른 의존된 부모 Stage에 관련되어 있다. ( 내부 필드인 parents를 통해서 )
그러므로 제출된 stage는 의존하는 일련의 부모 stages들의 실행을 작동하는 계기가 된다.
그림2. 보내진 job이 stage와 그 부모 stage를 실행하는 계기
결국엔, 모든 stage는 stage를 보낸 job의 id인 firstJobId를 가진다.
Stage의 2가지 타입
* ShuffleMapStage : 실행중 DAG의 중간에 있는 Stage. 다른 stage들을 위해 데이터를 생산한다. 그것은 셔플을 위해 map output files를 쓴다. 그것은 (조정할수 있는)adaptive query planning의 job 안에 마지막 stage가 될수 도 있다.
* ResultStage : RDD function이 동작하는 유저 프로그램안의 spark action을 실행하는 마지막 stage.
job 이 보내질 때, 새로운 stage가 연결된 부모 shuffleMapStage 와 함께 생성된다. - 만약 다른 잡이 벌써 그것들을 사용한다면, 그들은 공유되거나 연결되거나 scratch로 부터 생성된다.
stage는 자신이 속한 job들에 대해 안다. ( 내부 필드의 jobIds로 )
DAGScheduler는 stage들의 집합으로 job을 나눈다. 각 stage는 전체 데이터 집합이 셔플링 없이 구성된 일련의 narrow 트랜스포메이션(map, filter)으로 구성된다. shuffle 경계선과는 분리된다. 따라서 Stage들은 shuffle 경계선에서 RDD 그래프를 자른 결과이다.
그림4. stage의 그래프
셔플 경계선은 map output들을 fetch 하기 전에 완료된 이전의 stage를 위해 기다린 stage들과 task들의 장벽으로 소개된다.
map()과 filter() 같은 narrow dependencies와 같은 RDD operation들은 각 stage안에서 하나의 task들의 집합으로 함께 pipelined 한다.
하지만, shuffle dependencies같은 operation들은 multistage를 필요로 한다. (i.e. one to write a set of map output files, and another to read those files after a barrier.)
In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the
RDD.compute()
functions of various RDDs, e.g.MappedRDD
,FilteredRDD
, etc.At some point of time in a stage’s life, every partition of the stage gets transformed into a task -ShuffleMapTask or
ResultTask
for ShuffleMapStage and ResultStage, respectively.Partitions are computed in jobs, and result stages may not always need to compute all partitions in their target RDD, e.g. for actions like
first()
andlookup()
.DAGScheduler
prints the following INFO message when there are tasks to submit:There is also the following DEBUG message with pending partitions:
DEBUG DAGScheduler: New pending partitions: Set(0)
Tasks are later submitted to Task Scheduler (via
taskScheduler.submitTasks
).When no tasks in a stage can be submitted, the following DEBUG message shows in the logs:
FIXME
numTasks - where and what
CautionFIXME Why do stages have numTasks
? Where is this used? How does this correspond to the number of partitions in a RDD?Stage.findMissingPartitions
Stage.findMissingPartitions()
calculates the ids of the missing partitions, i.e. partitions for which the ActiveJob knows they are not finished (and so they are missing).A ResultStage stage knows it by querying the active job about partition ids (
numPartitions
) that are not finished (usingActiveJob.finished
array of booleans).Figure 6. ResultStage.findMissingPartitions and ActiveJobIn the above figure, partitions 1 and 2 are not finished (
F
is false whileT
is true).Stage.failedOnFetchAndShouldAbort
Stage.failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean
checks whether the number of fetch failed attempts (usingfetchFailedAttemptIds
) exceeds the number of consecutive failures allowed for a given stage (that should then be aborted)NoteThe number of consecutive failures for a stage is not configurable. 'computer > spark' 카테고리의 다른 글
spark-Driver (0) 2016.10.07 spark-Executors (0) 2016.10.07 spark-Spark Architecture (0) 2016.10.06