Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPI DataPlane: do not call MPI_Init internally #3847

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 20 additions & 46 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
#define QUOTE(name) #name
#define MACRO_TO_STR(name) QUOTE(name)

static pthread_once_t OnceMpiInitializer = PTHREAD_ONCE_INIT;

/*****Stream Basic Structures ***********************************************/

typedef struct _MpiReaderContactInfo
Expand Down Expand Up @@ -235,40 +233,6 @@ static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, voi
static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data,
attr_list attrs);

/**
* Initialize MPI in the mode that it is required for MPI_DP to work.
*
* It can be called multiple times.
*/
static void MpiInitialize()
{
int IsInitialized = 0;
int provided;

MPI_Initialized(&IsInitialized);
if (!IsInitialized)
{
MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &provided);
}
else
{
MPI_Query_thread(&provided);
}

if (provided != MPI_THREAD_MULTIPLE)
{
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (!rank)
{
fprintf(stderr,
"MPI init without MPI_THREAD_MULTIPLE (Externally "
"initialized:%s)\n",
IsInitialized ? "true" : "false");
}
}
}

/*****Public accessible functions********************************************/

/**
Expand All @@ -286,8 +250,6 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **Read
struct _SstParams *Params, attr_list WriterContact,
SstStats Stats)
{
pthread_once(&OnceMpiInitializer, MpiInitialize);

MpiStreamRD Stream = calloc(sizeof(struct _MpiStreamRD), 1);
CManager cm = Svcs->getCManager(CP_Stream);
SMPI_Comm comm = Svcs->getMPIComm(CP_Stream);
Expand Down Expand Up @@ -327,8 +289,6 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **Read
static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params,
attr_list DPAttrs, SstStats Stats)
{
pthread_once(&OnceMpiInitializer, MpiInitialize);

MpiStreamWR Stream = calloc(sizeof(struct _MpiStreamWR), 1);
CManager cm = Svcs->getCManager(CP_Stream);
SMPI_Comm comm = Svcs->getMPIComm(CP_Stream);
Expand Down Expand Up @@ -798,16 +758,30 @@ static void MpiReleaseTimeStep(CP_Services Svcs, DP_WS_Stream Stream_v, long Tim
*/
static int MpiGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params)
{
#if defined(MPICH)
// Only enabled when MPI_THREAD_MULTIPLE and using MPICH
int IsInitialized = 0;
int provided = 0;
pthread_once(&OnceMpiInitializer, MpiInitialize);
MPI_Query_thread(&provided);
if (provided == MPI_THREAD_MULTIPLE)
int IsMPICH = 0;
#if defined(MPICH)
IsMPICH = 1;

MPI_Initialized(&IsInitialized);
if (IsInitialized)
{
return 100;
MPI_Query_thread(&provided);
// Only enabled when MPI_THREAD_MULTIPLE and using MPICH
if (provided == MPI_THREAD_MULTIPLE)
{
return 100;
}
}
#endif

Svcs->verbose(CP_Stream, DPTraceVerbose,
"MPI DP disabled since the following predicate is false: "
"(MPICH=%s AND MPI_initialized=%s AND MPI_THREAD_MULTIPLE=%s)",
IsMPICH ? "true" : "false", IsInitialized ? "true" : "false",
provided == MPI_THREAD_MULTIPLE ? "true" : "false");

return -100;
}

Expand Down
Loading