Spark Task Execution Source Code
Table of Contents
- Spark Task Execution Source Code
- 1. Overview
- 1.1 Task Splitting and Task Scheduling Principles
- 1.2 Locality-Aware Scheduling
- 1.3 Failure Retry and the Blacklist Mechanism
- 2. Stage Division
- 3. Task Splitting
- 4. Task Scheduling
- 4.1 Submitting Tasks
- 4.2 FIFO and Fair Schedulers
- 4.3 Fetching Tasks
- 4.4 FIFO and Fair Scheduling Rules
- 4.5 Sending Tasks to the Executor for Execution
- 5. Task Execution
1. Overview
1.1 Task Splitting and Task Scheduling Principles
1.2 Locality-Aware Scheduling
Task assignment principle: based on each Task's preferred locations, determine the Task's locality level. There are five locality levels, ordered from highest to lowest priority: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
Moving computation to the data is cheaper than moving data to the computation.
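As a side note (not part of the source walkthrough), how long the scheduler waits before downgrading a task to a lower locality level is configurable. A minimal sketch, assuming the standard spark.locality.wait settings:

import org.apache.spark.SparkConf

object LocalityConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-demo")
      .setMaster("local[*]")
      // Global wait before falling back to the next locality level (default 3s).
      .set("spark.locality.wait", "3s")
      // Per-level overrides; each falls back to spark.locality.wait when unset.
      .set("spark.locality.wait.process", "3s")
      .set("spark.locality.wait.node", "3s")
      .set("spark.locality.wait.rack", "3s")
    // Print the effective settings.
    println(conf.toDebugString)
  }
}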
1.3 Failure Retry and the Blacklist Mechanism
Besides picking a suitable Task to run, the scheduler must also monitor task execution. As mentioned earlier, the component that talks to the outside world is the SchedulerBackend. After a Task has been submitted to an Executor and started, the Executor reports its execution status to the SchedulerBackend, which passes it on to the TaskScheduler. The TaskScheduler looks up the TaskSetManager that owns the Task and notifies it, so the TaskSetManager knows whether the Task failed or succeeded. For a failed Task, it records the failure count; if that count has not yet exceeded the maximum number of retries, the Task goes back into the pool of tasks waiting to be scheduled, otherwise the whole Application fails.
While counting failures, the TaskSetManager also records the Executor Id and Host of the last failure. The next time this Task is scheduled, the blacklist mechanism keeps it away from the node where it last failed, providing a degree of fault tolerance. The blacklist records the Executor Id and Host of the Task's last failure together with a "blackout" period, i.e. the window during which this Task must not be scheduled onto that node again.
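As a side note, both the retry limit and the blacklist behaviour are configuration-driven. A minimal sketch, assuming the config names used up to roughly Spark 3.0 (newer releases rename the blacklist settings to spark.excludeOnFailure.*):

import org.apache.spark.SparkConf

object FailureHandlingConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("failure-handling-demo")
      .setMaster("local[*]")
      // Maximum attempts per task before the whole application fails (default 4).
      .set("spark.task.maxFailures", "4")
      // Keep a failed task away from the executor/host it last failed on.
      .set("spark.blacklist.enabled", "true")
      .set("spark.blacklist.timeout", "1h")
    println(conf.toDebugString)
  }
}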
2. Stage Division
0) Start from the source of the WordCount program
package com.hadoop.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1 Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    // 2 Read data: hello atguigu spark spark
    val lineRDD: RDD[String] = sc.textFile("input")
    // 3 Split one line into multiple words
    val wordRDD: RDD[String] = lineRDD.flatMap((x: String) => x.split(" "))
    // 4 Reshape: each word becomes a (word, 1) pair
    val wordToOneRDD: RDD[(String, Int)] = wordRDD.map((x: String) => (x, 1))
    // 5 Aggregate values with the same key
    val wordToSumRDD: RDD[(String, Int)] = wordToOneRDD.reduceByKey((v1, v2) => v1 + v2)
    // 6 Collect and print
    wordToSumRDD.collect().foreach(println)
    // 7 Release resources
    sc.stop()
  }
}
1) In the WordCount code, click into collect
RDD.scala
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
SparkContext.scala
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  ... ...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  ... ...
}
DAGScheduler.scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  ... ...
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ... ...
}

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ... ...
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
EventLoop.scala
def post(event: E): Unit = {
  if (!stopped.get) {
    if (eventThread.isAlive) {
      eventQueue.put(event)
    } else {
      ... ...
    }
  }
}

private[spark] val eventThread = new Thread(name) {
  override def run(): Unit = {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        ... ...
      }
    }
  }
}
Find the implementation class of onReceive (ctrl + h).
DAGScheduler.scala
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
  ... ...
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    ... ...
  }
  ... ...

  private[scheduler] def handleJobSubmitted(
      jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    ... ...
  }

  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    ... ...
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    parents
  }

  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

  def createShuffleMapStage[K, V, C](
      shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
    ... ...
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    ... ...
  }
  ... ...
}
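The net effect of the code above is one ResultStage for the final RDD plus one ShuffleMapStage per shuffle dependency. A minimal sketch (local mode, hypothetical "input" path) that confirms this for WordCount by printing the lineage, where the single ShuffledRDD boundary corresponds to the single ShuffleMapStage:

import org.apache.spark.{SparkConf, SparkContext}

object StageCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-demo").setMaster("local[*]"))
    val wordToSum = sc.textFile("input")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)   // the only ShuffleDependency in this lineage
    // The lineage shows one ShuffledRDD boundary, so the job runs as
    // one ShuffleMapStage followed by one ResultStage.
    println(wordToSum.toDebugString)
    wordToSum.collect().foreach(println)
    sc.stop()
  }
}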
3. Task Splitting
DAGScheduler.scala
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    ... ...
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  ... ...
  submitStage(finalStage)
}

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  ... ...
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    ... ...
  }
}
Stage.scala
private[scheduler] abstract class Stage(... ...)
  extends Logging {
  ... ...
  def findMissingPartitions(): Seq[Int]
  ... ...
}
Globally search (ctrl + h) for the implementation classes of findMissingPartitions.
ShuffleMapStage.scala
private[spark] class ShuffleMapStage(... ...)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  private[this] var _mapStageJobs: List[ActiveJob] = Nil
  ... ...
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}
ResultStage.scala
private[spark] class ResultStage(... ...)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
  ... ...
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
  ... ...
}
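In other words, a stage produces one task per partition that still needs computing. A minimal sketch (local mode, hypothetical "input" path) that shows where the task counts come from:

import org.apache.spark.{SparkConf, SparkContext}

object TaskCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("task-count").setMaster("local[*]"))
    val words = sc.textFile("input", minPartitions = 2).flatMap(_.split(" ")).map((_, 1))
    val counts = words.reduceByKey(_ + _, numPartitions = 3)
    // ShuffleMapStage: one ShuffleMapTask per partition of the map-side RDD.
    println(s"ShuffleMapStage tasks: ${words.getNumPartitions}")
    // ResultStage: one ResultTask per partition of the final RDD.
    println(s"ResultStage tasks: ${counts.getNumPartitions}")
    counts.collect()
    sc.stop()
  }
}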
4. Task Scheduling
4.1 Submitting Tasks
DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  ... ...
  if (tasks.nonEmpty) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
TaskScheduler.scala
def submitTasks(taskSet: TaskSet): Unit
Globally search for submitTasks to find its implementation class, TaskSchedulerImpl.
TaskSchedulerImpl.scala
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    ... ...
    stageTaskSets(taskSet.stageAttemptId) = manager
    // Put the TaskSetManager into the scheduling queue
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ... ...
  }
  // Fetch tasks for execution
  backend.reviveOffers()
}
4.2 FIFO and Fair Schedulers
Click schedulableBuilder and find where it is initialized:
private var schedulableBuilder: SchedulableBuilder = null

def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
Click schedulingMode; the default scheduler is FIFO.
private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
  ... ...
  SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
  ... ...
private[spark] val SCHEDULER_MODE =
ConfigBuilder("spark.scheduler.mode").version("0.8.0").stringConf.createWithDefault(SchedulingMode.FIFO.toString)
4.3 Fetching Tasks
SchedulerBackend.scala
private[spark] trait SchedulerBackend {
  ... ...
  def reviveOffers(): Unit
  ... ...
}
Globally search for reviveOffers to find its implementation class, CoarseGrainedSchedulerBackend.
CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {
  // Send a ReviveOffers message to itself (the driver endpoint)
  driverEndpoint.send(ReviveOffers)
}

// Receive the message it just sent to itself
override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case ReviveOffers =>
    makeOffers()
  ... ...
}

private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks from the scheduler
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}
TaskSchedulerImpl.scala
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  ... ...
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
    } else {
      var launchedAnyTask = false
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      ... ...
    }
  }
  ... ...
  return tasks
}
Pool.scala
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}

private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      ... ...
  }
}
4.4 FIFO and Fair Scheduling Rules
SchedulingAlgorithm.scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    ... ...
  }
}
4.5 Sending Tasks to the Executor for Execution
CoarseGrainedSchedulerBackend.scala
private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks from the scheduler
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      ... ...
    } else {
      ... ...
      // Serialize the task and send it to the remote Executor endpoint
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
5. Task Execution
The LaunchTask message is received in CoarseGrainedExecutorBackend.scala:
override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      taskResources(taskDesc.taskId) = taskDesc.resources
      executor.launchTask(this, taskDesc)
    }
  ... ...
}
Executor.scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
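The TaskRunner hand-off follows an ordinary thread-pool pattern: register the runner, submit it, and let it deregister itself when done. A hypothetical, stripped-down version of the same pattern (MiniExecutorDemo is not Spark code):

import java.util.concurrent.{ConcurrentHashMap, Executors}

object MiniExecutorDemo {
  private val threadPool = Executors.newCachedThreadPool()
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner = new Runnable {
      override def run(): Unit = {
        try body()
        finally runningTasks.remove(taskId) // deregister once the task finishes
      }
    }
    // Register the running task, then hand it to the pool, like Executor.launchTask.
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, () => println("running task 1"))
    Thread.sleep(100)
    threadPool.shutdown()
  }
}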