|
假设:您已经对yarn有了初步的了解,在此,请原谅小呆呆不会介绍什么是YARN,还请各位google下
假设:您已经理解了分布式系统通信核心:RPC调用,准备好了吗?我们就开始了呀
========================================================================
本文将使用的一些术语
RM:resourceManager
AM:applicationMaster
NM:nodeManager
简单的说,yarn涉及到3个通信协议:
ApplicationClientProtocol:client通过该协议与RM通信,以后会简称其为CR协议
ApplicationMasterProtocol:AM通过该协议与RM通信,以后会简称其为AR协议
ContainerManagementProtocol:AM通过该协议与NM通信,以后会简称其为AN协议
---------------------------------------------------------------------------------------------------------------------
通常而言,客户端向RM提交一个程序,流程是这样滴:
step1:创建一个CR协议的客户端
rmClient=(ApplicationClientProtocol)rpc.getProxy(ApplicationClientProtocol,rmAddress,conf)
step2:客户端通过CR协议#getNewApplication从RM获取唯一的应用程序ID,简化过的代码:
//GetNewApplicationRequest包含两项信息:ApplicationId 和 最大可申请的资源量
//Records.newRecord(...)是一个静态方法,通过序列化框架生成一些RPC过程需要的对象(yarn默认采用ProtocolBuffers(序列化框架,google ProtocolBuffers这些东东,麻烦大家google下呀,喵))
GetNewApplicationRequest request=Records.newRecord(GetNewApplicationRequest.class);
继续看代码(代码都是简化过的,亲们原谅):
GetNewApplicationResponse newApp =rmClient.getNewApplication(request);
ApplicationId appId = newApp.getApplicationId();
step3:客户端通过CR协议#submitApplication将AM提交到RM上,简化过的代码:
// 客户端将启动AM需要的所有信息打包到ApplicationSubmissionContext 中
ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);
。。。。//设置应用程序名称,优先级,队列名称云云
context.setApplicationName(appName);
//构造一个AM启动上下文对象
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext .class)
。。。//设置AM相关的变量
amContainer.setLocalResource(localResponse);//设置AM启动所需要的本地资源
amContainer.setEnvironment(env);
context.setAMContainerSpec(amContainer);
context.setApplicationId(appId);
SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(request);
rmClien.submitApplication(request);//将应用程序提交到RM上
--------------------------------------------------------------------------------------------------------------------------------------------------
通常而言,AM向RM注册自己,申请资源,请求NM启动Container的流程是这样滴:
AM-RM流程:
step1:创建一个AR协议的客户端
ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol)rpc.getProxy(ApplicationMasterProtocol.class,rmAddress,conf);
step2:AM向RM注册自己
//这里的 recordFactory.newRecordInstance(。。。)与上面的Records.newRecord(。。。)作用一样,都属于静态调用
RegisterApplicationMasterRequest request =recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
request.setHost(host);
request.setRpcPort(port);
request.setTrackingUrl(appTrackingUrl)
RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request);//完成注册
step3:AM向RM请求资源
一段简化的代码如下(感兴趣的朋友,还请亲自阅读源码):
synchronized(this){
askList =new ArrayList<ResourceRequest>(ask);
releaseList = new ArrayList<ContainerId>(release);
allocateRequest = BuilderUtils.newAllocateRequest(....);构造一个 allocateRequest 对象
}
//向RM申请资源,同时领取新分配的资源(CPU,内存等)
allocateResponse = rmClient.allocate(allocateRequest ) ;
//根据RM的应答信息设计接下来的逻辑(资源分配)
.....
step4:AM告诉RM应用程序执行完毕,并退出
//构造请求对象
FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class );
request.setFinishApplicationStatus(appStatus);
..//设置诊断信息
..//设置trackingUrl
//通知RM自己退出
rmclient.finishApplicationMaster(request);
--------------------------------------------------------------------------------------------------------------------------------------------
AM-NM流程 :
step1:构造AN协议客户端,并启动Container
String cmIpPortStr = container.getNodeId().getHost()+":"+container.getNodeId().getPort();
InetSocketAddress cmAddress=NetUtils.createSocketAddr(cmIpPortStr);
anClient = (ContainerManagementProtocol)rpc.getProxy(ContainerManagementProtocol.class,cmAddress,conf)
ContainerLaunchContext ctx=Records.newRecord(ContainerLaunchContext.class);
。。。//设置ctx变量
StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
request.setContainerLaunchContext(ctx);
request.setContainer(container);
anClient.startContainer(request);
Step2:为了实时掌握各个Container运行状态,AM可通过AN协议#getContainerStatus向NodeManager询问Container运行状态
Step3:一旦一个Container运行完成后,AM可通过AN协议#stopContainer释放Container
===============================================================================================
基本设计流程介绍完毕,后续将详分析YARN提供的API
本文参考:董和hadoop-yarn-sourceCode
|
|