Background
We previously looked at the ring-based and tree-based allreduce algorithms. With ring allreduce, a chunk of data has to pass through every rank in the reduce-scatter phase and then through every rank again in the allgather phase; with tree allreduce, a chunk has to travel up to the root in the reduce phase and back down in the broadcast phase. In both cases the path is fairly long.
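As a rough back-of-the-envelope comparison (my own framing, not from the NCCL docs), counting how often a single chunk crosses a link for \(k\) ranks:

```latex
\underbrace{2(k-1)\ \text{hops}}_{\text{ring: reduce-scatter + allgather}}
\quad \text{vs.} \quad
\underbrace{\approx 2\lceil\log_2 k\rceil\ \text{hops}}_{\text{tree: up to the root and back}}
\quad \text{vs.} \quad
\underbrace{1\ \text{send} + 1\ \text{receive per rank}}_{\text{SHARP}}
```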
SHARP
To address this, Mellanox proposed SHARP, which offloads the reduction to the IB switch: each node sends its data once, the switch performs the reduction, and each node then receives the complete result once. Figure 1 shows a fat-tree physical network topology, where the green nodes are switches and the yellow nodes are hosts.

Figure 1
Figure 2 shows the SHARP tree these hosts end up with after running SHARP initialization on the physical topology of Figure 1. The SHARP tree is a logical concept; the physical topology is not required to be a fat tree.

Figure 2
A few points worth noting:
- a switch can be part of at most 64 SHARP trees;
- a SHARP tree can host a large number of groups, where a group is a subset built from the existing hosts;
- several hundred collective communication operations can run concurrently.
Overall flow
This post is again based on NCCL 2.7.8. As with tree allreduce, the intra-node part is still a chain. Taking two machines as an example, data is sent along the direction of the arrows in Figure 3; after the switch has computed the complete result, the data flows back in the opposite direction.
Figure 3

Initialization
Running IB SHARP depends on libnccl-net.so, whose source is available at https://github.com/Mellanox/nccl-rdma-sharp-plugins; this post is based on v2.0.1. After NCCL finishes initializing the bootstrap network, it starts initializing the data network:
```c
ncclResult_t initNetPlugin(ncclNet_t** net, ncclCollNet_t** collnet) {
  void* netPluginLib = dlopen("libnccl-net.so", RTLD_NOW | RTLD_LOCAL);
  ...
  ncclNet_t* extNet = (ncclNet_t*) dlsym(netPluginLib, STR(NCCL_PLUGIN_SYMBOL));
  if (extNet == NULL) {
    INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_PLUGIN_SYMBOL) " symbol.");
  } else if (initNet(extNet) == ncclSuccess) {
    *net = extNet;
    ncclCollNet_t* extCollNet = (ncclCollNet_t*) dlsym(netPluginLib, STR(NCCL_COLLNET_PLUGIN_SYMBOL));
    if (extCollNet == NULL) {
      INFO(NCCL_INIT|NCCL_NET, "NET/Plugin: Failed to find " STR(NCCL_COLLNET_PLUGIN_SYMBOL) " symbol.");
    } else if (initCollNet(extCollNet) == ncclSuccess) {
      *collnet = extCollNet;
    }
    return ncclSuccess;
  }
  if (netPluginLib != NULL) dlclose(netPluginLib);
  return ncclSuccess;
}
```

First libnccl-net.so is dlopen'ed. NCCL_PLUGIN_SYMBOL is ncclNetPlugin_v3 and NCCL_COLLNET_PLUGIN_SYMBOL is ncclCollNetPlugin_v3, so this looks up the symbol ncclNetPlugin_v3 and assigns it to extNet, then looks up ncclCollNetPlugin_v3 and assigns it to extCollNet. The relationship between extNet and extCollNet is a bit like the relationship between NCCL's bootstrap network and its data network: extCollNet carries the actual data, while metadata and control information are exchanged through extNet. Next extNet is initialized. extNet supports several backends such as ib and ucx; we use ib, so everything that follows refers to the ibPlugin, whose logic closely mirrors NCCL's ncclNetIb. Its initialization, ncclIbInit, enumerates all the NICs on the machine and stores them in a global array. Then initCollNet, i.e. ncclSharpInit, runs; it just performs the ibPlugin initialization again, and since that has already been done it does nothing here.
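As a side note, here is a minimal sketch of how those strings reach dlsym; the macro definitions below are assumptions for illustration (only the resulting symbol names ncclNetPlugin_v3 / ncclCollNetPlugin_v3 are given by the text):

```c
// Illustration only: with definitions along these lines, dlsym() receives the literal
// strings "ncclNetPlugin_v3" and "ncclCollNetPlugin_v3".
#define NCCL_PLUGIN_SYMBOL          ncclNetPlugin_v3
#define NCCL_COLLNET_PLUGIN_SYMBOL  ncclCollNetPlugin_v3
#define STR2(v) #v
#define STR(v)  STR2(v)
// STR(NCCL_PLUGIN_SYMBOL) expands to "ncclNetPlugin_v3"
```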
Graph search

```c
struct ncclTopoGraph collNetGraph;
collNetGraph.id = 2;
collNetGraph.pattern = NCCL_TOPO_PATTERN_TREE;
collNetGraph.collNet = 1;
collNetGraph.crossNic = ncclParamCrossNic();
collNetGraph.minChannels = collNetGraph.maxChannels = ringGraph.nChannels;
NCCLCHECK(ncclTopoCompute(comm->topo, &collNetGraph));
```

The pattern is NCCL_TOPO_PATTERN_TREE, so the intra-node order is still a ring; the difference from NCCL_TOPO_PATTERN_SPLIT_TREE is that with NCCL_TOPO_PATTERN_TREE the 0th GPU handles both the NIC send and the NIC receive. Suppose the searched channel looks like this:
NET/0 GPU/0 GPU/1 GPU/2 GPU/3 GPU/4 GPU/5 GPU/6 GPU/7 NET/0

Channel connection
Once the graph search is done, the channels are connected, i.e. each node records its neighboring nodes:
```c
ncclResult_t ncclTopoPreset(struct ncclComm* comm,
    struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph,
    struct ncclTopoRanks* topoRanks) {
  int rank = comm->rank;
  int localRanks = comm->localRanks;
  int nChannels = comm->nChannels;

  for (int c=0; c<nChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    int* collNetIntra = collNetGraph->intra+c*localRanks;

    for (int i=0; i<localRanks; i++) {
      if (collNetIntra[i] == rank) {
        int prev = (i-1+localRanks)%localRanks, next = (i+1)%localRanks;
        channel->collTreeDn.up      = collNetIntra[prev];
        channel->collTreeDn.down[0] = collNetIntra[next];
        channel->collTreeUp.down[0] = channel->collTreeDn.down[0];
        channel->collTreeUp.up      = channel->collTreeDn.up;
      }
    }
  }
  // Duplicate channels rings/trees
  struct ncclChannel* channel0 = comm->channels;
  struct ncclChannel* channel1 = channel0+nChannels;
  memcpy(channel1, channel0, nChannels*sizeof(struct ncclChannel));
  return ncclSuccess;
}
```

Suppose 10 channels were found in total: the first 5 are used for SHARP's up phase (the arrow direction in Figure 3) and the last 5 for the down phase (the reverse direction). Each channel therefore records two sets of connections, collTreeUp and collTreeDn. The first 5 (up) channels only use collTreeUp and the last 5 (down) channels only use collTreeDn, but collTreeUp and collTreeDn are in fact identical, so only collTreeUp is described below. The intra-node channel connections at this point are shown in Figure 4, where the arrows point to the up peer.
Figure 4

Next the inter-node connections are handled, which essentially means picking the rank in each node that is responsible for NIC send/receive. Take the send side as an example and call this rank sendIndex: its intra-node connection has to be cut, as does the connection that points back to sendIndex.
```c
ncclResult_t ncclTopoConnectCollNet(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, int rank) {
  int nranks = comm->nRanks;
  int depth = nranks/comm->nNodes;
  int sendIndex = collNetGraph->pattern == NCCL_TOPO_PATTERN_TREE ? 0 : 1;  // send GPU index depends on topo pattern
  int sendEndIndex = (sendIndex+comm->localRanks-1)%comm->localRanks;
  for (int c=0; c<comm->nChannels/2; c++) {
    struct ncclChannel* channel = comm->channels+c;
    // Set root of collTree to id nranks
    if (rank == collNetGraph->intra[sendIndex+c*comm->localRanks]) { // is master
      channel->collTreeUp.up = channel->collTreeDn.up = nranks;
    }
    if (rank == collNetGraph->intra[sendEndIndex+c*comm->localRanks]) { // is bottom of intra-node chain
      channel->collTreeUp.down[0] = channel->collTreeDn.down[0] = -1;
    }
    channel->collTreeUp.depth = channel->collTreeDn.depth = depth;
    INFO(NCCL_GRAPH, "CollNet Channel %d rank %d up %d down %d", c, rank, channel->collTreeUp.up, channel->collTreeUp.down[0]);
  }
  int recvIndex = 0;  // recv GPU index is always 0
  int recvEndIndex = (recvIndex+comm->localRanks-1)%comm->localRanks;
  for (int c=0; c<comm->nChannels/2; c++) {
    struct ncclChannel* channel = comm->channels+comm->nChannels/2+c;
    // Set root of collTree to id nranks
    if (rank == collNetGraph->intra[recvIndex+c*comm->localRanks]) { // is master
      channel->collTreeUp.up = channel->collTreeDn.up = nranks;
    }
    if (rank == collNetGraph->intra[recvEndIndex+c*comm->localRanks]) { // is bottom of intra-node chain
      channel->collTreeUp.down[0] = channel->collTreeDn.down[0] = -1;
    }
    channel->collTreeUp.depth = channel->collTreeDn.depth = depth;
    INFO(NCCL_GRAPH, "CollNet Channel %d rank %d up %d down %d", comm->nChannels/2+c, rank, channel->collTreeDn.up, channel->collTreeDn.down[0]);
  }
  return ncclSuccess;
}
```
The sendIndex chosen here is the first rank in the chain and sendEndIndex is the last one; since the last rank would otherwise connect back to sendIndex, that link between the two has to be cut. Then, for every up channel, sendIndex's up is set to nranks and sendEndIndex's down is set to -1; the same is done for the down channels. The channels then look like Figure 5.
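Concretely, assuming the channel shown earlier (8 GPUs per node, two nodes, so nranks = 16), the up-channel fields on one node end up as follows; this is just a worked illustration of the code above, not extra NCCL output:

```c
// Assumed: intra order GPU0..GPU7, localRanks = 8, nranks = 16
//   sendIndex    = 0 -> GPU0 (master): collTreeUp.up      = 16  (the virtual network peer)
//                                      collTreeUp.down[0] = GPU1
//   sendEndIndex = 7 -> GPU7 (bottom): collTreeUp.up      = GPU6
//                                      collTreeUp.down[0] = -1
//   GPU1..GPU6: up = GPU(i-1), down[0] = GPU(i+1), unchanged from ncclTopoPreset
```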
Figure 5

Establishing the communication links
Next the intra-node and inter-node communication links are established.
```c
if (comm->nNodes > 1 &&
    ncclParamCollNetEnable() == 1 &&
    collNetSupport() && collNetGraph.nChannels) {
  int logicChannels = comm->nChannels/2;
  int collNetSetupFail = 0;
  const int recvIndex = 0;  // recv GPU index is always 0
  const int sendIndex = collNetGraph.pattern == NCCL_TOPO_PATTERN_TREE ? 0 : 1;  // send GPU index depends on topo pattern
  for (int c=0; c<logicChannels; c++) {
    struct ncclChannel* channelRecv = comm->channels+logicChannels+c;
    struct ncclChannel* channelSend = comm->channels+c;
    NCCLCHECK(ncclTransportP2pSetup(comm, &collNetGraph, channelRecv, 1, &channelRecv->collTreeDn.up, 1, channelRecv->collTreeDn.down));
    NCCLCHECK(ncclTransportP2pSetup(comm, &collNetGraph, channelSend, 1, channelSend->collTreeUp.down, 1, &channelSend->collTreeUp.up));
    const int recvMaster = collNetGraph.intra[c*comm->localRanks+recvIndex];
    const int sendMaster = collNetGraph.intra[c*comm->localRanks+sendIndex];
    if (collNetSetup(comm, &collNetGraph, channelRecv, rank, nranks, recvMaster, sendMaster, comm->nNodes, 1) != 1)
      collNetSetupFail = 1;
    else if (collNetSetup(comm, &collNetGraph, channelSend, rank, nranks, sendMaster, recvMaster, comm->nNodes, 0) != 1)
      collNetSetupFail = 1;
  }
  // Verify CollNet setup across ranks
  NCCLCHECK(checkCollNetSetup(comm, rank, collNetSetupFail));
}
```
Take the up channels (channelSend) as an example: ncclTransportP2pSetup establishes the intra-node connections, and because the up/down of sendIndex and sendEndIndex were set to nranks or -1, only the connections shown by the arrows in Figure 5 are created. Then the inter-node links are set up: on each machine the first rank is responsible for NIC send/receive and is called the master of that node, and a SHARP communication group is created that contains only the masters of each node.
```c
static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int rank, int nranks,
    int masterRank, int masterPeer, int nMasters, int type) {
  int rankInCollNet = -1;
  int supported = 0;
  int isMaster = (rank == masterRank) ? 1 : 0;
  struct {
    int collNetRank;
    ncclConnect connect;
  } sendrecvExchange;

  // check if we can connect to collnet, whose root is the nranks-th rank
  struct ncclPeerInfo *myInfo = comm->peerInfo+rank, *peerInfo = comm->peerInfo+nranks;
  peerInfo->rank = nranks;
  int ret = 1;
  if (isMaster) {
    NCCLCHECK(collNetTransport.canConnect(&ret, comm->topo, collNetGraph, myInfo, peerInfo));
  }

  // send master receives connect info from peer recv master
  ...

  // select
  struct ncclPeer* root = channel->peers+nranks;
  struct ncclConnector* conn = (type == 1) ? &root->recv : &root->send;
  struct ncclTransportComm* transportComm = (type == 1) ? &(collNetTransport.recv) : &(collNetTransport.send);
  conn->transportComm = transportComm;

  // setup
  struct ncclConnect myConnect;
  if (isMaster && ret > 0) {
    NCCLCHECK(transportComm->setup(comm->topo, collNetGraph, myInfo, peerInfo, &myConnect, conn, channel->id));
  }
  ...
}
```

Look first at collNetSetup for the recvMaster. isMaster indicates whether the current rank is the recvMaster. peerInfo is set to the nranks-th ncclPeerInfo in comm, i.e. an extra slot that represents the collNet root as a virtual peer, and canConnect checks whether a collNet connection is possible; it simply returns 1 here, meaning it is.
Then transportComm->setup is executed to initialize the resources used for communication.
```c
ncclResult_t collNetRecvSetup(struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* myInfo,
    struct ncclPeerInfo* peerInfo, struct ncclConnect* connectInfo, struct ncclConnector* recv, int channelId) {
  struct collNetRecvResources* resources;
  NCCLCHECK(ncclCalloc(&resources, 1));
  recv->transportResources = resources;

  NCCLCHECK(ncclTopoGetNetDev(topo, myInfo->rank, graph, channelId, &resources->netDev));
  NCCLCHECK(ncclTopoCheckGdr(topo, myInfo->busId, resources->netDev, 0, &resources->useGdr));

  NCCLCHECK(ncclCudaHostCalloc(&resources->hostSendMem, 1));
  resources->devHostSendMem = resources->hostSendMem;

  int recvSize = offsetof(struct ncclRecvMem, buff);
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) recvSize += recv->comm->buffSizes[p];

  if (resources->useGdr) {
    NCCLCHECK(ncclCudaCalloc((char**)(&resources->devRecvMem), recvSize));
  }
  NCCLCHECK(ncclCudaHostCalloc((char**)&resources->hostRecvMem, recvSize));
  resources->devHostRecvMem = resources->hostRecvMem;

  NCCLCHECK(ncclIbMalloc((void**)&(resources->llData), recv->comm->buffSizes[NCCL_PROTO_LL]/2));
  struct collNetRecvConnectInfo* info = (struct collNetRecvConnectInfo*) connectInfo;
  NCCLCHECK(collNetListen(resources->netDev, &info->collNetHandle, &resources->netListenComm));
  return ncclSuccess;
}
```
This allocates the collNetRecvResources resources, then allocates the buffers used by the various protocols as well as the head/tail counters used for synchronization, and finally calls collNetListen. The assignments here are a bit involved, so let's first look at what resources will look like once collNetListen has finished. collNetRecvResources is the structure NCCL itself uses; its netListenComm ends up pointing to an ncclSharpListenComm, whose listenCommP2P points to an ncclIbListenComm, which stores the NIC in use and the fd of the listening socket.
```c
struct collNetRecvResources {
  void* netListenComm;   // ncclSharpListenComm
  ...
};

struct ncclSharpListenComm {
  int dev;
  void *listenCommP2P;   // ncclIbListenComm
};

struct ncclIbListenComm {
  int dev;
  int fd;
};
```

Now let's walk through the details.
```c
ncclResult_t ncclSharpListen(int dev, void* opaqueHandle, void** listenComm) {
  struct ncclSharpListenComm *lComm;
  ncclResult_t status;

  NCCLCHECK(ncclIbMalloc((void**)&lComm, sizeof(struct ncclSharpListenComm)));
  status = NCCL_PLUGIN_SYMBOL.listen(dev, opaqueHandle, &lComm->listenCommP2P);
  lComm->dev = dev;
  *listenComm = lComm;
  return status;
}
```

collNetListen resolves to ncclSharpListen, which simply calls the ib_plugin's listen; as shown here, collNetRecvResources' netListenComm is set to the ncclSharpListenComm lComm.
```c
ncclResult_t ncclIbListen(int dev, void* opaqueHandle, void** listenComm) {
  struct ncclIbListenComm* comm;
  comm = malloc(sizeof(struct ncclIbListenComm));
  memset(comm, 0, sizeof(struct ncclIbListenComm));
  struct ncclIbHandle* handle = (struct ncclIbHandle*) opaqueHandle;
  NCCL_STATIC_ASSERT(sizeof(struct ncclIbHandle) < NCCL_NET_HANDLE_MAXSIZE, "ncclIbHandle size too large");
  comm->dev = dev;
  NCCLCHECK(GetSocketAddr(&(handle->connectAddr)));
  NCCLCHECK(createListenSocket(&comm->fd, &handle->connectAddr));
  *listenComm = comm;
  return ncclSuccess;
}
```
The ib_plugin's listen just creates a listening socket and stores the NIC index dev and the socket fd in an ncclIbListenComm; ncclSharpListenComm's listenCommP2P is set to this ncclIbListenComm. The IP and port are stored in opaqueHandle, i.e. myConnect. That completes setup; let's continue with collNetSetup.
```c
static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int rank, int nranks,
    int masterRank, int masterPeer, int nMasters, int type) {
  ...
  // prepare connect handles
  ncclResult_t res;
  struct {
    int isMaster;
    ncclConnect connect;
  } *allConnects = NULL;
  ncclConnect *masterConnects = NULL;
  NCCLCHECK(ncclCalloc(&masterConnects, nMasters));
  if (type == 1) {  // recv side: AllGather
    // all ranks must participate
    NCCLCHECK(ncclCalloc(&allConnects, nranks));
    allConnects[rank].isMaster = isMaster;
    memcpy(&(allConnects[rank].connect), &myConnect, sizeof(struct ncclConnect));
    NCCLCHECKGOTO(bootstrapAllGather(comm->bootstrap, allConnects, sizeof(*allConnects)), res, cleanup);
    // consolidate
    int c = 0;
    for (int r = 0; r < nranks; r++) {
      if (allConnects[r].isMaster) {
        memcpy(masterConnects+c, &(allConnects[r].connect), sizeof(struct ncclConnect));
        if (r == rank) rankInCollNet = c;
        c++;
      }
    }
  } else { // send side : copy in connect info received from peer recv master
    if (isMaster) memcpy(masterConnects+rankInCollNet, &(sendrecvExchange.connect), sizeof(struct ncclConnect));
  }
  // connect
  if (isMaster && ret > 0) {
    NCCLCHECKGOTO(transportComm->connect(masterConnects, nMasters, rankInCollNet, conn), res, cleanup);
    struct ncclPeer* devRoot = channel->devPeers+nranks;
    struct ncclConnector* devConn = (type == 1) ? &devRoot->recv : &devRoot->send;
    CUDACHECKGOTO(cudaMemcpy(devConn, conn, sizeof(struct ncclConnector), cudaMemcpyHostToDevice), res, cleanup);
  }
  ...
}
```
Then the masters' information is exchanged: nMasters ncclConnect entries (masterConnects) are allocated, where nMasters is the number of nodes, and myConnect is copied into the corresponding slot. An allgather collects every rank's ncclConnect, the entries of all masters in allConnects are copied into masterConnects, and finally transportComm's connect is executed to finish building the SHARP communication group.
As before, let's first look at what the data structures in resources will look like after connect. collNetRecvComm points to an ncclSharpCollComm; its recvComm and sendComm play a role similar to the links to the previous and next node in the bootstrap network; sharpCollContext is the SHARP context and sharpCollComm is the SHARP communicator.
```c
struct collNetRecvResources {
  ...
  void* collNetRecvComm;   // ncclSharpCollComm
  ...
};

struct ncclSharpCollComm {
  ...
  void* recvComm;          // ncclIbRecvComm
  void* sendComm;          // ncclIbSendComm
  ...
  struct sharp_coll_context* sharpCollContext;
  struct sharp_coll_comm*    sharpCollComm;
};

ncclResult_t collNetRecvConnect(struct ncclConnect* connectInfos, int nranks, int rank, struct ncclConnector* recv) {
  // Setup device pointers
  struct collNetRecvResources* resources = (struct collNetRecvResources*)recv->transportResources;
  struct collNetSendConnectInfo* info = (struct collNetSendConnectInfo*)(connectInfos+rank);
  resources->collNetRank = rank;

  // Intermediate buffering on GPU for GPU Direct RDMA
  struct ncclRecvMem* recvMem = resources->useGdr ? resources->devRecvMem : resources->devHostRecvMem;
  int offset = 0;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
    recv->conn.buffs[p] = (p == NCCL_PROTO_LL ? resources->devHostRecvMem->buff : recvMem->buff) + offset;
    offset += recv->comm->buffSizes[p];
  }
  recv->conn.direct |= resources->useGdr ? NCCL_DIRECT_NIC : 0;

  // Head/Tail/Opcount are always on host
  recv->conn.tail = &resources->devHostRecvMem->tail;
  recv->conn.head = &resources->devHostSendMem->head;

  // Connect to coll comm
  collNetHandle_t** handlePtrs = NULL;
  NCCLCHECK(ncclCalloc(&handlePtrs, nranks));
  for (int i = 0; i < nranks; i++) {
    struct collNetRecvConnectInfo* info = (struct collNetRecvConnectInfo*)(connectInfos+i);
    handlePtrs[i] = &(info->collNetHandle);
  }
  ncclResult_t res;
  NCCLCHECKGOTO(collNetConnect((void**)handlePtrs, nranks, rank, resources->netListenComm, &resources->collNetRecvComm), res, cleanup);

  // Register buffers
  NCCLCHECK(collNetRegMr(resources->collNetRecvComm, recv->conn.buffs[NCCL_PROTO_SIMPLE], recv->comm->buffSizes[NCCL_PROTO_SIMPLE],
        resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_SIMPLE]));
  NCCLCHECK(collNetRegMr(resources->collNetRecvComm, resources->llData, recv->comm->buffSizes[NCCL_PROTO_LL]/2,
        NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_LL]));

  // Create shared info between send and recv proxies
  NCCLCHECK(ncclCalloc(&(resources->reqFifo), NCCL_STEPS));

  // Pass info to send side
  info->reqFifo = resources->reqFifo;
  info->collNetComm = resources->collNetRecvComm;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++)
    info->mhandles[p] = resources->mhandles[p];

cleanup:
  if (handlePtrs != NULL) free(handlePtrs);
  // Close listen comm
  NCCLCHECK(collNetCloseListen(resources->netListenComm));

  return res;
}
```
First the head, tail and buffers from resources are recorded into conn; handlePtrs collects every master rank's listen IP and port; then collNetConnect is called to build the SHARP communication group.
```c
ncclResult_t ncclSharpConnect(void* handles[], int nranks, int rank, void* listenComm, void** collComm) {
  struct ncclSharpListenComm* lComm = (struct ncclSharpListenComm*)listenComm;
  struct ncclSharpCollComm* cComm;
  NCCLCHECK(ncclIbMalloc((void**)&cComm, sizeof(struct ncclSharpCollComm)));
  NCCLCHECK(ncclIbMalloc((void**)&cComm->reqs, sizeof(struct ncclSharpRequest)*MAX_REQUESTS));

  cComm->nranks = nranks;
  cComm->rank = rank;
  if (cComm->rank == -1) {
    WARN("Could not determine my rank\n");
    return ncclInternalError;
  }
  int next = (cComm->rank + 1) % nranks;
  NCCLCHECK(NCCL_PLUGIN_SYMBOL.connect(lComm->dev, handles[next], &cComm->sendComm));
  NCCLCHECK(NCCL_PLUGIN_SYMBOL.accept(lComm->listenCommP2P, &cComm->recvComm)); // From prev

  struct ncclSharpInfo* allInfo;
  pid_t pid = getpid();
  pthread_t tid = pthread_self();
  NCCLCHECK(ncclIbMalloc((void**)&allInfo, sizeof(struct ncclSharpInfo)*nranks));
  allInfo[cComm->rank].hostId = gethostid();
  allInfo[cComm->rank].jobId = (((uint64_t)allInfo[cComm->rank].hostId << 32) | ((pid ^ tid) ^ rand()));
  NCCLCHECK(ncclSharpAllGather(cComm, allInfo, sizeof(struct ncclSharpInfo)));

  // Find my local rank;
  int localRank = 0;
  for (int i=0; i<cComm->rank; i++) {
    if (allInfo[cComm->rank].hostId == allInfo[i].hostId) {
      localRank++;
    }
  }
  uint64_t jobId = allInfo[0].jobId;
  free(allInfo);
  ...
}
```
An ncclSharpCollComm cComm is created; this is what will eventually be assigned to collNetRecvComm. Much like the bootstrap network, the ib_plugin is used here to connect all master ranks head to tail; the connect and accept logic is exactly the same as in ncclNetIb described earlier, so it is not repeated here. Then an ncclSharpInfo array allInfo is created, the hostId is recorded, a jobId is generated randomly, and an allgather is performed.
```c
ncclResult_t ncclSharpConnect(void* handles[], int nranks, int rank, void* listenComm, void** collComm) {
  ...
  struct sharp_coll_init_spec init_spec = {0};
  init_spec.progress_func = NULL;
  init_spec.job_id = jobId;
  init_spec.world_rank = cComm->rank;
  init_spec.world_size = nranks;
  init_spec.world_local_rank = 0;
  init_spec.enable_thread_support = 1;
  init_spec.group_channel_idx = 0;

  init_spec.oob_colls.barrier = ncclSharpOobBarrier;
  init_spec.oob_colls.bcast = ncclSharpOobBcast;
  init_spec.oob_colls.gather = ncclSharpOobGather;
  init_spec.oob_ctx = cComm;

  init_spec.config = sharp_coll_default_config;
  init_spec.config.user_progress_num_polls = 10000000;

  char devName[MAXNAMESIZE];
  ncclNetProperties_t prop;
  ncclSharpGetProperties(lComm->dev, &prop);
  snprintf(devName, MAXNAMESIZE, "%s:%d", prop.name, prop.port);
  init_spec.config.ib_dev_list = devName;

  int ret = sharp_coll_init(&init_spec, &cComm->sharpCollContext);

  INFO(NCCL_INIT, "Sharp rank %d/%d initialized on %s", cComm->rank, nranks, devName);

  if (ret < 0) {
    WARN("NET/IB :SHARP coll init error: %s(%d)\n", sharp_coll_strerror(ret), ret);
    return ncclInternalError;
  }

  struct sharp_coll_comm_init_spec comm_spec;
  comm_spec.rank = cComm->rank;
  comm_spec.size = nranks;
  comm_spec.oob_ctx = cComm;
  comm_spec.group_world_ranks = NULL;
  ret = sharp_coll_comm_init(cComm->sharpCollContext, &comm_spec, &cComm->sharpCollComm);
  if (ret < 0) {
    WARN("SHARP group create failed: %s(%d)\n", sharp_coll_strerror(ret), ret);
    return ncclInternalError;
  }
  *collComm = cComm;
  return ncclSuccess;
}
```
A sharp_coll_init_spec init_spec is created to initialize the SHARP communication context sharpCollContext: job_id is set to rank 0's jobId; rank and size are filled in; init_spec.oob_ctx is set to cComm and oob_colls.barrier and friends are set, oob_colls playing the same role as NCCL's bootstrap network; the NIC to use is selected. Then sharp_coll_init is executed, which initializes SHARP, and sharp_coll_comm_init uses sharpCollContext to initialize sharpCollComm. At this point the SHARP communication group has been created.
Then, after ncclSharpConnect returns, memory registration begins: buffers are registered with SHARP via sharp_coll_reg_mr, and the registration also needs ibv_reg_mr to register the memory for RDMA. Finally the reqFifo is allocated, and reqFifo and collNetRecvComm are recorded into info, which will later be sent to the send side.
```c
ncclResult_t collNetRecvConnect(struct ncclConnect* connectInfos, int nranks, int rank, struct ncclConnector* recv) {
  ...
  // Register buffers
  NCCLCHECK(collNetRegMr(resources->collNetRecvComm, recv->conn.buffs[NCCL_PROTO_SIMPLE], recv->comm->buffSizes[NCCL_PROTO_SIMPLE],
        resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_SIMPLE]));
  NCCLCHECK(collNetRegMr(resources->collNetRecvComm, resources->llData, recv->comm->buffSizes[NCCL_PROTO_LL]/2,
        NCCL_PTR_HOST, &resources->mhandles[NCCL_PROTO_LL]));

  // Create shared info between send and recv proxies
  NCCLCHECK(ncclCalloc(&(resources->reqFifo), NCCL_STEPS));

  // Pass info to send side
  info->reqFifo = resources->reqFifo;
  info->collNetComm = resources->collNetRecvComm;
  for (int p=0; p<NCCL_NUM_PROTOCOLS; p++)
    info->mhandles[p] = resources->mhandles[p];

cleanup:
  if (handlePtrs != NULL) free(handlePtrs);
  // Close listen comm
  NCCLCHECK(collNetCloseListen(resources->netListenComm));

  return res;
}
```
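To make the two-level registration concrete, here is a sketch of what a memory handle on this path conceptually holds; the struct and field names are illustrative, not the plugin's exact definitions:

```c
// Illustration only: a buffer used on the SHARP path is registered twice, once with
// libsharp and once with the verbs layer underneath the ib_plugin.
struct exampleSharpMemHandle {
  void* sharpMr;  // handle produced by sharp_coll_reg_mr, used for the switch-side reduction
  void* ibMr;     // MR produced by ibv_reg_mr inside the ib_plugin's regMr, used by the RDMA transport
};
```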
Back in collNetSetup, the recv side then sends the info, i.e. the address of reqFifo and collNetRecvComm, to the send side.
```c
static int collNetSetup(struct ncclComm* comm, struct ncclTopoGraph* collNetGraph, struct ncclChannel* channel, int rank, int nranks,
    int masterRank, int masterPeer, int nMasters, int type) {
  ...
  // recv side sends connect info to send side
  if (isMaster && type == 1) {
    sendrecvExchange.collNetRank = rankInCollNet;
    memcpy(&sendrecvExchange.connect, masterConnects+rankInCollNet, sizeof(struct ncclConnect));
    NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, masterPeer, &sendrecvExchange, sizeof(sendrecvExchange)), res, cleanup);
  }
  if (ret > 0) {
    supported = 1;
  }
cleanup:
  if (allConnects != NULL) free(allConnects);
  if (masterConnects != NULL) free(masterConnects);
  return supported;
}
```

collNetSetup is then run for the send side, which is simpler: it mainly allocates and registers buffers, and records the reqFifo and comm carried in the info sent over by the recv side into collNetSendResources.
At this point the communication links are fully established. Next let's look at how an API call is executed.
enqueue
As mentioned before, when the user calls an allreduce API, the enqueue step runs.
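For context, a minimal sketch of the application side: from the user's point of view nothing changes, it is still a plain allreduce call, and the CollNet/SHARP path is only considered when the plugin is available and NCCL_COLLNET_ENABLE=1 is set (see ncclParamCollNetEnable() earlier). The function name and buffers below are illustrative.

```c
#include <nccl.h>

// Sketch: an ordinary allreduce call; whether it actually runs over SHARP depends on the
// environment (e.g. launched as: NCCL_COLLNET_ENABLE=1 mpirun -np 16 ./my_app) and topology.
ncclResult_t exampleAllreduce(const float* sendbuff, float* recvbuff, size_t count,
                              ncclComm_t comm, cudaStream_t stream) {
  return ncclAllReduce(sendbuff, recvbuff, count, ncclFloat, ncclSum, comm, stream);
}
```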
```c
ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
  struct ncclColl coll;
  struct ncclProxyArgs proxyArgs;
  memset(&proxyArgs, 0, sizeof(struct ncclProxyArgs));
  NCCLCHECK(computeColl(info, &coll, &proxyArgs));

  info->comm->myParams->blockDim.x = std::max<unsigned>(info->comm->myParams->blockDim.x, info->nThreads);

  int nChannels = info->coll == ncclCollSendRecv ? 1 : coll.args.coll.nChannels;
  int nSubChannels = (info->pattern == ncclPatternCollTreeUp || info->pattern == ncclPatternCollTreeDown) ? 2 : 1;

  for (int bid=0; bid<nChannels*nSubChannels; bid++) {
    int channelId = (info->coll == ncclCollSendRecv) ? info->channelId :
      info->comm->myParams->gridDim.x % info->comm->nChannels;
    struct ncclChannel* channel = info->comm->channels+channelId;

    proxyArgs.channel = channel;
    // Adjust pattern for CollNet based on channel index
    if (nSubChannels == 2) {
      info->pattern = (channelId < info->comm->nChannels/nSubChannels) ? ncclPatternCollTreeUp : ncclPatternCollTreeDown;
    }

    NCCLCHECK(ncclProxySaveColl(&proxyArgs, info->pattern, info->root, info->comm->nRanks));

    info->comm->myParams->gridDim.x++;

    int opIndex = channel->collFifoTail;
    struct ncclColl* c = channel->collectives+opIndex;
    volatile uint8_t* activePtr = (volatile uint8_t*)&c->active;
    while (activePtr[0] != 0) sched_yield();

    memcpy(c, &coll, sizeof(struct ncclColl));
    if (info->coll != ncclCollSendRecv) c->args.coll.bid = bid % coll.args.coll.nChannels;

    c->active = 1;
    opIndex = (opIndex+1)%NCCL_MAX_OPS;
    c->nextIndex = opIndex;
    channel->collFifoTail = opIndex;
    channel->collCount++;
  }
  info->comm->opCount++;
  return ncclSuccess;
}
```
computeColl records the algorithm, protocol and other information into coll. When SHARP is used the algorithm is NCCL_ALGO_COLLNET, nChannels is set to half the number of searched channels, pattern is set to ncclPatternCollTreeUp, and info->nstepsPerLoop = info->nchunksPerLoop = 1. The coll is then recorded into every channel: the up channels get pattern ncclPatternCollTreeUp and the down channels get ncclPatternCollTreeDown, and ncclProxySaveColl creates the ncclProxyArgs.
Kernel and proxy
```c
__device__ void ncclAllReduceCollNetKernel(struct CollectiveArgs* args) {
  const int tid = threadIdx.x;
  const int nthreads = args->coll.nThreads-WARP_SIZE;
  const int bid = args->coll.bid;
  const int nChannels = args->coll.nChannels;
  struct ncclDevComm* comm = args->comm;
  struct ncclChannel* channel = comm->channels+blockIdx.x;
  const int stepSize = comm->buffSizes[NCCL_PROTO_SIMPLE] / (sizeof(T)*NCCL_STEPS);
  int chunkSize = args->coll.lastChunkSize;
  const ssize_t minChunkSize = nthreads*8*sizeof(uint64_t) / sizeof(T);
  const ssize_t loopSize = nChannels*chunkSize;
  const ssize_t size = args->coll.count;

  if (loopSize > size) {
    chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize;
  }

  // Compute pointers
  const T * __restrict__ thisInput = (const T*)args->sendbuff;
  T * __restrict__ thisOutput = (T*)args->recvbuff;

  if (blockIdx.x < nChannels) { // first half of the channels do reduce
    struct ncclTree* tree = &channel->collTreeUp;
    ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> prims(tid, nthreads, tree->down, &tree->up, NULL, stepSize, channel, comm);
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      // Up
      ssize_t offset = gridOffset + bid*chunkSize;
      int nelem = min(chunkSize, size-offset);
      if (tree->up == -1) {
        prims.recvReduceCopy(thisInput+offset, thisOutput+offset, nelem);
      } else if (tree->down[0] == -1) {
        prims.send(thisInput+offset, nelem);
      } else {
        prims.recvReduceSend(thisInput+offset, nelem);
      }
    }
  }
  ...
}
```

Look at the up phase first. nChannels was halved earlier, so block indices below nChannels are the up channels. If the rank is sendEndIndex, its down is -1, so it simply uses send to write its own data into the next rank's buffer; if it is not sendEndIndex, it receives the data sent over by the previous rank, reduces it with the corresponding data in its own user buffer, and sends the result to its up peer's buffer.
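Mapping this onto the example chain from earlier (assumed order GPU0..GPU7, GPU0 = sendIndex/master, GPU7 = sendEndIndex), each rank's up-phase primitive per chunk would be:

```c
// Illustration of the branch above for the assumed 8-GPU chain (per chunk):
//   GPU7 (down[0] == -1)        : prims.send           -> writes its input into GPU6's buffer
//   GPU1..GPU6                  : prims.recvReduceSend -> recv from GPU(i+1), reduce with own input, send to GPU(i-1)
//   GPU0 (up == nranks, master) : prims.recvReduceSend -> the fully reduced chunk lands in the NIC send buffer,
//                                                         which the send proxy then hands to SHARP
```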
Now let's look at the proxy's send flow.
```c
ncclResult_t collNetSendProxy(struct ncclProxyArgs* args) {
  ...
  if (args->state == ncclProxyOpProgress) {
    int p = args->protocol;
    int stepSize = args->connector->comm->buffSizes[p] / NCCL_STEPS;
    char* localBuff = args->connector->conn.buffs[p];
    void* sendMhandle = resources->sendMhandles[p];
    void* recvMhandle = resources->recvMhandles[p];
    args->idle = 1;
    struct reqSlot* reqFifo = resources->reqFifo;
    if (args->head < args->end) {
      int buffSlot = args->tail%NCCL_STEPS;
      if (args->tail < args->end && args->tail < args->head + NCCL_STEPS
          && reqFifo[buffSlot].recvBuff != NULL) {
        volatile int* sizesFifo = resources->hostRecvMem->sizesFifo;
        volatile uint64_t* recvTail = &resources->hostRecvMem->tail;
        if (args->protocol == NCCL_PROTO_LL) {
          ...
        } else if (args->tail < *recvTail) {
          // Send through network
          if (sizesFifo[buffSlot] != -1) {
            int count = sizesFifo[buffSlot]/ncclTypeSize(args->dtype);
            NCCLCHECK(collNetIallreduce(resources->collNetSendComm, localBuff+buffSlot*stepSize,
                (void*)(reqFifo[buffSlot].recvBuff), count, args->dtype, args->redOp,
                sendMhandle, recvMhandle, args->requests+buffSlot));
            if (args->requests[buffSlot] != NULL) {
              sizesFifo[buffSlot] = -1;
              // Make sure size is reset to zero before we update the head.
              __sync_synchronize();
              args->tail += args->sliceSteps;
              args->idle = 0;
            }
          }
        }
      }
      if (args->head < args->tail) {
        int done, size;
        int buffSlot = args->head%NCCL_STEPS;
        NCCLCHECK(collNetTest((void*)(args->requests[buffSlot]), &done, &size));
        if (done) {
          TRACE(NCCL_NET, "sendProxy [%d/%d] request %p done, size %d", args->head, buffSlot, args->requests[buffSlot], size);
          reqFifo[buffSlot].size = size;
          // Make sure size is updated before we set recvBuff to NULL (from the view of recv proxy, concerning the flush)
          // (reordered store after store is possible on POWER, though not on x86)
          __sync_synchronize();
          reqFifo[buffSlot].recvBuff = NULL; // Notify recvProxy
          args->head += args->sliceSteps;
          resources->hostSendMem->head = args->head;
          args->idle = 0;
        }
      }
    }
    if (args->head == args->end) {
      resources->step = args->end;
      args->idle = 0;
      args->state = ncclProxyOpNone;
    }
  }
  return ncclSuccess;
}
```

collNetIallreduce fills sendbuff, recvbuff and the corresponding MRs into a sharp_coll_reduce_spec and then calls sharp_coll_do_allreduce or sharp_coll_do_allreduce_nb; once the operation completes, the reduced result has been written into recvbuff. Here sendbuff is the buffer of the send conn and recvbuff is the buffer of the recv conn. The send proxy does not know which part of the recv conn's buffer is available; that coordination between send and recv is done through reqFifo. This is why, besides the usual check that the queue has data, sending also requires that the corresponding reqFifo slot's recvBuff is not NULL. After issuing a send, tail is advanced by sliceSteps. If head is less than tail, there are allreduces that have been issued but not yet completed, so sharp_coll_req_test is used to check whether the corresponding request has finished; if so, head is advanced by sliceSteps and the corresponding recvBuff is set back to NULL to notify the recv proxy that this request is done.
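The slot structure shared by the two proxies is essentially a tiny per-step mailbox. Here is a sketch of it and of the handshake just described; the names are illustrative, based on the usage visible in the two proxy functions, not the exact NCCL definitions:

```c
// Per-slot handshake between recv proxy (posts buffers) and send proxy (fills them):
struct reqSlotSketch {
  void* volatile recvBuff;  // recv proxy posts a destination pointer; send proxy resets it to NULL on completion
  volatile int size;        // send proxy records the completed size before clearing recvBuff
};
// recv proxy:  slot.recvBuff = localBuff + buffSlot*stepSize;                 // post a buffer
// send proxy:  wait until slot.recvBuff != NULL, issue the SHARP allreduce into it, then on completion:
//              slot.size = size; __sync_synchronize(); slot.recvBuff = NULL;  // notify recv proxy
// recv proxy:  recvBuff == NULL means this slot is done -> flush (if GDR) and advance tail for the kernel.
```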
Now the recv proxy:
```c
ncclResult_t collNetRecvProxy(struct ncclProxyArgs* args) {
  ...
  if (args->head < args->end) {
    if ((args->tail < args->head + NCCL_STEPS) && (args->tail < (resources->hostSendMem->head) + NCCL_STEPS) && (args->tail < args->end)) {
      int buffSlot = args->tail%NCCL_STEPS;
      char* recvBuff = p == NCCL_PROTO_LL ? (char*)resources->llData : localBuff;
      int recvStepSize = p == NCCL_PROTO_LL ? stepSize/2 : stepSize;
      reqFifo[buffSlot].recvBuff = recvBuff+buffSlot*recvStepSize;
      TRACE(NCCL_NET, "recvProxy [%d/%d] posted buffer %p", args->tail, buffSlot, reqFifo[buffSlot].recvBuff);
      args->tail += args->sliceSteps;
      args->idle = 0;
    }
    if (args->tail > args->head) {
      int buffSlot = args->head%NCCL_STEPS;
      if (reqFifo[buffSlot].recvBuff == NULL) { // Buffer is cleared : coll is complete
        TRACE(NCCL_NET, "recvProxy [%d/%d] done, size %d", args->head, buffSlot, reqFifo[buffSlot].size);
        args->head += args->sliceSteps;
        if (args->protocol == NCCL_PROTO_LL) { // ll
          ...
        } else if (args->protocol == NCCL_PROTO_SIMPLE) {
          if (resources->useGdr) NCCLCHECK(collNetFlush(resources->collNetRecvComm, localBuff+buffSlot*stepSize, reqFifo[buffSlot].size, mhandle));
          resources->hostRecvMem->tail = args->head;
        }
        args->idle = 0;
      }
    }
  }
  return ncclSuccess;
}
```

As long as the queue has room, the corresponding recvbuff is posted into reqFifo. If tail is greater than head there are outstanding requests, so the corresponding recvBuff is checked: if it is NULL the request has completed, head is advanced by sliceSteps, and collNetFlush is executed to make sure the data has actually landed in GPU memory when GDR is used; this flush is the same as in ncclNetIb, an RDMA read from the local QP. After the flush, resources->hostRecvMem->tail is set to head to notify the kernel that new data is available.
Finally, the recv-side kernel (the down phase):
```c
template<int UNROLL, class FUNC, typename T>
__device__ void ncclAllReduceCollNetKernel(struct CollectiveArgs* args) {
  ...
  if (blockIdx.x >= nChannels) { // second half of the channels do broadcast
    struct ncclTree* tree = &channel->collTreeDn;
    ncclPrimitives<UNROLL, 1, 1, T, 1, 1, 0, FUNC> prims(tid, nthreads, &tree->up, tree->down, NULL, stepSize, channel, comm);
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      // Down
      ssize_t offset = gridOffset + bid*chunkSize;
      int nelem = min(chunkSize, size-offset);
      if (tree->up == -1) {
        prims.send(thisOutput+offset, nelem);
      } else if (tree->down[0] == -1) {
        prims.recv(thisOutput+offset, nelem);
      } else {
        prims.recvCopySend(thisOutput+offset, nelem);
      }
    }
  }
}
```

If the rank is recvEndIndex it only needs to recv the data; otherwise recvCopySend receives the data from the up peer's buffer, copies it into the corresponding position of its own user buffer, and sends it on to the down peer's buffer.