spark的spark.executor.cores怎么理解

博客地址:&
实际任务的运行,都是通过Executor类来执行的。这一节,我们只介绍Standalone模式。
源码位置:org.apache.spark.executor.CoarseGrainedExecutorBackend
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
SignalLogger.register(log)
SparkHadoopUtil.get.runAsSparkUser { () =&
// Debug code
Utils.checkHost(hostname)
// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf//创建Executor sparkConf
val port = executorConf.getInt(&spark.executor.port&, 0)
//创建akkaRpcEnv,内部包含actorSystem
val fetcher = RpcEnv.create(
&driverPropsFetcher&,
executorConf,
new SecurityManager(executorConf))
//获取driver的ActorRef
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
Seq[(String, String)]((&spark.app.id&, appId))
fetcher.shutdown()
// Create SparkEnv using properties we fetched from the driver.
val driverConf = new SparkConf()//创建driver sparkConf
for ((key, value) &- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
driverConf.set(key, value)
if (driverConf.contains(&spark.yarn.credentials.file&)) {
logInfo(&Will periodically update credentials from: & +
driverConf.get(&spark.yarn.credentials.file&))
SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
//创建Executor 的sparkEnv,下面分析
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
// SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
val boundPort = env.conf.getInt(&spark.executor.port&, 0)
assert(boundPort != 0)
// Start the CoarseGrainedExecutorBackend endpoint.
val sparkHostPort = hostname + &:& + boundPort
//这里创建Executor 的ActorRef,onStart方法主要是向driver注册Executor,见下面分析
env.rpcEnv.setupEndpoint(&Executor&, new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
//这个workerWatcher我没看出起什么作用的
workerUrl.foreach { url =&
env.rpcEnv.setupEndpoint(&WorkerWatcher&, new WorkerWatcher(env.rpcEnv, url))
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
}先介绍createExecutorEnv,这个与driver端的几乎一样,之前已经介绍过了,这里就介绍一下与driver不同的地方
1、mapOutputTracker在Executor端是MapOutputTrackerWorker对象,mapOutputTracker.trackerEndpoint实际引用的是driver的ActorRef。
2、blockManagerMaster在内部保存的也是driver的ActorRef
3、outputCommitCoordinator.coordinatorRef实际包含的也是driver的ActorRef
现在介绍一下CoarseGrainedExecutorBackend的onStart方法,看它主动干了什么事。
发送RegisterExecutor消息到driver端,注册Executor。成功返回后再向自己发送RegisteredExecutor消息
override def onStart() {
logInfo(&Connecting to driver: & + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =&
// This is a very fast action so we can use &ThreadUtils.sameThread&
driver = Some(ref)
ref.ask[RegisteredExecutor.type](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use &ThreadUtils.sameThread&
case Success(msg) =& Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
case Failure(e) =& logError(s&Cannot register with driver: $driverUrl&, e)
}(ThreadUtils.sameThread)
}看driver端接收到后如何处理?重点看最后的makeOffers。当由Executor注册上来之后,如果有等待执行的任务,这时就可以开始了。这个方法后续还会用到,且目前还没讲到任务调度的章节,后续再解释。这里只需要知道,Executor注册上来之后,会触发一把任务调度(如果有任务的话)
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =&
Utils.checkHostPort(hostPort, &Host port expected & + hostPort)
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed(&Duplicate executor ID: & + executorId))
logInfo(&Registered executor: & + executorRef + & with ID & + executorId)
context.reply(RegisteredExecutor)//反馈RegisteredExecutor消息到Executor
addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)//每注册成功一个Executor,就记录总的cores
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors & 0) {
numPendingExecutors -= 1
logDebug(s&Decremented number of pending executors ($numPendingExecutors left)&)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}Executor端接收到之后,创建真正的Executor对象,Executor类是运行任务的接口,里面维护着该Executor进程上的所有任务case RegisteredExecutor =&
logInfo(&Successfully registered with driver&)
val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)至此,Executor端的注册逻辑就介绍完了,后续将结合真正的任务介绍其他的内容。
本文已收录于以下专栏:
相关文章推荐
spark通信流程
spark作为一套高效的分布式运算框架,但是想要更深入的学习它,就要通过分析spark的源码,不但可以更好的帮助理解spark的工作过程,还可以提高对集群的排错...
任务提交流程概述
在阐明了Spark的Master的启动流程与Worker启动流程。接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程
源码位置:org.apache.spark.executor.CoarseGrainedExecutorBackend
private def run(
driverUrl: String...
前几篇博文都在介绍Spark的调度,这篇博文我们从更加宏观的调度看Spark,讲讲Spark的部署模式。Spark部署模式分以下几种:
local 模式local-cluster 模式S...
这一节介绍具体task的运行以及最终结果的处理
看线程运行的run方法,见代码注释
override def run(): Unit = {
val taskMemoryManager = ...
我们讲到了如何启动Master和Worker,还讲到了如何回收资源。但是,我们没有将AppClient是如何启动的,其实它们的启动也涉及到了资源是如何调度的。这篇博文,我们就来讲一下AppClient...
上一节介绍了worker启动了一个名为CoarseGrainedExecutorBackend的进程,首先看下CoarseGrainedExecutorBackend类的main方法
def main...
spark内核揭秘-13-Worker中Executor启动过程源码分析
spark内核揭秘-13-Worker中Executor启动过程源码分析
spark内核揭秘-13-Worker中Exec...
本文链接:http://blog.csdn.net/u/article/details/
该论文来自Berkeley实验室,英文标题为:Resilient Dist...
他的最新文章
讲师:汪剑
讲师:刘道宽
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)35749人阅读
00.Cloud(59)
作者:刘旭晖 Raymond 转载请注明出处Email:BLOG:随着Spark的逐渐成熟完善,&越来越多的可配置参数被添加到Spark中来,&本文试图通过阐述这其中部分参数的工作原理和配置思路,&和大家一起探讨一下如何根据实际场合对Spark进行配置优化。由于篇幅较长,所以在这里分篇组织,如果要看最新完整的网页版内容,可以戳这里:,主要是便于更新内容Storage相关配置参数spark.local.dir&这个看起来很简单,就是Spark用于写中间数据,如RDD Cache,Shuffle,Spill等数据的位置,那么有什么可以注意的呢。&首先,最基本的当然是我们可以配置多个路径(用逗号分隔)到多个磁盘上增加整体IO带宽,这个大家都知道。&其次,目前的实现中,Spark是通过对文件名采用hash算法分布到多个路径下的目录中去,如果你的存储设备有快有慢,比如SSD+HDD混合使用,那么你可以通过在SSD上配置更多的目录路径来增大它被Spark使用的比例,从而更好地利用SSD的IO带宽能力。当然这只是一种变通的方法,终极解决方案还是应该像目前HDFS的实现方向一样,让Spark能够感知具体的存储设备类型,针对性的使用。&需要注意的是,在Spark 1.0 以后,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)参数会覆盖这个配置。比如Spark On YARN的时候,Spark Executor的本地路径依赖于Yarn的配置,而不取决于这个参数。&&spark.executor.memory&Executor 内存的大小,和性能本身当然并没有直接的关系,但是几乎所有运行时性能相关的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值&理论上Executor 内存当然是多多益善,但是实际受机器配置,以及运行环境,资源共享,JVM GC效率等因素的影响,还是有可能需要为它设置一个合理的大小。 多大算合理,要看实际情况&Executor的内存基本上是Executor内部所有任务共享的,而每个Executor上可以支持的任务的数量取决于Executor所管理的CPU Core资源的多少,因此你需要了解每个任务的数据规模的大小,从而推算出每个Executor大致需要多少内存即可满足基本的需求。&如何知道每个任务所需内存的大小呢,这个很难统一的衡量,因为除了数据集本身的开销,还包括算法所需各种临时内存空间的使用,而根据具体的代码算法等不同,临时内存空间的开销也不同。但是数据集本身的大小,对最终所需内存的大小还是有一定的参考意义的。&通常来说每个分区的数据集在内存中的大小,可能是其在磁盘上源数据大小的若干倍(不考虑源数据压缩,Java对象相对于原始裸数据也还要算上用于管理数据的数据结构的额外开销),需要准确的知道大小的话,可以将RDD cache在内存中,从BlockManager的Log输出可以看到每个Cache分区的大小(其实也是估算出来的,并不完全准确)&如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134(size: 495.3 MB)&&反过来说,如果你的Executor的数量和内存大小受机器物理配置影响相对固定,那么你就需要合理规划每个分区任务的数据规模,例如采用更多的分区,用增加任务数量(进而需要更多的批次来运算所有的任务)的方式来减小每个任务所需处理的数据大小。&&spark.storage.memoryFraction&如前面所说spark.executor.memory决定了每个Executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少可以用于Memory Store管理RDD Cache数据,剩下的内存用来保证任务运行时各种其它内存空间的需要。&spark.executor.memory默认值为0.6,官方文档建议这个比值不要超过JVM Old Gen区域的比值。这也很容易理解,因为RDD Cache数据通常都是长期驻留内存的,理论上也就是说最终会被转移到Old Gen区域(如果该RDD还没有被删除的话),如果这部分数据允许的尺寸太大,势必把Old Gen区域占满,造成频繁的FULL GC。&如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来说,如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能&&spark.streaming.blockInterval&这个参数用来设置Spark Streaming里Stream Receiver生成Block的时间间隔,默认为200ms。具体的行为表现是具体的Receiver所接收的数据,每隔这里设定的时间间隔,就从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。理论上来说,为了每个StreamingBatch 间隔里的数据是均匀的,这个时间间隔当然应该能被Batch的间隔时间长度所整除。总体来说,如果内存大小够用,Streaming的数据来得及处理,这个blockInterval时间间隔的影响不大,当然,如果数据Cache Level是Memory+Ser,即做了序列化处理,那么BlockInterval的大小会影响序列化后数据块的大小,对于Java 的GC的行为会有一些影响。&此外spark.streaming.blockQueueSize决定了在StreamBlock被存储到BlockMananger之前,队列中最多可以容纳多少个StreamBlock。默认为10,因为这个队列Poll的时间间隔是100ms,所以如果CPU不是特别繁忙的话,基本上应该没有问题。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1184650次
积分:9212
积分:9212
排名:第2738名
原创:114篇
评论:187条
文章:19篇
阅读:91501
(2)(4)(3)(5)(1)(2)(1)(2)(2)(4)(1)(1)(3)(2)(1)(2)(2)(4)(1)(5)(12)(1)(1)(2)(1)(8)(1)(1)(2)(1)(1)(2)(8)(6)(2)(1)(1)(8)(7)2971人阅读
spark(12)
Spark 默认采用的是资源预分配的方式。这其实也和按需做资源分配的理念是有冲突的。这篇文章会详细介绍Spark 动态资源分配原理。
最近在使用Spark Streaming程序时,发现如下几个问题:
高峰和低峰Spark Streaming每个周期要处理的数据量相差三倍以上,预分配资源会导致低峰的时候资源的大量浪费。
Spark Streaming 跑的数量多了后,资源占用相当可观。
所以便有了要开发一套针对Spark Streaming 动态资源调整的想法。我在文章最后一个章节给出了一个可能的设计方案。不过要做这件事情,首先我们需要了解现有的Spark 已经实现的 Dynamic Resource Allocation 机制,以及为什么它无法满足现有的需求。
在SparkContext 中可以看到这一行:
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
通过spark.dynamicAllocation.enabled参数开启后就会启动ExecutorAllocationManager。
这里有我第一个吐槽的点,这么直接new出来,好歹也做个配置,方便第三方开发个新的组件可以集成进去。但是Spark很多地方都是这么搞的,完全没有原来Java社区的风格。
动态调整资源面临的问题
我们先看看,动态资源调整需要解决哪几个问题:
Cache问题。如果需要移除的Executor含有RDD cache该如何办?
Shuffle问题。 如果需要移除的Executor包含了Shuffle Write先关数据该怎么办?
添加和删除之后都需要告知DAGSchedule进行相关信息更新。
Cache去掉了重算即可。为了防止数据抖动,默认包含有Cache的Executor是不会被删除的,因为默认的Idle时间设置的非常大:
private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.cachedExecutorIdleTimeout",
s"${Integer.MAX_VALUE}s")
你可以自己设置从而去掉这个限制。
而对于Shuffle,则需要和Yarn集成,需要配置yarn.nodemanager.aux-services。具体配置方式,大家可以Google。这样Spark Executor就不用保存Shuffle状态了。
添加Worker的触发条件是:
有Stage正在运行,并且预估需要的Executors & 现有的
删除Woker的触发条件是:
一定时间内(默认60s)没有task运行的Executor
我们看到触发条件还是比较简单的。这种简单就意味着用户需要根据实际场景,调整各个时间参数,比如到底多久没有运行task的Executor才需要删除。
默认检测时间是100ms:
private val intervalMillis: Long = 100
如何实现Container的添加和释放
只有ApplicationMaster才能够向Yarn发布这些动作。而真正的中控是org.apache.spark.ExecutorAllocationManager,所以他们之间需要建立一个通讯机制。对应的方式是在ApplicationMaster有一个private class AMEndpoint(类,比如删除释放容器的动作在里就有:
case KillExecutors(executorIds) =&
logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
Option(allocator) match {
case Some(a) =& executorIds.foreach(a.killExecutor)
case None =& logWarning("Container allocator is not ready to kill executors yet.")
context.reply(true)
而ExecutorAllocationManager则是引用YarnSchedulerBackend实例,该实例持有ApplicationMaster的 RPC引用
private var amEndpoint: Option[RpcEndpointRef]
如何获取调度信息
要触发上面描述的操作,就需要任务的调度信息。这个是通过ExecutorAllocationListener extends SparkListener来完成的。具体是在 ExecutorAllocationMaster的start函数里,会将该Listener实例添加到SparkContext里的listenerBus里,从而实现对DAGSchecude等模块的监听。机制可以参看这篇文章 。
根据上面的分析,我们至少要知道如下三个信息:
Executor上是否为空,如果为空,就可以标记为Idle.只要超过一定的时间,就可以删除掉这个Executor.
正在跑的Task有多少
等待调度的Task有多少
这里是以Stage为区分的。分别以三个变量来表示:
private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
名字已经很清楚了。值得说的是stageIdToTaskIndices,其实就是stageId 对应的正在运行的task id 集合。
那么怎么计算出等待调度的task数量呢?计算方法如下:
stageIdToNumTasks(stageId) - stageIdToTaskIndices(stageId).size
这些都是动态更新变化的,因为有了监听器,所以任务那边有啥变化,这边都会得到通知。
定时扫描器
有了上面的铺垫,我们现在进入核心方法:
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
updateAndSyncNumExecutorsTarget(now)
removeTimes.retain { case (executorId, expireTime) =&
val expired = now &= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
该方法会每隔100ms被调度一次。你可以理解为一个监控线程。
Executor判定为空闲的机制
只要有一个task结束,就会判定有哪些Executor已经没有任务了。然后会被加入待移除列表。在放到removeTimes的时候,会把当前时间now + executorIdleTimeoutS * 1000 作为时间戳存储起来。当调度进程扫描这个到Executor时,会判定时间是不是到了,到了的话就执行实际的remove动作。在这个期间,一旦有task再启动,并且正好运行在这个Executor上,则又会从removeTimes列表中被移除。 那么这个Executor就不会被真实的删除了。
Executor 需要增加的情况
首先,系统会根据下面的公式计算出实际需要的Executors数目:
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
接着每个计算周期到了之后,会和当前已经有的Executors数:numExecutorsTarget 进行比较。
maxNumExecutorsNeeded & numExecutorsTarget 则会发出取消还有没有执行的Container申请。并且重置每次申请的容器数为1,也就是numExecutorsToAdd=1
否则如果发现当前时间now &= addTime(addTime 每次会增加一个sustainedSchedulerBacklogTimeoutS ,避免申请容器过于频繁),则会进行新容器的申请,如果是第一次,则增加一个(numExecutorsToAdd),如果是第二次则增加2个以此按倍数类推。直到maxNumExecutorsNeeded &= numExecutorsTarget ,然后就会重置numExecutorsToAdd。
所以我们会发现,我们并不是一次性就申请足够的资源,而是每隔sustainedSchedulerBacklogTimeoutS次时间,按[1,2,4,8]这种节奏去申请资源的。因为在某个sustainedSchedulerBacklogTimeoutS期间,可能已经有很多任务完成了,其实不需要那么多资源了。而按倍数上升的原因是,防止为了申请到足够的资源时间花费过长。这是一种权衡。
我们发现,DRA(Dynamic Resource Allocation)涉及到的点还是很多的,虽然逻辑比较简单,但是和任务调度密切相关,是一个非常动态的过程。这个设计本身也是面向一个通用的调度方式。
我个人建议如果采用了DRA,可以注意如下几点:
设置一个合理的minExecutors-maxExecutors值
将Executor对应的cpuCore 最好设置为&=3 ,避免Executor数目下降时,等不及新申请到资源,已有的Executor就因为任务过重而导致集群挂掉。
如果程序中有shuffle,例如(reduce*,groupBy*),建议设置一个合理的并行数,避免杀掉过多的Executors。
对于每个Stage持续时间很短的应用,其实不适合这套机制。这样会频繁增加和杀掉Executors,造成系统颠簸。而Yarn对资源的申请处理速度并不快。
Spark Streaming该使用什么机制动态调整资源
现有的DRA机制其实适合长时的批处理过程中,每个Stage需要的资源量不一样,并且耗时都比较长。Spark Streaming 可以理解为循环的微批处理。而DRA是在每次微批处理起作用,可能还没等DRA反应过来,这个周期就已经过了。
Spark Streaming需要一个从全局一天24小时来考虑。每个调度周期的processing time可能更适合作为增减Executors的标准。同时如果发生delay的话,则可以扩大资源申请的速度。并且,因为是周期性的,释放和新增动作只会发生在一个新的周期的开始,所以他并不会面临现有 DRA的问题,譬如需要通过额外的方式保存Shuffle 状态等。 所以实现起来更加容易。我们可能需要同时监听StreamingContext的一些信息。
具体而言:
每个周期检查上个周期的处理时间 ,设为 preProcessingTime,周期为duration, 一般而言,我们的Spark Streaming程序都会让preProcessingTime & duration。否则会发生delay。
如果 preProcessingTime & 0.8 * duration,则一次性将资源申请到maxExecutors。
如果preProcessingTime & duration,则应该删除的Worker为
removeExecutorNum =
currentExecutors * ((duration -preProcessingTime)/duration - 0.2)
其中0.2 为预留的worker数。如果removeExecutorNum如果&=0 则不进行任何操作。
假设duration =10s, preProcessingTime= 5s, currentExecutors=100,则我们理论上认为 只要保留50%的资源即可。
但是为了防止延时,我们其实额外保留一些20%资源。也就意味着我们删除30个Executor。 我们并不会一次性将资源都释放掉。假设我们增加一个新的参数spark.streaming.release.num.duration=5,这个参数意味着我们需要花费5个周期释放掉这30个Executor的资源。也就是当前这个周期,我们要释放掉 6个Executor。
接着到下一个周期,重复上面的计算。 直到计算结果 &=0 为止。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:676839次
积分:2483
积分:2483
排名:第15536名
原创:58篇
评论:118条
(1)(6)(5)(21)(1)(1)(1)(1)(2)(2)(2)(1)(2)(1)(3)(10)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'博客分类:
在Spark Standalone集群模式下,Driver运行在客户端,所谓的客户端就是提交代码的那台机器。在Standalone模式下,角色包括:
Driver(Client,这里的Client对应到Spark的代码中是AppClient吗?)如下图所示,Driver位于提交代码的那台机器(提交代码的机器是Client),
Worker(Worker是一个进程,它其中会有多个Executor)
为什么说Driver是在提交代码的那台机器上呢?
SparkConf类中有个关于Driver的参数设置,如下代码在SparkContext的构造方法中
// Set Spark driver host and port system properties
conf.setIfMissing("spark.driver.host", Utils.localHostName()) ////host是本地,意思是可以设置的??
conf.setIfMissing("spark.driver.port", "0")
1.Client(Driver)向Master提交Application----通过spark-sumbit提交,指定master=spark:///
2. Master收到Driver的Application请求,申请资源(实际上是Worker的Executor),启动StandaloneExecutorBackend,StandaloneExecutorBackend是Worker跟外界通信的代表
3.图中的第3步代码中是否有体现?
4.Executor启动后,Driver就可以分配Task(launchTask)
5.作业执行过程中,Worker向Driver汇报任务的执行情况
用户的程序分成两部分,一个是初始化SparkContext,定义针对数据的各种函数操作实现业务逻辑(对应不同的RDD),当SparkContext通过runJob提交后,接下来的工作由Driver完成?
Driver是作业的驱动器(或者主进程),负责Job的解析,生成Stage,并调度Task到Executor上执行,其核心和精髓是DAGScheduler和TaskScheduler,通过AKKA消息驱动的方式完成
不是很理解!!这些工作都是SparkContext来完成的,SparkContext中有DAGScheduler和TaskScheduler,为什么会分成两部分?
Driver分为两部分:
1是SparkContext以及围绕这SparkContext的SparkConf和SparkEnv
2是DAGScheduler,TaskScheduler以及部署模块(部署模块主要是TaskScheduler使用)
Driver通过launchTask发送任务给Executor?Executor内部以线程池多线程的方式并行的运行任务(实际顺序是SparkContext.runJob-&DagScheduler.runJob-&DAGScheduler.submitJob-&TaskScheduler.runbJob-&TaskSetManager给LocalActor或者CoarseGrainedActor发送lanchTask消息,CoarseGrainedActor受到消息后调用Executor的lauchTask方法)
SparkConf一旦传递给SparkContext后就不能再修改,因为SparkContext构造时使用了SparkConf的clone方法。
1.LiveListenerBus
里面有个org.apache.spark.scheduler.LiveListenerBus用于广播SparkListenerEvents到SparkListeners,SparkListenerEvents都定义在SparkListener.scala中
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
* Until start() is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
2. SparkEnv类似集群全局变量,在Driver中有,在Worker的Executors中也有,而Worker的Executors有多个,那么每个Executor的每个线程都会访问SparkEnv变量,Spark使用ThreadLocal来保存SparkEnv变量。因此,SparkEnv是一个重量级的东西。
CoarseGrainedSchedulerBackend
1. 在org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend其中创建了DriverActor
// TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
2.CoarseGrainedSchedulerBackend有一个子类org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
关注它的start方法,其中的一句:
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
这个命令用于在Standalone模式下,通过CoarseGrainedExecutorBackend的命令方式启动Executor?
override def start() {
super.start()
// The endpoint for executors to talk to us
val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
SparkEnv.driverActorSystemName,
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
"{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =&
cp.split(java.io.File.pathSeparator)
val libraryPathEntries =
sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =&
cp.split(java.io.File.pathSeparator)
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
///用于启动Executor的指令?
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
////将command封装到appDesc类中
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) ////App的Client,
///启动ClientActor
client.start()
waitForRegistration()
3.AppClient类
def start() {
// J it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
org.apache.spark.deploy.Client(是一个object)
org.apache.spark.deploy.yarn.Client(是一个object)
org.apache.spark.deploy.yarn.client(这是一个私有类)
org.apache.spark.deploy.client.AppClient(这是一个私有类)
这几个类都在什么集群模式下起作用,用来做什么的?
1.除了action触发Job提交,checkpoint也会触发job提交
2.提交Job时,首先计算Stage的依赖关系,从后面往前追溯,前面
浏览: 396540 次
来自: 北京
关于第一个reduceByKey对应的cache,shuffl ...
看了你的文章,updateStateByKey 这个方式的使用 ...
棒极啦,解决了我的问题。
你好,这个代码生成主要在,那个地方使用。
看楼主这么厉害的样子,请问楼主如何知道类库的版本呢?比如g++ ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 spark executor 日志 的文章

 

随机推荐