Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New AlltoAllV (Imbalanced AlltoAll) benchmark. #157

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename + docs
babusid committed Jun 27, 2023
commit f94647f20987ffe5316cfc4980d6900c9a3149f7
96 changes: 83 additions & 13 deletions src/alltoallv.cu
Original file line number Diff line number Diff line change
@@ -1,32 +1,85 @@
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include "cuda_runtime.h"
#include "common.h"

int CHECK = 0;

/**
* @brief Parses the parameter file and stores the matrix data into the imbalancingFactors reference passed in.
* @param nranks The number of ranks in the test
* @param imbalancingFactors The reference to the vector that will store the parsed data
* @param filename The name of the parameter file to parse
**/
testResult_t parseParamFile(int nranks, std::vector<std::vector<double>> &imbalancingFactors, char filename[64]){
std::vector<std::vector<double>> paramFile_data;
std::ifstream paramFile(filename);

if (!paramFile.is_open()) {
PRINT("\nUNABLE TO OPEN PARAMS FILE\n");
return testInternalError;
}

std::string row;
int rowidx = 0;
while(std::getline(paramFile,row)){ //iterate over every row
std::vector<double> values; //values from this line
std::stringstream rowstream(row);
std::string value;
while(std::getline(rowstream,value,',')){ //go over the row and get each value
double dval = std::stod(value);
if(dval<0 || dval>1) {
PRINT("\nINVALID PARAMS FILE, PARAMETER OUT OF 0:1 RANGE, ROW NUMBER: %i \n", rowidx);
return testInternalError;
} //ensure that the value is between 0 and 1 (necessary for probability distribution)
values.push_back(dval);
}
if(values.size()!=nranks) {
PRINT("\nINVALID PARAMS FILE, ROW %i DOES NOT HAVE CORRECT NUMBER OF VALUES, HAS %lu ENTRIES, NEEDS %i ENTRIES\n", rowidx, values.size(), nranks);
return testInternalError;
}//ensure that this row has the right amount of values
paramFile_data.push_back(values);
rowidx++;
}

if(paramFile_data.size()!=nranks) {
PRINT("\nINVALID PARAMS FILE, DOES NOT HAVE CORRECT NUMBER OF ROWS\n");
return testInternalError;
} //ensure we have the right amount of rows

imbalancingFactors = paramFile_data; //store the data in the return variable
return testSuccess;
}
void AlltoAllvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
*sendcount = (count/nranks)*nranks; //each rank in a2av should be able to send up to count to all of the others combined.
*recvcount = (count/nranks)*nranks; //each rank in a2av should be able to receive up to count from all of its peers.
*sendcount = (count/nranks)*nranks; //Total send count rounded to a multiple of ranks
*recvcount = (count/nranks)*nranks; //Total recv count rounded to a multiple of ranks
*sendInplaceOffset = 0;
*recvInplaceOffset = 0;
*paramcount = count/nranks; //each rank in a2av gets one even chunk to send out.
*paramcount = (count/nranks);
}

testResult_t AlltoAllvInitData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t op, int root, int rep, int in_place) {

size_t sendcount = args->sendBytes / wordSize(type);
size_t recvcount = args->expectedBytes / wordSize(type);
int nranks = args->nProcs*args->nThreads*args->nGpus;

//parse the param file
std::vector<std::vector<double>> imbalancingFactors;
testResult_t parseSuccess = parseParamFile(nranks, imbalancingFactors, args->param_file);
CHECK = 1;
if(parseSuccess != testSuccess) return parseSuccess;
for (int i=0; i<args->nGpus; i++) {
CUDACHECK(cudaSetDevice(args->gpus[i]));
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes)); //zeroes out the receive buffer of each GPU with total size (recvcount*wordSize(type))
CUDACHECK(cudaMemcpy(args->expected[i], args->recvbuffs[i], args->expectedBytes, cudaMemcpyDefault)); //copies the zeroed out receive buffer to the expected buffer
int rank = ((args->proc*args->nThreads + args->thread)*args->nGpus + i); //current rank
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, 0, type, ncclSum, 33*rep + rank, 1, 0)); //initializes the sendbuffer data for this rank
for (int j=0; j<nranks; j++) {
//j == peer rank
for (int j=0; j<nranks; j++) {
size_t partcount = sendcount/nranks; //create chunk definition to use in offsetting the data initialization
size_t partcount_mod = (partcount - j - rank - 1) % partcount; //imbalance the count of data to initialize same way we do in the test
size_t partcount_mod = partcount * imbalancingFactors[j][rank]; //imbalance the count of data to initialize same way we do in the test

TESTCHECK(InitData((char*)args->expected[i] + j*partcount*wordSize(type), partcount_mod, rank*partcount, type, ncclSum, 33*rep + j, 1, 0));
}
CUDACHECK(cudaDeviceSynchronize());
@@ -48,19 +101,36 @@ testResult_t AlltoAllvRunColl(void* sendbuff, void* recvbuff, size_t count, nccl
int nRanks, myRank;
NCCLCHECK(ncclCommCount(comm, &nRanks));
NCCLCHECK(ncclCommUserRank(comm, &myRank));
std::vector<std::vector<double>> imbalancingFactors;

// Since this function is only ever called from the startColl function, this builtin call will return the address of the startColl function's stack frame.
// The beginning of that stack frame will be the threadargs struct, which contains the param filename.
struct threadArgs* args = (struct threadArgs*) (__builtin_frame_address(1));
testResult_t parseSuccess = parseParamFile(nRanks, imbalancingFactors, args->param_file); //parse the param file
if(parseSuccess != testSuccess) return parseSuccess;
size_t rankOffset = count * wordSize(type);

// Get the base address of the previous stack frame.
// Since this function is only ever called from the startColl function, this will be the address of the startColl function's stack frame.
// The beginning of that stack frame will be the threadargs struct.
#if NCCL_MAJOR < 2 || NCCL_MINOR < 7
printf("NCCL 2.7 or later is needed for alltoallv. This test was compiled with %d.%d.\n", NCCL_MAJOR, NCCL_MINOR);
return testNcclError;
#else
NCCLCHECK(ncclGroupStart());


for (int r=0; r<nRanks; r++) {
int count_mod = (count-myRank-r-1) % count; //modify the count variable to to be strictly less than count, but depend on both the peer rank and the sending rank
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, count_mod, type, r, comm, stream));
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, count_mod, type, r, comm, stream));
// int count_mod = (count-myRank-r-1) % count; //modify the count variable to to be strictly less than count, but depend on both the peer rank and the sending rank
if(myRank>imbalancingFactors.size()){
PRINT("\nmyRank is greater than imbalancingFactors.size(), %i\n", myRank);
return testInternalError;
} else if (r > imbalancingFactors[myRank].size()) {
PRINT("\nr is greater than imbalancingFactors[myRank].size(), %i\n", r);
return testInternalError;
}
unsigned long send_count_mod = count * imbalancingFactors[myRank][r];
unsigned long recv_count_mod = count * imbalancingFactors[r][myRank];
NCCLCHECK(ncclSend(((char*)sendbuff)+r*rankOffset, send_count_mod, type, r, comm, stream));
NCCLCHECK(ncclRecv(((char*)recvbuff)+r*rankOffset, recv_count_mod, type, r, comm, stream));
}


173 changes: 0 additions & 173 deletions src/alltoallv2.cu

This file was deleted.