
Spark Task Execution Source Code

Table of Contents

  • Spark Task Execution Source Code
    • 1. Overview
      • 1.1 How Tasks Are Split and Scheduled
      • 1.2 Locality-Aware Scheduling
      • 1.3 Failure Retry and the Blacklist Mechanism
    • 2. Stage Division
    • 3. Task Splitting
    • 4. Task Scheduling
      • 4.1 Submitting Tasks
      • 4.2 FIFO and Fair Schedulers
      • 4.3 Fetching Tasks
      • 4.4 FIFO and Fair Scheduling Rules
      • 4.5 Sending Tasks to the Executor
    • 5. Task Execution

1. Overview

1.1 How Tasks Are Split and Scheduled

The DAGScheduler splits a job into stages at shuffle boundaries, splits each stage into one task per partition, and hands the resulting TaskSets to the TaskScheduler, which dispatches individual tasks to executors. (The original post illustrates this flow with two diagrams that are not reproduced here.)

1.2 Locality-Aware Scheduling

Task assignment principle: based on each Task's preferred locations, the scheduler determines the Task's locality level. There are five levels, ordered from highest to lowest priority:

  • PROCESS_LOCAL — the data is in the same executor process (JVM) as the task.
  • NODE_LOCAL — the data is on the same node, e.g. in another executor on that node or in an HDFS block stored there.
  • NO_PREF — the data has no locality preference (for example, data read from an external database).
  • RACK_LOCAL — the data is on another node in the same rack.
  • ANY — the data may be anywhere else in the cluster.

The guiding principle is that moving the computation to the data is cheaper than moving the data to the computation.
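The TaskSetManager does not insist on the best locality level forever: it waits a configurable amount of time at each level before falling back to the next one. A minimal configuration sketch, assuming you tune the waits yourself (the values below are only illustrative, not from this post):

// Sketch only: locality wait values are examples.
val conf = new org.apache.spark.SparkConf()
  .setAppName("LocalityDemo")
  .setMaster("local[*]")
  // How long to wait for a better locality level before downgrading (default 3s).
  .set("spark.locality.wait", "3s")
  // Per-level overrides are also available:
  .set("spark.locality.wait.process", "3s")
  .set("spark.locality.wait.node", "3s")
  .set("spark.locality.wait.rack", "3s")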

1.3 Failure Retry and the Blacklist Mechanism

Besides choosing a suitable Task to run, the scheduler also has to monitor task execution status. As mentioned earlier, the component that talks to the outside world is the SchedulerBackend. After a Task is submitted to an Executor and starts running, the Executor reports its execution status to the SchedulerBackend, which passes it on to the TaskScheduler. The TaskScheduler locates the TaskSetManager that owns the Task and notifies it, so the TaskSetManager knows whether the Task succeeded or failed. For a failed Task, it records the number of failures; if that count has not yet exceeded the maximum number of retries, the Task is put back into the pool of tasks waiting to be scheduled, otherwise the whole Application fails.

While counting a Task's failures, the scheduler also records the Executor Id and Host where the Task last failed. The next time this Task is scheduled, a blacklist mechanism keeps it away from the node where it last failed, which provides a degree of fault tolerance. The blacklist records the Executor Id and Host of the last failure together with a corresponding "blackout" time, i.e. the period during which this Task should not be scheduled onto that node again.
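Both knobs are configurable. A hedged sketch of the relevant settings (values are examples; in Spark 3.1+ the blacklist options were renamed to the spark.excludeOnFailure.* family):

// Sketch only: illustrative values, not from the original post.
val conf = new org.apache.spark.SparkConf()
  .setAppName("RetryDemo")
  // A task may fail up to 4 times (the default) before the job is failed.
  .set("spark.task.maxFailures", "4")
  // Track failing executors/hosts and avoid scheduling onto them for a while.
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.timeout", "1h")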

2. Stage Division

0) Start from the WordCount program

package com.hadoop.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1 Create the SparkContext
    val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // 2 Read data, e.g. "hello atguigu spark spark"
    val lineRDD: RDD[String] = sc.textFile("input")

    // 3 One line becomes multiple words
    val wordRDD: RDD[String] = lineRDD.flatMap((x: String) => x.split(" "))

    // 4 Reshape: each word becomes a (word, 1) pair
    val wordToOneRDD: RDD[(String, Int)] = wordRDD.map((x: String) => (x, 1))

    // 5 Aggregate the values of identical keys
    val wordToSumRDD: RDD[(String, Int)] = wordToOneRDD.reduceByKey((v1, v2) => v1 + v2)

    // 6 Collect and print
    wordToSumRDD.collect().foreach(println)

    // 7 Release resources
    sc.stop()
  }
}
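Before stepping into the scheduler, a quick optional check (not part of the original program) shows where this job will be cut into stages: reduceByKey introduces a ShuffledRDD, and that shuffle dependency is exactly where the DAGScheduler places the stage boundary. Assuming the WordCount RDDs above are in scope:

// toDebugString prints the lineage; the indentation break at the ShuffledRDD
// produced by reduceByKey marks the shuffle boundary between the two stages.
println(wordToSumRDD.toDebugString)
// Output has roughly this shape (ids and partition counts will differ):
//   (n) ShuffledRDD[4] at reduceByKey ...        <- read by the ResultStage
//    +-(n) MapPartitionsRDD[3] at map ...        <- ShuffleMapStage
//       |  MapPartitionsRDD[2] at flatMap ...
//       |  ...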

1) In the WordCount code, click into collect

RDD.scala

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

SparkContext.scala

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  ... ...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  ... ...
}

DAGScheduler.scala

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  ... ...
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ... ...
}

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ... ...
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}

EventLoop.scala

def post(event: E): Unit = {
  if (!stopped.get) {
    if (eventThread.isAlive) {
      eventQueue.put(event)
    } else {
      ... ...
    }
  }
}

private[spark] val eventThread = new Thread(name) {
  override def run(): Unit = {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        ... ...
      }
    }
  }
}
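EventLoop is a simple producer–consumer queue: post enqueues an event, and a dedicated thread dequeues events and dispatches them one at a time via onReceive. A minimal standalone sketch of the same pattern (toy class, not Spark's EventLoop; it polls with a timeout instead of blocking on take so that stop can be observed):

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Toy re-implementation of the EventLoop pattern, for illustration only.
class ToyEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    override def run(): Unit = {
      while (!stopped.get) {
        // Poll with a timeout so the loop can notice stop().
        val event = queue.poll(100, TimeUnit.MILLISECONDS)
        if (event != null) handler(event) // plays the role of onReceive
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = if (!stopped.get) queue.put(event)
  def stop(): Unit = { stopped.set(true); eventThread.join() }
}

// Usage: the caller posts a JobSubmitted-like event and the loop's own
// thread handles it asynchronously, off the caller's thread.
object ToyEventLoopDemo extends App {
  val loop = new ToyEventLoop[String]("toy-dag-event-loop")(e => println(s"handled: $e"))
  loop.start()
  loop.post("JobSubmitted(jobId = 0)")
  Thread.sleep(200) // give the loop thread time to drain the queue
  loop.stop()
}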

Find the implementation of onReceive (global search, Ctrl+H).

DAGScheduler.scala

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
  ... ...
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    ... ...
  }
  ... ...
}

private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  ... ...
}

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  ... ...
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  ... ...
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  ... ...
}
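getShuffleDependencies walks backwards from the final RDD and collects only the direct shuffle dependencies; each of those becomes (or reuses) a ShuffleMapStage, and the final RDD gets a ResultStage on top, so the number of stages equals the number of shuffle dependencies plus one. A toy model of that backward walk, using plain case classes rather than Spark's RDD/Dependency types, shows the idea:

import scala.collection.mutable

object StageCountDemo extends App {
  // Toy DAG model, for illustration only (not Spark's types).
  case class ToyRdd(name: String, deps: List[ToyDep] = Nil)
  sealed trait ToyDep { def parent: ToyRdd }
  case class NarrowDep(parent: ToyRdd) extends ToyDep
  case class ShuffleDep(parent: ToyRdd) extends ToyDep

  // Mirrors the traversal in getShuffleDependencies: walk the ancestors,
  // but stop descending once a shuffle dependency is found.
  def directShuffleDeps(rdd: ToyRdd): Set[ShuffleDep] = {
    val found = mutable.Set[ShuffleDep]()
    val visited = mutable.Set[ToyRdd]()
    val waiting = mutable.ListBuffer(rdd)
    while (waiting.nonEmpty) {
      val current = waiting.remove(0)
      if (visited.add(current)) {
        current.deps.foreach {
          case s: ShuffleDep => found += s                // stage boundary
          case n: NarrowDep  => waiting.prepend(n.parent) // keep walking
        }
      }
    }
    found.toSet
  }

  // WordCount-shaped lineage: textFile -> flatMap -> map -> reduceByKey
  val hadoopRdd  = ToyRdd("textFile")
  val flatMapped = ToyRdd("flatMap", List(NarrowDep(hadoopRdd)))
  val mapped     = ToyRdd("map", List(NarrowDep(flatMapped)))
  val reduced    = ToyRdd("reduceByKey", List(ShuffleDep(mapped)))

  // One shuffle dependency => one ShuffleMapStage + the ResultStage = 2 stages.
  println(s"stages = ${directShuffleDeps(reduced).size + 1}")
}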

3. Task Splitting

DAGScheduler.scala

private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    ... ...
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  ... ...
  submitStage(finalStage)
}

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  ... ...
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    ... ...
  }
}

Stage.scala

private[scheduler] abstract class Stage(... ...)
  extends Logging {
  ... ...
  def findMissingPartitions(): Seq[Int]
  ... ...
}

Global search (Ctrl+H) for the implementations of findMissingPartitions.

ShuffleMapStage.scala

private[spark] class ShuffleMapStage(... ...)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  private[this] var _mapStageJobs: List[ActiveJob] = Nil
  ... ...
  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}

ResultStage.scala

private[spark] class ResultStage(... ...)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
  ... ...
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
  ... ...
}

4. Task Scheduling

4.1 Submitting Tasks

DAGScheduler.scala

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  ... ...
  if (tasks.nonEmpty) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    markStageAsFinished(stage, None)

    stage match {
      case stage: ShuffleMapStage =>
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}

TaskScheduler.scala

def submitTasks(taskSet: TaskSet): Unit

Global search for the implementation of submitTasks: TaskSchedulerImpl.

TaskSchedulerImpl.scala

override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    ... ...
    stageTaskSets(taskSet.stageAttemptId) = manager

    // Put the TaskSetManager into the scheduling queue
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ... ...
  }
  // Ask the backend to fetch tasks
  backend.reviveOffers()
}

4.2 FIFO and Fair Schedulers

Click schedulableBuilder and find where it is initialized.

private var schedulableBuilder: SchedulableBuilder = null

def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

Click schedulingMode; the default scheduler is FIFO.

private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
  ... ...
  SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
  ... ...

private[spark] val SCHEDULER_MODE =
  ConfigBuilder("spark.scheduler.mode")
    .version("0.8.0")
    .stringConf
    .createWithDefault(SchedulingMode.FIFO.toString)

4.3 Fetching Tasks

SchedulerBackend.scala

private[spark] trait SchedulerBackend {
  ... ...
  def reviveOffers(): Unit
  ... ...
}

Global search for the implementation of reviveOffers: CoarseGrainedSchedulerBackend.

CoarseGrainedSchedulerBackend.scala

override def reviveOffers(): Unit = {
  // Send a message to itself
  driverEndpoint.send(ReviveOffers)
}

// Receive its own message
override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case ReviveOffers =>
    makeOffers()
  ... ...
}

private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks from the scheduler
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

TaskSchedulerImpl.scala

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  ... ...
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)

  for (taskSet <- sortedTaskSets) {
    val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      ... ...
    } else {
      var launchedAnyTask = false
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      ... ...
    }
  }
  ... ...
  return tasks
}

Pool.scala

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}

private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      ... ...
  }
}

4.4 FIFO and Fair Scheduling Rules

SchedulingAlgorithm.scala

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    ... ...
  }
}

4.5 Sending Tasks to the Executor

CoarseGrainedSchedulerBackend.scala

private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks from the scheduler
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      ... ...
    } else {
      ... ...
      // Serialize the task and send it to the Executor's remote endpoint
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
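The size check guards against a serialized task exceeding the RPC frame size; an oversized task fails the task set rather than being sent. The limit is controlled by spark.rpc.message.maxSize (in MiB). A hedged sketch of raising it, although shrinking the task closure, for example by broadcasting large lookup data instead of capturing it, is usually the better fix:

// Sketch only: raising the RPC frame limit (value in MiB).
val conf = new org.apache.spark.SparkConf()
  .set("spark.rpc.message.maxSize", "256")

// Preferred alternative: broadcast the large object once instead of
// shipping it inside every serialized task.
// val lookup = sc.broadcast(largeMap)
// rdd.map(x => lookup.value.getOrElse(x, 0))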

5. Task Execution

The LaunchTask message is received in CoarseGrainedExecutorBackend.scala.

override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      taskResources(taskDesc.taskId) = taskDesc.resources
      executor.launchTask(this, taskDesc)
    }
  ... ...
}

Executor.scala

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
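TaskRunner is a Runnable: the executor registers it in runningTasks and hands it to a thread pool, so each task runs on its own worker thread and the backend's message-handling thread returns immediately. A minimal standalone sketch of that pattern (toy classes, not Spark's TaskDescription/TaskRunner):

import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object ToyExecutorDemo extends App {
  // Toy stand-ins, for illustration only.
  case class ToyTaskDescription(taskId: Long, body: () => Unit)

  class ToyTaskRunner(desc: ToyTaskDescription) extends Runnable {
    override def run(): Unit = {
      println(s"task ${desc.taskId} running on ${Thread.currentThread().getName}")
      desc.body() // in Spark this would deserialize and run the real task
    }
  }

  val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]()
  val threadPool = Executors.newCachedThreadPool() // Spark uses a similar cached pool

  def launchTask(desc: ToyTaskDescription): Unit = {
    val tr = new ToyTaskRunner(desc)
    runningTasks.put(desc.taskId, tr) // tracked so it can be killed / reported on
    threadPool.execute(tr)            // run asynchronously, like Executor.launchTask
  }

  launchTask(ToyTaskDescription(0L, () => println("hello from task 0")))
  launchTask(ToyTaskDescription(1L, () => println("hello from task 1")))

  threadPool.shutdown()
  threadPool.awaitTermination(10, TimeUnit.SECONDS)
}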