Cloud Compution BlockManagerMaster

Driver内含BlockManageMaster
和BlockManageMasterEndpoint
,Endpoint存放若干BlockManageSlaveEndpoint
的Ref,并通过此Ref利用BlockManageSlaveEndpoint
实现Driver对BlockManage的管理。
Executor内含BlockManageMaster
和BlockManageSlaveEndpoint
,以及若干BlockManage。Executor将BlockManage以及相应的BlockManageSlaveEndpoint
注册到BlockManageMasterEndpoint
中,并可以更新Endpoint中Block的消息、询问所需要Block所在的位置、在Executor结束时移除Executor等等。
Details from code view
BlockManageMaster
Driver和Executor通过此类完成对BlockManage的管理,利用Master中的Transmission Tools将操作转化为消息发送到Endpoint中执行具体操作。
Transmission Tools
ask
和askSync
函数都在RpcEndpointRef
类中被实现,在BlockManageMaster
中被使用,用于实现driver和executor到BlockManageMasterEndpoint
和BlockManagerSlaveEndpoint
的消息传输。
tell
函数在BlockManagerMaster
中被实现,也在BlockManageMaster
中被使用,本质是askSync
。
ask
1 |
|
ask
函数是传输工具中最基础的函数,使用了scala.concurrent.Future
类完成消息的非阻塞式的异步处理:ask
函数发送消息并返回Future(某个尚未就绪的值的对象),等待发送完成后Future才就位。
由于笔者对Java/Scala异步处理不甚了解,这里不赘述。对Future类的理解详见Scala 中的异步事件处理和Scala之Future。
askSync
1 | /** |
在以前的版本中同步请求功能由基于AkkaUtil.askWithReply
的askDriverWithReply
函数实现;
新版本中阻塞式地基于ask
函数使用awaitResult
方法在timeout时间内等待ask
的异步结果,保证在timeout时间内一定有结果返回,否则报错。这里本质是用异步实现同步,只要定时等待即可。
tell
1 | /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */ |
tell
函数是对askSync
函数的一种强化,它要求在我们将消息发送到指定Endpoint之后必须获得true
的返回,否则抛出异常。
Functionality
主要操作函数包括:
removeExecutor
:移出死亡的Executor;registerBlockManager
:注册BlockManage;updateBlockInfo
:更新BlockManage信息;getLocations
:获取给定blockId对应的BlockManage的位置;getLocationsAndStatus
:获取给定blockId对应的BlockManage的位置和状态;contains
:检查是否包含某BlockManage,利用getLocations实现;getPeers
:获取集群中其他节点中的BlockManage的ID(blockId);getExecutorEndpointRef
:获取某Executor对应的SlaveEndpoint的Ref;removeBlock
:移除给定blockId对应的BlockManage,首先在blockManagerInfo找到block,再利用其对应的SlaveEndpoint实现移除操作;removeRdd
:移除给定RDD中所有BlockManage;removeShuffle
:移除给定Shuffle中所有BlockManage;removeBroadcast
:移除给定Broadcast中所有BlockManage;getMemoryStatus
、getStorageStatus
:获取存储信息;getBlockStatus
:获取BlockManage信息;getMatchingBlockIds
:搜索式查询blockId;HasExclusiveCachedBlocks
:查询executor是否包含某些BlockManage。
EndPoint
BlockManageMasterEndpoint
这里的指官方代码中的BlockManageMasterEndpoint.scala
文档,其中包含了BlockManageMasterEndpoint
和BlockManagerInfo
两个类。BlockManageMasterEndpoint
类为主体,BlockManagerInfo
类描述BlockManage信息,并在BlockManageMasterEndpoint
中被实例化。
BlockManageMasterEndpoint
BlockManagerMasterEndpoint
is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses of all slaves’ block managers.
其在以前的版本中也叫BlockManageMasterActor
(来源:《深入理解Spark核心思想——源码分析》)。
BlockManageMasterEndpoint
只存在于Driver上,Executor在BlockManageMaster中获取BlockManageMasterEndpoint
的引用,并向其发送消息(使用ask
、askSync
、tell
),实现和Driver的交互。其源码内维护了很多缓存数据结构:
1 | // Mapping from block manager id to the block manager's information. |
类的主体函数为receiveAndReply
,此函数作为匹配BlockManageMasterEndpoint
接收到消息的偏函数,将接收到的消息和此类中的具体函数相匹配,完成类似于switch case
的操作。
1 | override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { |
类中其余函数均为和消息相匹配的,实现注册、更新、获取消息等操作的函数,这里对其实现不做赘述。
BlockManagerInfo
描述BlockManage信息的类,BlockManage在BlockManageMasterEndpoint
中以从BlockManageId到BlockManagerInfo
的映射来存储,此类存储BlockManage的blockManagerId、slaveEndpoint等内容:
1 | private[spark] class BlockManagerInfo( |
BlockManagerSlaveEndpoint
SlaveEndpoint配合BlockManage执行一些来自于driver和executor的要求操作(通过BlockManageMaster
),其主体函数同样是receiveAndReply
,不过内部执行操作的选项较少,主要包括去除Block、RDD、Broadcast,获取信息等操作,所有匹配后的具体操作都是通过相应的具体类(如BlockManage、shuffleManager等)完成。
Others
- 关于BlockManage、Shuffle、Broadcast的概念不甚了解,因此在文章的某些地方描述的有些含糊。希望等到对概念的理解更广泛时做一次refine。
- 突然写
BlockManageMaster
主要是云计算最后一节课要求大家准备一份对Spark源码的阅读后的理解和分析,在课上草草地选了较为简单的BlockManageMaster
部分,翻了翻源码,看了几篇讲得很含糊的文章,有了一些基础的理解。尽管在课上没有被叫起来分享,但是为了不浪费那几十分钟整理出来的内容,在此做一个更加详细的梳理。