Skip to content

Commit

Permalink
Merge pull request #1662 from eisenhauer/KillWriterExtension
Browse files Browse the repository at this point in the history
Fixes to stabilize KillWriter tests
  • Loading branch information
eisenhauer authored Aug 7, 2019
2 parents 78e068c + 7fdf9ba commit 2bc027a
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 13 deletions.
6 changes: 5 additions & 1 deletion source/adios2/engine/sst/SstReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ void SstReader::ReadVariableBlocks(Variable<T> &variable)
// wait for all SstRead requests to finish
for (const auto &i : sstReadHandlers)
{
SstWaitForCompletion(m_Input, i);
if (SstWaitForCompletion(m_Input, i) != SstSuccess)
{
throw std::runtime_error(
"ERROR: Writer failed before returning data");
}
}

size_t iter = 0;
Expand Down
48 changes: 41 additions & 7 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1178,8 +1178,8 @@ SstStream CP_newStream()

static void DP_verbose(SstStream Stream, char *Format, ...);
static CManager CP_getCManager(SstStream Stream);
static void CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank,
CMFormat Format, void *data);
static int CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank,
CMFormat Format, void *data);
static MPI_Comm CP_getMPIComm(SstStream Stream);

struct _CP_Services Svcs = {
Expand Down Expand Up @@ -1336,8 +1336,12 @@ static CManager CP_getCManager(SstStream Stream) { return Stream->CPInfo->cm; }

static MPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; }

static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank,
CMFormat Format, void *Data)
extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn,
void *client_data);
extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,
void *client_data);
static int CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank,
CMFormat Format, void *Data)
{
CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort;
if (Peers[Rank].CMconn == NULL)
Expand All @@ -1348,14 +1352,44 @@ static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank,
CP_error(s,
"Connection failed in CP_sendToPeer! Contact list was:\n");
CP_error(s, attr_list_to_string(Peers[Rank].ContactList));
return;
return 0;
}
if (s->Role == ReaderRole)
{
CP_verbose(
s,
"Registering reader close handler for peer %d CONNECTION %p\n",
Rank, Peers[Rank].CMconn);
CMconn_register_close_handler(Peers[Rank].CMconn,
ReaderConnCloseHandler, (void *)s);
}
else
{
for (int i = 0; i < s->ReaderCount; i++)
{
if (Peers == s->Readers[i]->Connections)
{
CP_verbose(s,
"Registering writer close handler for peer %d, "
"CONNECTION %p\n",
Rank, Peers[Rank].CMconn);
CMconn_register_close_handler(Peers[Rank].CMconn,
WriterConnCloseHandler,
(void *)s->Readers[i]);
break;
}
}
}
}
if (CMwrite(Peers[Rank].CMconn, Format, Data) != 1)
{
CP_verbose(s, "Message failed to send to peer %d in CP_sendToPeer()\n",
Rank);
CP_verbose(s,
"Message failed to send to peer %d CONNECTION %p in "
"CP_sendToPeer()\n",
Rank, Peers[Rank].CMconn);
return 0;
}
return 1;
}

CPNetworkInfoFunc globalNetinfoCallback = NULL;
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ static char *readContactInfo(const char *Name, SstStream Stream, int Timeout)
}
}

static void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,
extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,
void *client_data)
{
TAU_START_FUNC();
SstStream Stream = (SstStream)client_data;
int FailedPeerRank = -1;
CP_verbose(Stream, "Reader-side close handler invoked\n");
for (int i = 0; i < Stream->WriterCohortSize; i++)
{
if (Stream->ConnectionsToWriter[i].CMconn == ClosedConn)
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/sst/cp/cp_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ static void QueueMaintenance(SstStream Stream)
QueueMaintenance
UNLOCK
*/
static void WriterConnCloseHandler(CManager cm, CMConnection closed_conn,
extern void WriterConnCloseHandler(CManager cm, CMConnection closed_conn,
void *client_data)
{
TAU_START_FUNC();
Expand Down
8 changes: 6 additions & 2 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -821,8 +821,12 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
ReadRequestMsg.RS_Stream = Stream;
ReadRequestMsg.RequestingRank = Stream->Rank;
ReadRequestMsg.NotifyCondition = ret->CMcondition;
Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank,
Stream->ReadRequestFormat, &ReadRequestMsg);
if (!Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank,
Stream->ReadRequestFormat, &ReadRequestMsg))
{
ret->Failed = 1;
CMCondition_signal(cm, ret->CMcondition);
}

return ret;
}
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/engine/staging-common/TestCommonClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ TEST_F(SstReadTest, ADIOS2SstRead)
catch (...)
{
std::cout << "Exception in EndStep, client failed";
WriterFailed = 1;
std::cout << "Noticed Writer failure" << std::endl;
Continue = false;
}

++ExpectedStep;
Expand Down
9 changes: 8 additions & 1 deletion testing/adios2/engine/staging-common/run_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,27 @@ def do_one_client_test(writer_cmd, reader_cmd):

def do_kill_writer_test(writer_cmd, reader_cmd, interval):
return_code = 0
print("TestDriver: Starting kill_writer test protocol")
sys.stdout.flush()
writer = subprocess.Popen(writer_cmd)
reader = subprocess.Popen(reader_cmd)
print("TestDriver: Waiting " + str(interval) + " seconds")
sys.stdout.flush()
time.sleep(interval)
print("TestDriver: Killing Writer")
sys.stdout.flush()
writer.terminate()
writer.wait()
reader.wait()
print("TestDriver: Reader exit status was " + str(reader.returncode))
sys.stdout.flush()
if reader.returncode != 0:
print("TestDriver: Reader failed, causing test failure")
sys.stdout.flush()
return_code = 1
print("TestDriver: Writer exit status was " +
str(writer.returncode)) + " (ignored)"
str(writer.returncode) + " (ignored)")
sys.stdout.flush()
return return_code


Expand Down

0 comments on commit 2bc027a

Please sign in to comment.