
The Path of Spark Cultivation (Advanced): Spark Source Code Reading, Section 10: Analysis of the Standalone Run Mode


Spark's Standalone mode uses a Master/Slave architecture. The main classes involved are:

Class: org.apache.spark.deploy.master.Master
Description: responsible for resource scheduling across the whole cluster and for Application management.
Message types:
Messages received from Worker:
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat

Messages sent to Worker:
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5. LaunchExecutor
6. LaunchDriver
7. KillDriver
8. ApplicationFinished

Messages sent to AppClient:
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved

Messages received from AppClient:
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors

Messages sent to the Driver Client:
1. SubmitDriverResponse
2. KillDriverResponse
3. DriverStatusResponse

Messages received from the Driver Client:
1. RequestSubmitDriver
2. RequestKillDriver
3. RequestDriverStatus

Class: org.apache.spark.deploy.worker.Worker
Description: registers with the Master and launches CoarseGrainedExecutorBackend, which at run time starts an Executor to run Tasks.
Message types:
Messages sent to Master:
1. RegisterWorker
2. ExecutorStateChanged
3. WorkerSchedulerStateResponse
4. Heartbeat

Messages received from Master:
1. RegisteredWorker
2. RegisterWorkerFailed
3. ReconnectWorker
4. KillExecutor
5. LaunchExecutor
6. LaunchDriver
7. KillDriver
8. ApplicationFinished

Class: org.apache.spark.deploy.client.AppClient.ClientEndpoint
Description: registers the Application with the Master and monitors it, requests or kills Executors, etc.
Message types:
Messages sent to Master:
1. RegisterApplication
2. UnregisterApplication
3. MasterChangeAcknowledged
4. RequestExecutors
5. KillExecutors

Messages received from Master:
1. RegisteredApplication
2. ExecutorAdded
3. ExecutorUpdated
4. ApplicationRemoved

Class: org.apache.spark.scheduler.cluster.DriverEndpoint
Description: at run time, registers Executors, launches Tasks, and handles the status updates sent by Executors, etc.
Message types:
Messages sent to Executor:
1. LaunchTask
2. KillTask
3. RegisteredExecutor
4. RegisterExecutorFailed

Messages received from Executor:
1. RegisterExecutor
2. StatusUpdate

Class: org.apache.spark.deploy.ClientEndpoint
Description: manages Drivers, including submitting a Driver, killing a Driver, and obtaining Driver status information.
Message types:
Messages sent to Master:
1. RequestSubmitDriver
2. RequestKillDriver
3. RequestDriverStatus

Messages received from Master:
1. SubmitDriverResponse
2. KillDriverResponse
3. DriverStatusResponse

All of the above classes extend org.apache.spark.rpc.ThreadSafeRpcEndpoint; the underlying implementation is currently based on Akka, as shown in the figure below:

[Figure: the endpoint classes above and their common parent ThreadSafeRpcEndpoint]
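To make the "thread-safe endpoint" idea concrete, here is a minimal sketch in Scala, the language of the Spark sources. It assumes the guarantee described in Spark's documentation for ThreadSafeRpcEndpoint: processing of one message finishes before processing of the next message to the same endpoint begins. `ToyEndpoint`, `ToyInbox`, `ToyMaster` and the two message classes are hypothetical stand-ins, not Spark's RpcEnv API.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an RPC endpoint: messages are handled by a
// PartialFunction, like the `receive` methods quoted later in this article.
trait ToyEndpoint {
  def receive: PartialFunction[Any, Unit]
}

// Hypothetical messages named after two of the Worker -> Master messages above.
final case class Heartbeat(workerId: String)
final case class RegisterWorker(id: String, cores: Int)

// Single-consumer inbox: only processAll() drains the queue, so one message is
// completely handled before the next one is taken.
final class ToyInbox(endpoint: ToyEndpoint) {
  private val queue = new LinkedBlockingQueue[Any]()

  // Producers (possibly on other threads) may post concurrently.
  def post(msg: Any): Unit = queue.put(msg)

  // Handles every queued message in order; unmatched messages are dropped.
  // Returns how many messages were taken from the queue.
  def processAll(): Int = {
    var taken = 0
    var msg = queue.poll()
    while (msg != null) {
      val handler = endpoint.receive
      if (handler.isDefinedAt(msg)) handler(msg)
      taken += 1
      msg = queue.poll()
    }
    taken
  }
}

// Toy "Master": its mutable log needs no lock because the inbox serializes delivery.
final class ToyMaster extends ToyEndpoint {
  val log = ArrayBuffer.empty[String]
  override def receive: PartialFunction[Any, Unit] = {
    case RegisterWorker(id, cores) => log += s"registered $id with $cores cores"
    case Heartbeat(id)             => log += s"heartbeat from $id"
  }
}
```

The design point is that the endpoint's own state (`log` here) can stay unsynchronized as long as every message reaches it through a single sequential consumer.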

The interactions between these classes are shown in the following figure:
[Figure: message interactions between the classes above]

1. Interaction between Master and AppClient

When a SparkContext is created, it calls the createTaskScheduler method to create the corresponding TaskScheduler and SchedulerBackend:

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()
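The comment before `_taskScheduler.start()` encodes an ordering rule: the DAGScheduler constructor hands itself to the TaskScheduler, and only after that may the TaskScheduler start. The sketch below models just that rule; `ToyTaskScheduler` and `ToyDAGScheduler` are hypothetical classes, and the `require` check is an illustration rather than something Spark itself does.

```scala
// Hypothetical simplified model of the ordering constraint in SparkContext:
// the DAGScheduler registers itself with the TaskScheduler in its constructor,
// and only then may the TaskScheduler be started.
final class ToyTaskScheduler {
  private var dagScheduler: Option[ToyDAGScheduler] = None
  private var started = false

  def setDAGScheduler(d: ToyDAGScheduler): Unit = dagScheduler = Some(d)

  def start(): Unit = {
    // Starting earlier would leave task events with no DAGScheduler to report to.
    require(dagScheduler.isDefined, "DAGScheduler must be set before start()")
    started = true
  }

  def isStarted: Boolean = started
}

final class ToyDAGScheduler(ts: ToyTaskScheduler) {
  // Constructor-time registration, mirroring the comment in SparkContext.
  ts.setDAGScheduler(this)
}
```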
In Standalone mode, the source code that creates the TaskScheduler and SchedulerBackend is as follows:

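  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    // other non-essential code omitted
    master match {
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
      // other non-essential code omitted
    }
  }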
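The `SPARK_REGEX` branch is what allows a Standalone master URL to list several masters for HA, e.g. `spark://host1:7077,host2:7077`. The sketch below isolates that parsing step; the pattern `spark://(.*)` is assumed to behave like Spark's `SPARK_REGEX`, and `MasterUrlParser` is a hypothetical helper.

```scala
// Hypothetical helper reproducing the URL handling in the SPARK_REGEX branch:
// "spark://h1:7077,h2:7077" becomes one "spark://" URL per master.
object MasterUrlParser {
  // Assumed to mirror SPARK_REGEX; the group captures everything after the scheme.
  private val SparkRegex = """spark://(.*)""".r

  def masterUrls(master: String): Option[Seq[String]] = master match {
    case SparkRegex(sparkUrl) =>
      Some(sparkUrl.split(",").toSeq.map("spark://" + _))
    case _ =>
      None // not a Standalone URL (e.g. "local[*]")
  }
}
```

Scala's regex extractor only matches the whole string, so anything that does not start with `spark://` falls through to the default case.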

After the TaskScheduler and SchedulerBackend have been created, the TaskScheduler's start method is called, which starts the SchedulerBackend (in Standalone mode this is SparkDeploySchedulerBackend):

  // start method in TaskSchedulerImpl
  override def start() {
    // call the SchedulerBackend's start method
    backend.start()
    // other non-essential code omitted
  }

The corresponding start method in SparkDeploySchedulerBackend is as follows:

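  override def start() {
    super.start()

    // other non-essential code omitted
    // Application information (application name, executor memory, etc.)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    // create the AppClient, passing in the corresponding startup parameters
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    waitForRegistration()
  }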
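Taken together, `TaskSchedulerImpl.start()` delegates to the backend, the backend starts the AppClient, and `waitForRegistration()` blocks until the Master has answered. A simplified model of that handshake follows. It assumes the backend waits on a barrier that the listener callback `connected(appId)` releases (the listener is the backend itself, passed to AppClient as `this`), modelled here with a `java.util.concurrent.Semaphore`. All `Toy*` names are hypothetical, and the timeout exists only so the sketch cannot hang.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

// Hypothetical listener interface: the client calls back once the Master replies.
trait ToyClientListener {
  def connected(appId: String): Unit
}

// Simplified backend: start() launches the client, then blocks until
// connected(...) releases the barrier, like waitForRegistration().
final class ToyDeployBackend(launchClient: ToyClientListener => Unit) extends ToyClientListener {
  private val registrationBarrier = new Semaphore(0)
  @volatile var appId: Option[String] = None

  override def connected(id: String): Unit = {
    appId = Some(id)
    registrationBarrier.release() // wake up the thread blocked in start()
  }

  // Returns false if no registration arrives within the timeout.
  def start(timeoutMs: Long): Boolean = {
    launchClient(this) // corresponds to new AppClient(..., this, ...) + client.start()
    registrationBarrier.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)
  }
}

// Hypothetical TaskScheduler whose start() simply delegates to the backend.
final class ToyTaskSchedulerImpl(backend: ToyDeployBackend) {
  def start(): Boolean = backend.start(timeoutMs = 2000)
}
```

A semaphore with zero initial permits is a natural fit here: the reply may even arrive before `tryAcquire` is called, and the released permit is simply waiting for it.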

The start method of the AppClient class is as follows:

  // AppClient's start method
  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    // ClientEndpoint is an inner class of AppClient;
    // it is AppClient's RpcEndpoint
    endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
  }
When ClientEndpoint starts, it registers the Application with the Master:

    override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }
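`setupEndpoint` registers an endpoint under a name, and the endpoint's `onStart()` hook is where ClientEndpoint begins registration. The toy registry below illustrates that lifecycle. It is a hypothetical model: the real RpcEnv schedules `onStart` itself, whereas this sketch calls it synchronously, and the returned name merely stands in for an `RpcEndpointRef`.

```scala
import scala.collection.mutable

// Hypothetical minimal endpoint lifecycle: onStart runs once after registration,
// receive handles one-way messages, onStop runs on shutdown.
trait LifecycleEndpoint {
  def onStart(): Unit = ()
  def receive: PartialFunction[Any, Unit]
  def onStop(): Unit = ()
}

// Hypothetical toy RpcEnv: a name -> endpoint registry (not Spark's API).
final class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, LifecycleEndpoint]

  def setupEndpoint(name: String, endpoint: LifecycleEndpoint): String = {
    require(!endpoints.contains(name), s"endpoint $name already registered")
    endpoints(name) = endpoint
    endpoint.onStart() // synchronous here, unlike a real RpcEnv
    name               // stands in for the endpoint reference
  }

  def send(name: String, msg: Any): Unit =
    endpoints.get(name).foreach { e =>
      val handler = e.receive
      if (handler.isDefinedAt(msg)) handler(msg)
    }

  def stop(name: String): Unit = endpoints.remove(name).foreach(_.onStop())
}

// Client endpoint that starts registration in onStart, like ClientEndpoint,
// and marks itself disconnected if that attempt throws.
final class ToyClientEndpoint(register: () => Unit) extends LifecycleEndpoint {
  @volatile var state: String = "new"

  override def onStart(): Unit =
    try { register(); state = "registering" }
    catch { case _: Exception => state = "disconnected" }

  override def receive: PartialFunction[Any, Unit] = {
    case "RegisteredApplication" => state = "registered"
  }
}
```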

The registerWithMaster method is what registers the Application with the Master; its source is as follows:

    /**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures = tryRegisterAllMasters()
      // retry if registration fails
      registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          Utils.tryOrExit {
            if (registered) {
              registerMasterFutures.foreach(_.cancel(true))
              registerMasterThreadPool.shutdownNow()
            } else if (nthRetry >= REGISTRATION_RETRIES) {
              markDead("All masters are unresponsive! Giving up.")
            } else {
              registerMasterFutures.foreach(_.cancel(true))
              registerWithMaster(nthRetry + 1)
            }
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
    }
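The timer callback in `registerWithMaster` boils down to a three-way decision on every tick. The sketch below pulls that decision out as a pure function, plus a small simulation of successive ticks starting from attempt 1, as `onStart()` does. `RetryAction`, `RegistrationRetry` and the simulation are hypothetical; the real code cancels futures and reschedules timers instead of returning values.

```scala
// Hypothetical pure model of the timer callback in registerWithMaster.
sealed trait RetryAction
case object CancelPending extends RetryAction             // registered: cancel futures, stop the pool
case object GiveUp extends RetryAction                    // markDead("All masters are unresponsive! ...")
final case class RetryAgain(next: Int) extends RetryAction // registerWithMaster(nthRetry + 1)

object RegistrationRetry {
  // Same branch order as the source above.
  def onTimerTick(registered: Boolean, nthRetry: Int, maxRetries: Int): RetryAction =
    if (registered) CancelPending
    else if (nthRetry >= maxRetries) GiveUp
    else RetryAgain(nthRetry + 1)

  // Simulates successive ticks from attempt 1. `registeredAfterAttempt` is the
  // attempt during which a master answered, if any.
  def simulate(registeredAfterAttempt: Option[Int], maxRetries: Int): List[RetryAction] = {
    @annotation.tailrec
    def loop(n: Int, acc: List[RetryAction]): List[RetryAction] = {
      val registered = registeredAfterAttempt.exists(_ <= n)
      onTimerTick(registered, n, maxRetries) match {
        case a @ RetryAgain(next) => loop(next, a :: acc)
        case terminal             => (terminal :: acc).reverse
      }
    }
    loop(1, Nil)
  }
}
```

Written this way, the retry policy can be checked without threads or timers; for example, with three allowed attempts and no master answering, the simulation ends with `GiveUp` after the third tick.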

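The AppClient registers with all Masters because the Master may be deployed with high availability (HA), for example ZooKeeper-based HA. In that case there are multiple Masters, but only the active Master ultimately responds. The source code is as follows: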

    /**
     * Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            // get the Master's RpcEndpointRef
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            // send the RegisterApplication message to the Master
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
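Registration is fanned out to every configured master, but only the active one replies. The following sketch models this with one pool task per master address and an `AtomicBoolean` as the `registered` flag. The map from address to an "is active" flag and the `ToyMultiMasterRegistration` class are hypothetical; a standby Master in this model simply stays silent.

```scala
import java.util.concurrent.{Executors, Future => JFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical model of tryRegisterAllMasters. `masters` maps address -> isActive.
final class ToyMultiMasterRegistration(masters: Map[String, Boolean]) {
  val registered = new AtomicBoolean(false)
  @volatile var registeredWith: Option[String] = None
  private val pool = Executors.newFixedThreadPool(math.max(1, masters.size))

  // Simulates sending RegisterApplication plus the active master's reply;
  // compareAndSet ensures only the first reply marks the client as registered.
  private def sendRegisterApplication(address: String): Unit =
    if (masters(address) && registered.compareAndSet(false, true)) registeredWith = Some(address)

  // One task per master; tasks that run after registration succeeded do nothing.
  def tryRegisterAllMasters(): Seq[JFuture[_]] =
    masters.keys.toSeq.map { address =>
      pool.submit(new Runnable {
        override def run(): Unit =
          if (!registered.get) sendRegisterApplication(address)
      })
    }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```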

The Master receives the RegisterApplication message from AppClient; the source code is as follows:

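  // The org.apache.spark.deploy.master.Master.receive method handles the RegisterApplication message sent by AppClient
  override def receive: PartialFunction[Any, Unit] = {

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // create the ApplicationInfo
        val app = createApplication(description, driver)
        // register the application
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        // send the RegisteredApplication message to the AppClient
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }
    // other non-essential code omitted
  }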
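The Master side can be modelled in a few lines: a standby Master stays silent, while an active one records, persists and acknowledges the application and then calls `schedule()`. This is a hypothetical simplification of the branch above; the `Toy*` names, the `app-0000` id format and the `reply` callback (standing in for `driver.send`) are assumptions for illustration.

```scala
import scala.collection.mutable

// Hypothetical recovery states (the real Master uses RecoveryState).
sealed trait ToyRecoveryState
case object ToyAlive extends ToyRecoveryState
case object ToyStandby extends ToyRecoveryState

final case class ToyAppDescription(name: String)
final case class ToyRegisteredApplication(appId: String)

// Simplified model of the RegisterApplication branch of Master.receive.
final class ToyMasterState(var state: ToyRecoveryState) {
  private var nextAppNumber = 0
  val apps = mutable.LinkedHashMap.empty[String, ToyAppDescription] // registered applications
  val persisted = mutable.Buffer.empty[String]                       // stands in for persistenceEngine
  var scheduleCalls = 0                                              // counts calls to "schedule()"

  // `reply` plays the role of driver.send(...). Returns the new application id, if any.
  def onRegisterApplication(desc: ToyAppDescription, reply: Any => Unit): Option[String] =
    if (state == ToyStandby) {
      None // a standby Master ignores the request and sends no response
    } else {
      val id = f"app-$nextAppNumber%04d" // hypothetical id format
      nextAppNumber += 1
      apps(id) = desc                     // createApplication + registerApplication
      persisted += id                     // persistenceEngine.addApplication(app)
      reply(ToyRegisteredApplication(id)) // driver.send(RegisteredApplication(...))
      scheduleCalls += 1                  // schedule()
      Some(id)
    }
}
```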

ClientEndpoint, the inner class of AppClient, receives the RegisteredApplication message sent by the Master:

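    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredApplication(appId_, masterRef) =>
        // FIXME How to handle the following cases?
        // 1. A master receives multiple registrations and sends back multiple
        // RegisteredApplications due to an unstable network.
        // 2. Receive multiple RegisteredApplication from different masters because the master is
        // changing.
        appId = appId_
        registered = true
        master = Some(masterRef)
        listener.connected(appId)
      // other non-essential code omitted
    }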

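The process above completes the registration of the Application. The other messages exchanged between AppClient and Master are as follows: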

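// ------------------ Messages sent from AppClient to Master ------------------ //

  // AppClient registers the application with the Master
  case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    extends DeployMessage

  // AppClient unregisters the application from the Master
  case class UnregisterApplication(appId: String)

  // After the Master recovers from a failure, it sends a MasterChanged message to the AppClient.
  // On receipt, the AppClient updates its saved Master information and then sends
  // MasterChangeAcknowledged to the Master
  case class MasterChangeAcknowledged(appId: String)

  // Request requestedTotal executors for running the application
  case class RequestExecutors(appId: String, requestedTotal: Int)

  // Kill the application's executors
  case class KillExecutors(appId: String, executorIds: Seq[String])

// ------------------ Messages sent from Master to AppClient ------------------ //

  // Tell the AppClient that the application was registered successfully
  case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage

  // TODO(matei): replace hostPort with host
  // Sent to notify the AppClient after a Worker has launched an executor
  case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
    Utils.checkHostPort(hostPort, "Required hostport")
  }

  // Sent to notify the AppClient after an executor's state changes
  case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
    exitStatus: Option[Int])

  // When the application finishes successfully or fails, the Master sends this message to the AppClient;
  // on receipt, the AppClient stops the application
  case class ApplicationRemoved(message: String)

  // When the Master changes, it notifies Workers and AppClients with a MasterChanged message
  case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)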
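Because these messages are plain case classes, a receiver can dispatch on them with a single pattern match. The sketch below shows how an AppClient-like object might react to the Master-to-AppClient messages, including answering `MasterChanged` with `MasterChangeAcknowledged`, as the comments above describe. The simplified message copies (with `String` in place of `RpcEndpointRef`), the executor state strings and `ToyAppClient` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical simplified copies of the Master -> AppClient messages.
sealed trait ToyDeployMessage
final case class RegisteredApplication(appId: String, master: String) extends ToyDeployMessage
final case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int)
  extends ToyDeployMessage
final case class ExecutorUpdated(id: Int, state: String, message: Option[String], exitStatus: Option[Int])
  extends ToyDeployMessage
final case class ApplicationRemoved(message: String) extends ToyDeployMessage
final case class MasterChanged(master: String, masterWebUiUrl: String) extends ToyDeployMessage
final case class MasterChangeAcknowledged(appId: String) // sent back to the Master

final class ToyAppClient(sendToMaster: (String, Any) => Unit) {
  var appId: Option[String] = None
  var master: Option[String] = None
  val executors = mutable.Map.empty[Int, String] // executor id -> last known state
  var stopped = false

  // Sealing ToyDeployMessage lets the compiler warn if a message kind is not handled.
  def receive(msg: ToyDeployMessage): Unit = msg match {
    case RegisteredApplication(id, m) =>
      appId = Some(id)
      master = Some(m)
    case ExecutorAdded(id, _, _, _, _)    => executors(id) = "added"
    case ExecutorUpdated(id, state, _, _) => executors(id) = state
    case ApplicationRemoved(_)            => stopped = true
    case MasterChanged(newMaster, _) =>
      master = Some(newMaster) // remember the new active Master...
      appId.foreach(id => sendToMaster(newMaster, MasterChangeAcknowledged(id))) // ...and acknowledge
  }
}
```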

2. Interaction between Master and Worker

Only the basic message exchanges are given here; a detailed analysis will follow when time permits.

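  // ------------------ Messages sent from Worker to Master ------------------ //

  // Register the Worker with the Master. Once registration completes, the Master sends a
  // RegisteredWorker message to the Worker, after which the Worker can accept scheduling from the Master
  case class RegisterWorker(
      id: String,
      host: String,
      port: Int,
      worker: RpcEndpointRef,
      cores: Int,
      memory: Int,
      webUiPort: Int,
      publicAddress: String)
    extends DeployMessage {
    Utils.checkHost(host, "Required hostname")
    assert (port > 0)
  }

  // Report executor state changes to the Master
  case class ExecutorStateChanged(
      appId: String,
      execId: Int,
      state: ExecutorState,
      message: Option[String],
      exitStatus: Option[Int])
    extends DeployMessage

  // Report driver state changes to the Master
  case class DriverStateChanged(
      driverId: String,
      state: DriverState,
      exception: Option[Exception])
    extends DeployMessage

  // The Worker reports the executors and drivers it is running to the Master
  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
     driverIds: Seq[String])

  // Heartbeat sent by the Worker to the Master, mainly to tell the Master it is alive
  case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

  // ------------------ Messages sent from Master to Worker ------------------ //

  // The Worker registers by sending RegisterWorker; on success the Master replies with RegisteredWorker
  case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage

  // The Worker registers by sending RegisterWorker; on failure the Master replies with RegisterWorkerFailed
  case class RegisterWorkerFailed(message: String) extends DeployMessage

  // When a Worker's heartbeat times out, the Master sends ReconnectWorker to tell the Worker node to register again
  case class ReconnectWorker(masterUrl: String) extends DeployMessage

  // After the application finishes, the Master sends KillExecutor to the Worker;
  // on receipt, the Worker removes the executor with the matching execId
  case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage

  // Tell the Worker node to launch an executor
  case class LaunchExecutor(
      masterUrl: String,
      appId: String,
      execId: Int,
      appDesc: ApplicationDescription,
      cores: Int,
      memory: Int)
    extends DeployMessage

  // Tell the Worker node to launch a driver
  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

  // Kill the corresponding driver
  case class KillDriver(driverId: String) extends DeployMessage

  case class ApplicationFinished(id: String)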
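The Heartbeat and ReconnectWorker messages fit together as a simple liveness protocol. The following simplified model tracks the last heartbeat per worker, drops workers that stay silent longer than a timeout, and answers a later heartbeat from a dropped worker with a reconnect request, following the ReconnectWorker comment above. `ToyWorkerTracker`, `dropSilentWorkers` and the message classes are hypothetical; the real Master's bookkeeping differs in detail.

```scala
import scala.collection.mutable

// Hypothetical simplified messages.
final case class WorkerHeartbeat(workerId: String)
final case class ReconnectWorkerMsg(masterUrl: String)

// Hypothetical Master-side bookkeeping for worker liveness (times in ms).
final class ToyWorkerTracker(masterUrl: String, timeoutMs: Long) {
  private val lastHeartbeat = mutable.Map.empty[String, Long] // live workers
  private val timedOut = mutable.Set.empty[String]            // workers dropped for silence

  def register(workerId: String, now: Long): Unit = {
    timedOut -= workerId
    lastHeartbeat(workerId) = now
  }

  // Returns the reply to send back to the worker, if any.
  def onHeartbeat(msg: WorkerHeartbeat, now: Long): Option[ReconnectWorkerMsg] =
    if (lastHeartbeat.contains(msg.workerId)) {
      lastHeartbeat(msg.workerId) = now // still alive: refresh the timestamp
      None
    } else if (timedOut.contains(msg.workerId)) {
      Some(ReconnectWorkerMsg(masterUrl)) // ask the worker to register again
    } else {
      None // never registered: ignore
    }

  // Periodic check: drop workers whose last heartbeat is older than the timeout.
  def dropSilentWorkers(now: Long): Set[String] = {
    val dead = lastHeartbeat.collect { case (id, t) if now - t > timeoutMs => id }.toSet
    dead.foreach { id => lastHeartbeat -= id; timedOut += id }
    dead
  }

  def liveWorkers: Set[String] = lastHeartbeat.keySet.toSet
}
```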

3. Message interaction between Driver Client and Master

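The Driver Client mainly manages drivers, including submitting a driver to the Master and requesting that a driver be killed. Its source is in the org.apache.spark.deploy.Client.scala source file, and the class name is org.apache.spark.deploy.ClientEndpoint. Note that it is fundamentally different from the org.apache.spark.deploy.client.AppClient.ClientEndpoint class.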

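  // ------------------ Messages exchanged between Driver Client and Master ------------------ //

  // The Driver Client asks the Master to submit a driver
  case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
  // The Master tells the Driver Client whether the submission succeeded
  case class SubmitDriverResponse(
      master: RpcEndpointRef, success: Boolean, driverId: Option[String], message: String)
    extends DeployMessage


  // The Driver Client asks the Master to kill a driver
  case class RequestKillDriver(driverId: String) extends DeployMessage
  // The Master replies whether the driver was killed successfully
  case class KillDriverResponse(
      master: RpcEndpointRef, driverId: String, success: Boolean, message: String)
    extends DeployMessage


  // The Driver Client asks the Master for a driver's status
  case class RequestDriverStatus(driverId: String) extends DeployMessage
  // The Master returns the requested status information to the Driver Client
  case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
    workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])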
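A minimal model of the submit/status exchange follows. It assumes, as a simplification, that only an active Master accepts drivers, and it uses `Option` the same way the response messages above do: `driverId` and `state` are `None` when there is nothing to report. `ToyDriverMaster`, the `driver-N` id format and the message strings are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical simplified response messages (no RpcEndpointRef, String states).
final case class ToySubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
final case class ToyDriverStatusResponse(found: Boolean, state: Option[String])

final class ToyDriverMaster(var alive: Boolean) {
  private var counter = 0
  private val drivers = mutable.Map.empty[String, (String, String)] // id -> (description, state)

  def requestSubmitDriver(description: String): ToySubmitDriverResponse =
    if (!alive) {
      // Rejected: no id is assigned, so driverId stays None.
      ToySubmitDriverResponse(success = false, driverId = None, message = "master is not ALIVE")
    } else {
      val id = s"driver-$counter" // hypothetical id format
      counter += 1
      drivers(id) = (description, "SUBMITTED")
      ToySubmitDriverResponse(success = true, driverId = Some(id), message = s"submitted as $id")
    }

  def requestDriverStatus(driverId: String): ToyDriverStatusResponse = {
    val state = drivers.get(driverId).map(_._2) // None if the driver is unknown
    ToyDriverStatusResponse(found = state.isDefined, state = state)
  }
}
```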

4. Message interaction between Driver and Executor

  // ------------------ Messages sent from Driver to Executor ------------------ //
  // launch a task
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
  // kill a task
  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
    extends CoarseGrainedClusterMessage
  // the executor registered successfully
  case object RegisteredExecutor extends CoarseGrainedClusterMessage
  // executor registration failed
  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage



  // ------------------ Messages sent from Executor to Driver ------------------ //
  // register the executor with the driver
  case class RegisterExecutor(
      executorId: String,
      executorRef: RpcEndpointRef,
      hostPort: String,
      cores: Int,
      logUrls: Map[String, String])
    extends CoarseGrainedClusterMessage {
    Utils.checkHostPort(hostPort, "Expected host port")
  }

  // report task state changes to the driver
  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
    data: SerializableBuffer) extends CoarseGrainedClusterMessage

  object StatusUpdate {
    /** Alternate factory method that takes a ByteBuffer directly for the data field */
    def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
      : StatusUpdate = {
      StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
    }
  }
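Finally, a sketch of the Executor-to-Driver side: registration answered by an acknowledgement or a failure, and a companion-object `apply` overload that wraps a raw `ByteBuffer`, the same pattern as StatusUpdate's alternate factory method. The rule that a duplicate executor ID is rejected is an assumption made for this model, and `ToyBuffer`, `ToyStatusUpdate` and `ToyDriverEndpoint` are hypothetical stand-ins for SerializableBuffer, StatusUpdate and DriverEndpoint.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-in for SerializableBuffer: it only wraps a ByteBuffer.
final class ToyBuffer(val value: ByteBuffer)

final case class ToyStatusUpdate(executorId: String, taskId: Long, state: String, data: ToyBuffer)

object ToyStatusUpdate {
  // Alternate factory, as in StatusUpdate: accept a raw ByteBuffer and wrap it.
  def apply(executorId: String, taskId: Long, state: String, data: ByteBuffer): ToyStatusUpdate =
    ToyStatusUpdate(executorId, taskId, state, new ToyBuffer(data))
}

// Hypothetical replies to a registration request.
sealed trait RegisterReply
case object RegisteredExecutorAck extends RegisterReply
final case class RegisterExecutorFailedMsg(message: String) extends RegisterReply

// Hypothetical, heavily simplified driver endpoint.
final class ToyDriverEndpoint {
  private val executorCores = mutable.Map.empty[String, Int]
  val taskStates = mutable.Map.empty[Long, String]

  // Assumed rule: a duplicate executor ID is rejected, otherwise it is recorded.
  def registerExecutor(executorId: String, cores: Int): RegisterReply =
    if (executorCores.contains(executorId)) RegisterExecutorFailedMsg(s"Duplicate executor ID: $executorId")
    else { executorCores(executorId) = cores; RegisteredExecutorAck }

  // Only updates from registered executors are recorded.
  def statusUpdate(update: ToyStatusUpdate): Unit =
    if (executorCores.contains(update.executorId)) taskStates(update.taskId) = update.state
}
```

The overloaded `apply` lets callers pass a `ByteBuffer` directly while the case class itself keeps the wrapper type, which is the design choice the StatusUpdate comment describes.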

Author: 周志湖 (Zhou Zhihu)
Online name: 摇摆少年梦
