spark源码学习阅读笔记

上传人:痛*** 文档编号:215125504 上传时间:2023-06-01 格式:PDF 页数:103 大小:17.82MB
收藏 版权申诉 举报 下载
spark源码学习阅读笔记_第1页
第1页 / 共103页
spark源码学习阅读笔记_第2页
第2页 / 共103页
spark源码学习阅读笔记_第3页
第3页 / 共103页
资源描述:

《spark源码学习阅读笔记》由会员分享,可在线阅读,更多相关《spark源码学习阅读笔记(103页珍藏版)》请在装配图网上搜索。

1、BONCspark源码解析文档(vl.0)北京东方国信科技股份有限公司目录1 spark 源码阅读笔记.42 spark 源码之 master 与 worker.52.1.1 master节点的启动过程分析.52.1.2 worker节点启动和注册.73 driver进程的启动.113.1.1 client节点主要流程.113.1.2 mster节点主要流程.133.1.3 woker节点主要流程.143.1.4 启动流程图.164 spark 境变量.184.1 sparkConf.184.2 SparkContext.185 RDD.225.1 RDD分区列表。.235.2 RDD的依赖列

2、表.235.2.1 窄依赖.245.2.2 宽依赖.265.3 RDD的分区计算函数.265.4 分区器.275.5 位置偏好.285.6 全排序.295.6.1 水塘采样.305.6.2 获取分区边界.326 stage的划分和提交.356.1 Job的提交.356.2 stage 的划分.366.3 stage 的提交.397 excutor启动流程.437.1 CoarseGrainedSchedulerBackend 的启动.437.2 appliction 的注册.447.3 资源调度.457.4 excutor 的启动.488 task的分发.508.1 taskscheduler

3、 初始化.508.2 submitMissingTasks.518.3 resourceoffers.528.3.1 Task 的调度.528.3.2 任务本地性.548.3.3 task 的分发.569 task的运行.619.1 task多线程模型.619.1.1 MapReduce的多进程模型和Spark的多线程模型.629.2 Task 的 执 行(一).639.3 Shuffle.649.4 shuffleMapTask-shffleWrite.649.4.1 HashShuffleManager 运行原理.659.4.2 SortShuffleManager 运行原理.679.5

4、task 的 执 行(二).779.6 shuffleMapTask-shffleRead.789.6.1 Netty网络传输服务.799.7 shuffle相关参数调优.859.8 ResultTask.881 0 存储模块.8910.1 BlockManager 初始化.8910.2 持久化策略.9010.3 数据持久化到内存中.9110.4 数据持久化到磁盘中.9910.5 数据块的获取.1011 spark源码阅读笔记s p a r k 是由伯克利A M P 实验室开发出来的一个非常优秀的数据处理计算框架,利用这个框架我们可以非常容易的开发并行程序,然后在集群上运行,s p a r k

5、 社区为我们提供了 J a v a,S c a l a,p y t h o n,R 语言等很多接口,每个程序员可以选择自己熟悉的语言进行开发。s p a r k 在实现过程中,利用了很多现成的技术和思想,例如通信框架选用的是 a k k a 和 n e t t y,这两种通信技术方案已经在工业界盛行多年,有很多成功的案例可以借鉴;s p a r k s h f f l e 的实现s o r t s h f f l e 的实现其实是完全借鉴了m a p r e d u c e s h f f l e 实现的思路,只是实现细节不太一样;s p a r k 在底层调度的过程中也尽可能的复用了 y a

6、 r n 和m e s o s 这两种的资源调度机制。可以这样认为s p a r k 其实是h a d o o p 的加强版,他们同根同源利用的都是m a p r e d u c e 这种先切分后聚合的思想,只是h a d o o p 对于数据处理的思路单一,加工方式过于生硬。s p a r k 相比较M a p r e d u c e 有下面儿个优化的地方。D A G编程模型。将数据加工链路整理成一个D A G(有向无环图),这其实在理论上给h a d o o p数据加工过程一个很好的总结,对于数据加工链路比较长的j o b,s pa r k 会将其切分成多个s t a g e 对应了 H

7、a d o o p的多个Ma p和 R e d u c e 逻辑,一个 s t a g e 包含了多个并行执行的t a s k s。s t a g e 之间通过s h f f l e 传递数据,通过依赖相互串联起来。这样的话整个j o b 过程只需要在H d f s 读写一 次,不需要像m a pr e d u c e 那样在h d f s 中间写入多次,大大提高了速度。申请资源方式不同。s pa r k 上 Ex c u t o r 对应一个Jv m 资源,多个t a s k对应Jv m 上的多个线程,Ex c u t o r 可以被多个t a s k 所复用,申请资源次数比较少。Ma pr

8、 e d u c e 每一个 t a s k 任务会对应一个进程,且相互之间不能复用,申请资源次数比较多。R D D 缓存机制。s pa r k 对于中间运算的结果可以缓存在内存,下次再取数据时就不必重新计算,这非常适用于一些迭代式的任务的运行,可以大大提高速度。容错机制。s pa r k 在 D A G的基础上建立了每个R D D 的血统依赖和检查点机制。当数据加工过程中的某个t a s k 中间执行失败后,可以根据其依赖关系以最小代价重启t a s k,而不必像Ma pr e d u c e 那样从头开始运行t a s k o本次源码阅读用的是s pa r k 1.2 版本,部署版本为s

9、t a n d a l o n e 模式,另外说明一下s t a n d a l o n e 的意思独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统,这也是s pa r k y a r n 版本和m e s o s 版本的 基 石 出。pa r k 后续版本可能在某些细节上有所优化,增加了一些库文件,但基本思想并未改变,所以下文引入的源码基本上都是s pa r k 1.2 的源码。2 spark 源码之 master 与 worker2.1.1 master节点的启动过程分析阅读程序一般都是从m a i n 函数开始或者从j o b 的提交开始,但这不是分布式程序

10、源码阅读的很好思路,分布式程序因为是程序分布在不同的节点上,并不是我们以前看到的单机版程序,在 jo b 提交之前或者在集群安装过程中,其实发生了很多很有意思的事情,这些东西是s p a r k jo b 能够正常提交的很重要基础。s p a r k集群启动完之后,会有两个常驻进程,分别是ma s t e r 和 w o r ke r 进程,之所以说是常驻进程,是因为这两个进程不会随着jo b 结束就消失。与之相反的是d r iv e r 和 e x c ut o r 进程,这两个进程是随着jo b 的提交动态生成的,位置和数目都是随机的(或者人工指定的),当jo b 结束后这两个进程就会消失

11、,一直到下一个jo b 的提交才会重新产生。首先来看ma s t e r 节点进程,代码所在的位置在o r g.a p a c he,s p a r k.d e p lo y,ma s t e r 包中(我很讨厌贴代码不说代码位置,这对于新手很麻烦),找到ma in 函数,如程序清单2.1 所示。def main(argStrings:ArrayString)SignalLogger.register(log)val conf=new SparkConfval args=new MasterArguments(argStrings,conf)/*这是最关键的部分,在这里通过akka注册了 ma

12、ser节 点*/val(actorSystem,_ _)=startSystemAndActor(args.host,args.port,args.webUiPort,conf)/*等待注册完成,系统终止*/actorSystem.awaitTerminationO)程序清单2.1 maser节点的main函数在 s t a r t S y s t e mAn d Ac t o r 函数里边注册了一1 个 ma s e r 的 a c t o r,在 a c t o r 启动时会进入p r e S t a r t 函数,如程序清单2.2 所示。override def preStart()lo

13、glnfo(Starting Spark master at+masterUrl)/*对远程消息进行订阅*/context.system.eventStream.subscribe(selt classOfRemotingLifecycleEvent/*启动web UI服 务*/webUi.bind()masterWebllillrl=http:/+masterPublicAddress+webUi.boundPort/*定期对workder节点进行检查,心跳是否超时,检查节点是否坏死*/context.system.scheduler.schedule(0 millis,WORKER_TIM

14、EOUT millis,self,CheckForWorkerTimeOut)/*mster原始据收集*/masterMetricsSystem.registerSource(masterSource)masterMetricsSystem.start()applicationMetricsSystem.start()m a se r节 点 的 主 备 切 换 机 制*/persistenceEngine=RECOVERY_MODE match case ZOOKEEPER=loglnfo(Persisting recovery state to ZooKeeper)new ZooKeeper

15、PersistenceEngine(SerializationExtension(context.system),conf)case FILESYSTEM=loglnfo(Persisting recovery state to directory:+RECOVERY_DIR)new FileSystemPersistenceEngine(RECOVERY_DIR,SerializationExtension(context.system)case _=new BlackHolePersistenceEngine()leaderElectionAgent=RECOVERY_MODE match

16、 case ZOOKEEPER1=context.actorOf(Props(classOfZooKeeperLeaderElectionAgent,self masterUrl,conf)case _=context.actorOffPropsfclassOftMonarchyLeaderAgent,self)程序清单2.2 maser节点的启动过程从以上代码可以看到,maser节点在注册过程中,主要进行了元数据收集信息,这是通过MetricsS ystem做到的,它是为了衡量系统的各种指标的度量系统。算是一个key-value形态的东西,这种指标可以通过webUI 显示出来,举个比较简单的

17、例子,我怎么把当前JVM相关信息展示出去呢?做法自然很多,通过MetricsS ystem就可以做的更标准化些,具体方式如下:1.S ource数据来源。比如对应的有org.apache,spark,metrics,source.JvmS ource2.S i n k o 数据发送到哪去。有被动和主动。一般主动的是通过定时器来完成输出,譬如CS VS ink,被动的如MetricsS ervlet等需要被用户主动调用。3.桥接 S ource 和 S ink 的则是 MetricRegistry 了。S park并没有实现底层Metrics的功能,而是使用了一个第三方库:http:/。感兴趣大

18、家可以看看,有个更完整的认识。另外在注册过程中,master节点配置了主备切换机制,也就是上边的持久化引擎-persistenceEngine。Master实际上可以配置两个,S park原生的standalone模式是支持Master主备切换的。也就是说,当Active Master节点挂掉时,可以将 S tandBy master 节点切换为 Active Master。S park Master主备切换可以基于两种机制,一种是基于文件系统的,一种是基于Zookeeper的。基于文件系统的主备切换机制,需要在Active Master挂掉之后,由我们手动切换到S tandBy Master

19、上;而基于Zookeeper的主备切换机制,可以自动实现切换Master。所以这里说的主备切换机制,实际上指的是在Active Master挂掉之后,切换到S tandBy Master时,Master会执行的操作。首先,S tandBy Master会使用持久化引擎去读取持久化的storedApps,storedDrivers,storedWorkerso 持久化引擎有两种:FileS ystemPersistenceEngine 和 ZookeeperPersistentEngine.,读取出来后,会进行判断,如果 storedApps,storedDrivers,storedWorker

20、s 有任何一个是非空的,继续向下执行,去启动master恢复机制,将持久化的Application,Driver,Worker信息重新进行注册,注册到Master内部的缓存结构中。注册完之后,将Application和Worker的状态修改为UNK NOWN,然后向Application所对应的 Driver,以及 Worker 发送 S tandBy Master 的地址。Driver 和 Worker,理论上来说,如果它们目前都在正常运行的话,那么在接收到Master发送来的地址之后,就会返回相应消息给新的Mastero此时,Master在陆续接收到Driver和 Worker发送来的响应

21、消息后,会使用completeRecovery()方法对没有发送响应消息的Driver和 Worker进行处理,过滤掉它们的信息。最后,调用Master自己的schedule。方法,对正在等待资源调度的Driver和 Application进行调度,比如在某个worker上启动Driver,或者为Application在 Worker上启动它需要的Executoro2.1.2 worker节点启动和注册接下来我们看看worker启动的过程,启动代码位于org.apache,spark,deploy,worker包中,首先进入main函数,如程序清单2.3所示。def main(argStrin

22、gs:ArrayString)SignalLogger.register(log)val conf=new SparkConfval args=new WorkerArguments(argStringsz conf)val(actorSystem,_)=startSystemAndActor(args.host,args,port,args.webUiPort,args.cores,args.memory,args.masters,args.workDir)actorSystem.awaitTermination()程序清单2.3 woker节点main函数worker启动过程于maser很

23、类似,也是注册一个worker actor,代码比较简单,只是每个worer创建了一个工作目录,同时向maser进行注册worker节点,woker节点会向所有的maser节点进行注册,同时worker节点本身会启动一个定时器registrationRetryT imer进行调度,不断对maser节点发起注册请求,如果刚开始注册一直失败,则worker本身会延长注册时间,直至注册成功,源码如程序清单2.4 所示。override def preStart()assert(!registered)loglnfo(Starting Spark worker%s:%d with%d cores,%s

24、 RAM.format(host,port,cores,Utils.megabytesToString(memory)loglnfo(Spark home:+sparkHome)createWorkDir()context.system.eventstream.subscribe(self,classOfRemotingLifecycleEvent)shuffleService.startlfEnabled()weblli=new WorkerWebUI(thisz workDir,webUiPort)webUi.bind()registerWithMaster()metricsSystem.

25、registerSource(workerSource)metricssystem.start()def registerWithMaster()/DisassociatedEvent may be triggered multiple times,so dont attempt registration/if there are outstanding registration attempts scheduled.registrationRetryTimer match case None=registered=falsetryRegisterAIIMasters()connectionA

26、ttemptCount=0registrationRetryTimer=Some context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,INITIAL_REGISTRATION_RETRY_INTERVAL,self,ReregisterWithMaster)case Some(J=loglnfo(Not spawning another attempt to register with the master;since there is an+attempt scheduled already.)程序清单2

27、.4 worke节点的启动和注册m a s t e r 节点收到注册请求后,会将w o r k e r 的信息进行封装成W o r k e r I n f o,并将W o r k e r I n f o 加入到m a s t e r 节点上相应的数据结构上以作记录,同时m a s t e r节点对应的持久化引擎也会记录w o r k e r 信息以作主备切换。然后m a s t e r 会向w o r k e r 发送的注册进行相应,报告w o k e r 节点已经注册成功,由于有新的w o r k e r节点加入,资源的增加会触发新一轮的调度,调度策略比较复杂,我们后边再讲。主要过程源码如程

28、序清单2.5 所示。case RegisterWorker(id,workerHost,workerPort,cores,memory,workerUiPort,publicAddress)=(loglnfo(Registering worker%s:%d with%d cores,%s RAM.format(workerHost,workerPort,cores,Utils.megabytesToString(memory)if(state=Recoverystate.STANDBY)/ignore,dont send response else if(idToWorker.contains

29、(id)sender!RegisterWorkerFailed(Duplicate worker ID)else val worker=new Workerlnfo(id,workerHost,workerPort,cores,memory,sender;workerlliPort,publicAddress)if(registerWorker(worker)persistenceEngine.addWorker(worker)sender!RegisteredWorker(masterUrl,masterWeblliUrl)schedule()else val workerAddress=w

30、orker.actor.path.addresslogWarning(Worker registration failed.Attempted to re-register worker at same +address:+workerAddress)sender!RegisterWorkerFailed(Attempted to re-register worker at same address:+workerAddress)程序清单2.5 maser节点注册workerw o k e r 节点收到ma s e r 发送的注册信息成功后,会立即对ma s e r 信息的进行保存,例如ma

31、s t e r 的U R I,ma s e r 对应的a c t o r 等,然后会关闭重复向ms t e r节点发送注册信息的定时器,同时w o k e r 节点开启一个调度任务默认每隔1 5s向ma s t e r 节点发送心跳信号,表示w o r k e r 节点的存活状态,源码如程序清单2.60case RegisteredWorkerfmasterUrl,masterWeblliUrl)=loglnfo(Successfully registered with master+masterllrl)registered=truechangeMasterfmasterllrl,maste

32、rWebUiUrl)context.system.scheduler.schedule(0 millis,HEARTBEAT_MILLIS millis,self,SendHeartbeat)if(CLEANUP_ENABLED)loglnfo(sWorker cleanup enabled;old application directories will be deleted in:$workDirH)context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,CLEANUP_INTERVAL_MILUS millis,s

33、elf,WorkDirCleanup)程序清单2.6 worker注册成功后开始向master发送心跳信号ma s t e r 收到w o k e r 心跳信号后,会更新收到w o r k e r 心跳信号的最后时间,用来检测w o r k e r 是否超时或者失效。这里分析一下w o r k e r 心跳超时的情况,我们前边讲到ma s e r 节点启动后会同时启动一个调度定时器,这个定时器调度会不间断的检查每个w o r k e r 的心跳信号是否超时,如果心跳信号距离上次心跳超时60 s,会默认这个w o r k e r 已经失效,ma s e r 节点会把这个节点标记为de a d,同

34、时移除这个w o k e r 节点上的E x c u t o r进程,然后通知dr i v e r 在另外的机器上重新分配e x c u lo r 进程,如果心跳信号距离上次心跳超过1 6分钟,就认为这个w o r k e r 彻底de a d,就更新w o k e r s 信息,并通过w e b U I 显示出来,源码如程序清单2.7 所示。def timeOutDeadWorkers()/Copy the workers into an array so we dont modify the hashset while iterating through itval currentTime

35、=System.currentTimeMillis()val toRemove=workers.filter(_.lastHeartbeat currentTime-WORKER_TIMEOUT).toArrayfor(worker-toRemove)if(worker.state!=WorkerState.DEAD)logWarning(HRemoving%s because we got no heartbeat in%d secondsH.format(worker.id,WORKER_TIMEOUT/1QOO)removeWorker(worker)else if(worker.las

36、tHeartbeat CLIENTcase cluster=CLUSTERcase _=printErrorAndExit(Deploy mode must be either client or cluster);-1)/In client mode,launch the application main class directly/In addition,add the main application jar and any added jars(if any)to the classpathif(deployMode=CLIENT)childMainClass=args.mainCl

37、assif(isllserJar(args.primaryResource)childClasspath+=args.primaryResource)if(args.jars!=null)childClasspath+=args.jars.split(,)if(args.childArgs!=null)childArgs+=args.childArgs)/In standalone-cluster mode,use Client as a wrapper around the user classif(clusterManager=STANDALONE&deployMode=CLUSTER)c

38、hildMainClass=org.apache.spark.deploy.Clientif(args.supervise)childArgs+=-supervise)childArgs+=launchchildArgs+=(args.master,args.primaryResource,args.mainClass)if(args.childArgs!=null)childArgs+=args.childArgs)程序 清 单3.3 deploy两种模式client和cluster如果d e p l o y-m o d e 选择c l u s t e r 模式,将会进入o r g.a p

39、a c h e,s p a r k,d e p l o y.Cl i e n t 类中,这里生成了一 个 c l i e n t Ac t o r,然后利用这个a c t o r 会向m a s e r 节点发起d r i v e r 注册请求源码如程序清单3.4 所示。case launch=/TODO:We could add an env variable here and intercept it in sc.addJar that would/truncate filesystem paths similar to what YARN does.For now,we just req

40、uire/people call addJar assuming the jar is in the same directory.val mainClass=org.apache.spark.deploy.worker.DriverWrapperval classPathConf=spark.driver.extraClassPathval classPathEntries=sys.props.get(classPathConf).toSeq.flatMap cp=cp.split(java.io.File.pathseparator)val libraryPathConf=spark.dr

41、iver.extraLibraryPathval libraryPathEntries=sys.props.get(libraryPathConf).toSeq.flatMap cp=cp.split(java.io.File.pathSeparator)val extraJavaOptsConf=spark.driver.extraJavaOptionsval extraJavaOpts=sys.props.get(extraJavaOptsConf).map(Utils.splitCommandString).getOrElse(Seq.empty)val sparkJavaOpts=Ut

42、ils.sparkJavaOpts(conf)val javaOpts=sparkJavaOpts+extraJavaOptsval command=new Command(mainClass,Seq(WORKER_URL,driverArgs.mainClass)+driverArgs.driverOptions,sys.env,classPathEntries,libraryPathEntries,javaOpts)val driverDescription=new DriverDescription(driverArgs.jarUrl,driverArgs.memory,driverAr

43、gs.cores,driverArgs.supervise,command)masterActor!RequestSubmitDriver(driverDescription)程序清单3.4向master节点注册driver3.1.2 mster节点主要流程m a s t e r 节点收到d r i v e r 注册信息后,首先会判断m a s t e r 节点是否存活,如果处于s t a n d b y 模式,将会向c l i e n t 发送注册失败的回应信息,如果m a s t e r节点存活,将会根据封装的d r i v e r 资源信息,创 建 个 d r i v e r 数据结构并

44、进行保存,然后加入m a s t e r 的持久化引擎中。等到上边都完成后,m a s t e r 节点将会启动调度程序,在调度程序中随机选择一个w o r k e r 节点启动d r i v e r 程序,并向c l i e n t 发送d r i v e r 注册成功信息作为回应。这部分程序在o r g.a p a c h e,s p a r k,d e p l o y,m a s t e r 源码如程序清单 3.5 所示。case RequestSubmitDriver(description)=if(state!=RecoveryState.ALIVE)val msg=sHCan on

45、ly accept driver submissions in ALIVE state.Current state:$state.sender!SubmitDriverResponse(falsez None,msg)else loglnfo(Driver submitted +mand.mainClass)val driver=createDriver(description)persistenceEngine.addDriver(driver)waitingDrivers+=driverdrivers.add(driver)schedule()sender!SubmitDriverResp

46、onse(true,Some(driver.id),sDriver successfully submitted as$driver.id)def launchDriver(worker:Workerinfo,driver:Driverinfo)loglnfo(Launching driver+driver.id+on worker+worker.id)worker.addDriver(driver)driver,worker=Some(worker)worker.actor!LaunchDriverfdriver.id,driver,desc)driver.state=D r i ve rS

47、tate.RUNNING程序清单3.5 master处理driver注册信息client收到master的注册回应信息后,会对maser节点进行driver状态的进行一次查询请求,查看当前driver是否已经启动。3.1.3 woker节点主要流程接下来我们分析真正的driver启动过程,启动程序在org.apache,spark,deploy,worker 中,worker 节点收至!J maser 发送的启动driver请求后,会将driver本身需要的内存,cpu核数进行记录。然后worker会启动一个线程,在这个线程中将会为driver创建一个目录用于存放driver本身需要的jar包

48、和第三方文件等资源,然后在这个线程中正式启动driver进程,进程启动后,这时候进入的主类还并不是用户的主程序而是org.apache,spark,deploy,worker.DriverWrapper,在这个类中创建了一 个名字叫workerWatcher的Actor,这个actor负责driver和 worker之间的通信,然后通过java反射得到用户提交的job,这才真正进入最终进入用户的主程序。接下来开始执行用户提交的job,执行过程中启动driver进程的线程将会阻塞直到用户提交的主程序执行结束,driver进程最终终止后才恢复执行。如果 driver进程执行过程中被异常终止,当前线

49、程将会根据当前driver进程返回的错误状态,重新启动driver进程。源码如程序清单3.6 所示。case LaunchDriver(driverld,driverDesc)=loglnfo(sAsked to launch driver$driverld)val driver=new DriverRunner(conf,driverld,workDir,sparkHome,driverDesc,self,akkaUrl)drivers(driverld)=driverdriver.start()coresllsed+=driverDesc.coresmemoryllsed+=driverD

50、esc.mem)def start()=new Thread(DriverRunner for+driverld)override def run()tryval driverDir=createWorkingDirectory()val localJarFilename=downloadUserJar(driverDir)/Make sure user application jar is on the classpath/TODO:If we add ability to submit multiple jars they should also be added hereval buil

51、der=Commandlltils.buildProcessBuilder(driverDmand,driverDesc.mem,sparkHome.getAbsolutePath,substitutevariables,Seq(localJarFilename)launchDriver(builder;driverDir;driverDesc.supervise)catch case e:Exception=finalException=Some(e)valstate=if(killed)DriverState.KILLED else if(finalException.isDefined)

52、DriverState.ERROR else finalExitCode match case Some(O)=DriverState.FINISHEDcase _=DriverState.FAILED)finalstate=Some(state)worker!DriverStateChanged(driverldz state,finalException).start()程序清单3.6 driver进程的启动driver进程结束后,会向当前woker节点汇报进程结束状态,woker会将此状态进行转发至master节点,master节点会将driver从当前的保存数据结构中移除,然 后 刷

53、新driver进 程所在的worker资源列表信息,代码位于o rg.a p a c h e.s p a rk.d e p l o y,m a s t e r 中,源码如所示。case DriverStateChanged(driverld,state,exception)=state match case DriverState.ERROR|Driverstate.FINISHED|DriverState.KILLED|DriverState.FAILED=removeDriver(driverld,state,exception)case_=throw new Exception(sRec

54、eived unexpected state update for driver$driverld:$state)def removeDriver(driverld:String,finalstate:DriverState,exception:OptionException)drivers.find(d=d.id=driverld)match case Some(driver)=loglnfo(sRemoving driver:$driverld)drivers-=driverif(completedDrivers.size=RETAINED_DRIVERS)val toRemove=mat

55、h.max(RETAINED_DRIVERS/10,1)completedDrivers.trimStart(toRemove)completedDrivers+=driverpersistenceEngine.removeDriver(driver)driver.state=finalstatedriver.exception=exceptiondriver.worker.foreach(w=w.removeDriver(driver)schedule()case None=logWarning(sAsked to remove unknown driver:$driverld)程序清单3.

56、7 driver信息的移除3.1.4启动流程图d ri v e r进程启动整体的流程图如图3.1 所示图 3.1 driver进程流程图4 spark境变量前边的很多工作,其实都是系统层面的东西,与用户的a p p l i c a t i o n 还没有太大关系,从这篇开始涉及到的基本都是用户级的a p p l i c a t i o n,s p a rk 也真正开始执行用户的j o b。4.1 sparkConf用户执行程序中都会申明一个s p a rk C o n f 变量,这个变量会读取我们为s p a rk 配置的环境参数,它是一组K-V 属性对,内部用了一个m a p 用来封装我们设

57、置的属性。需要注意的是,s p a rk C o n f 内部读取属性时必须以 s p a rk.开头,否则设置的属性不能被读到,这算是个小技巧问题,给用户了一种传参的手段。源码在o rg.a p a c h e,s p a rk.S p a rk C o n f 中,核心如程序清单4.1 所示。privatespark val settings=new HashMapString,String()if(loadDefaults)/Load any spark.*system propertiesfor(k,v)mesos模式。对于这四种模式,spark都有对应的任务调度器和调度程序。核心源码

58、如程序清单4.3所示。/Create and start the schedulerprivatespark var(schedulerBackend,taskScheduler)=SparkContext.createTaskScheduler(this,master)private val heartbeatReceiver=env.actorSystem.actorOf(Props(new HeartbeatReceiver(taskScheduler),HeartbeatReceiver)(volatile privatespark var dagScheduler:DAGSchedu

59、ler=_try(dagScheduler=new DAGScheduler(this)catch case e:Exception=thrownew SparkException(DAGScheduler cannot be initialized due to%s.format(e.getMessage)/*standalone模式,创建的任务调度器和调度方法*/case SPARK_REGEX(sparkUrl)=val scheduler=new TaskSchedulerlmpl(sc)val masterUrls=sparkUrl.splitC/TrnapCspark:/+_)va

60、l backend=new SparkDeploySchedulerBackend(scheduleG sc,masterUrls)scheduler.initialize(backend)(backend,scheduler)/*yarn client模式,创建的任务调度器和调度方法*/case yarn-client=val scheduler=try val clazz=Class.forName(org.apache.spark.scheduler.cluster.YarnClientClusterScheduler)val cons=clazz.getConstructor(clas

61、sOfSparkContext)cons.newlnstance(sc).aslnstanceOfTaskSchedulerlmpl catch case e:Exception=throw new SparkException(YARN mode not available?,e)val backend=try val clazz=Class.forName(org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend)val cons=clazz.getConstructor(classOfTaskSchedulerlmpl,c

62、lassOfSparkContext)cons.newlnstance(scheduler,sc),aslnstanceOfCoarseGrainedSchedulerBackend catch case e:Exception=throw new SparkException(YARN mode not available?”,e)scheduler.initialize(backend)(backend,scheduler)程序清单4.3创建spark调度器总结一下,DagS cheduler是用户job级别的调度,它负责把我们的job切分 成 一 个一个的stage,stage对应一组

63、task;T askS cheduler是系统层面的调度,负责将每组task按照优先级排序并选出优先级最高的tasks(即用户的stage)o7.加入 sprkListenerEvent 事件往 LiveListenerBus 中加入了S parkListenerEnvironmentUpdate,parkListenerAppli cationS tart 两类事件,LiveListenerBus对这两种事件监听的监听器就会调用onEnvironmentUpdateonApplicationS tart方法进行处理参考资料http: RDDs p a r k框架之所以比h a d o o p

64、 m a p r e d u c e这套框架优秀,很重要的一点就是s p a r k框架给出了数据加工处理流程一些清晰而极具抽象的概念。根据这些概念,我们可以将数据加工流程很容易的划分成一个阶段一个阶段的(对应与s p a r k的s t a g e),这有点类似于数据加工的标准化,相比较而言m a p r e d u c e还是比较粗糙和混沌的,这在一些数据加工链比较长的作业体现的更加明显,从这点来说,s p a r k对于大数据而言意义还是比较大的。s p a r k 提出的一个新的概念是 R D D (R e s i l i e n t D i st r i b u t e d D a

65、t a se t s:AF a u l t-T o l e r a n t Ab st r a c t i o n f o r I n-M e m o r y C l u st e r C o m p u t i n g),中文意思就是弹性数据集,提出来源是B e r k e l e y实验室,从这点来看带有浓郁的学术色彩。R D D是sp a r k数据计算的最小单元,我们做各种数据操作,比如过滤,加减等对应的都是从一个R D D到另一个R D D,前边提到的st a g e的构成就是由一系列的R D D构成,只不过这些R D D一般都满足特定的规律。R D D是sp a r k中的一个类,

66、主要包括5个重要属性,这5个属性基本涵盖了分布式运算中常用的操作,分别如下所示。分 区列表,这是做分布式计算首先考虑的问题,考虑到分布式运算其实是将数据平均打散到各个节点或者各个j v m里边中,体现到数据上就是每个分区,合理的控制分区的数据集是分布式运算一个比较重要的参考指标,分区列表中的每个分区对应了 sp a r k中的一个t a sk.分 区器,这个和分区列表相对应的,主要指定了分区的具体策略,上边提到了将数据平均打散到各个分区内,这里就是负责数据打散的策略。依 赖列表,sp a r k加工的数据链路一般比较长,每一次的数据加工都会生成一个R D D,利用依赖列表可以有效的将R D D串联起来。分 区计算函数,获得分区数据后,会对分区数据进行相关操作,这也是真正进行数据处理的一步。每 个R D D的分区计算逻辑是不同的,如果当前R D D没有父R D D例如H a d o o p R D D就会直接取出每条数据并以迭代器的形式返回最终的数据;如果当前R D D含有父R D D,则会首先获取父R D D的迭代器,这个过程需要传入当前的分区作为形参并可以递归的执行直到当前R D D

展开阅读全文
温馨提示:
1: 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
2: 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
3.本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
5. 装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
关于我们 - 网站声明 - 网站地图 - 资源地图 - 友情链接 - 网站客服 - 联系我们

copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!