Skip to content

Commit

Permalink
bp4: Replace MPI_Wtime with std::chrono for poll timeouts
Browse files Browse the repository at this point in the history
This ensures it can work in serial programs and also improves type
safety.
  • Loading branch information
bradking committed Sep 18, 2019
1 parent 15e5daf commit 03a161a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 40 deletions.
77 changes: 38 additions & 39 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"

#include <chrono>
#include <errno.h>
#include <limits>

namespace adios2
{
Expand Down Expand Up @@ -71,7 +71,7 @@ StepStatus BP4Reader::BeginStep(StepMode mode, const float timeoutSeconds)

if (m_CurrentStep >= m_BP4Deserializer.m_MetadataSet.StepsCount)
{
status = CheckForNewSteps(timeoutSeconds);
status = CheckForNewSteps(Seconds(timeoutSeconds));
}

// This should be after getting new steps
Expand Down Expand Up @@ -150,31 +150,31 @@ void BP4Reader::OpenFiles()
{
/* Do a collective wait for the file(s) to appear within timeout.
Make sure every process comes to the same conclusion */
const float timeoutSeconds = m_BP4Deserializer.m_Parameters.OpenTimeoutSecs;
const auto timeoutSeconds =
Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);

// set poll to 1/100 of timeout
uint64_t pollTime_ms =
static_cast<uint64_t>((timeoutSeconds * 1000.0f) / 100);
if (pollTime_ms < 1000)
auto pollSeconds = timeoutSeconds / 100.0;
static const auto pollSecondsMin = Seconds(1.0);
if (pollSeconds < pollSecondsMin)
{
pollTime_ms = 1000; // min 1 second polling time
pollSeconds = pollSecondsMin;
}
if (pollTime_ms > 10000)
static const auto pollSecondsMax = Seconds(10.0);
if (pollSeconds > pollSecondsMax)
{
pollTime_ms = 10000; // max 10 seconds polling time
pollSeconds = pollSecondsMin;
}

/* Poll */
double waited = 0.0;
double startTime, endTime;
size_t flag = 1; // 0 = OK, opened file, 1 = timeout, 2 = error
std::string lasterrmsg;
auto timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds;

if (m_BP4Deserializer.m_RankMPI == 0)
{
while (waited <= timeoutSeconds)
do
{
startTime = MPI_Wtime();
try
{
errno = 0;
Expand Down Expand Up @@ -209,10 +209,8 @@ void BP4Reader::OpenFiles()
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
}
std::this_thread::sleep_for(pollSeconds);
} while (std::chrono::steady_clock::now() < timeoutInstant);
}

flag = m_Comm.BroadcastValue(flag, 0);
Expand Down Expand Up @@ -252,19 +250,16 @@ void BP4Reader::OpenFiles()
flag = 1; // timeout
if (m_BP4Deserializer.m_RankMPI == 0)
{
while (waited <= timeoutSeconds)
do
{
startTime = MPI_Wtime();
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > 63)
{
flag = 0; // we have data
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
}
std::this_thread::sleep_for(pollSeconds);
} while (std::chrono::steady_clock::now() < timeoutInstant);
}

flag = m_Comm.BroadcastValue(flag, 0);
Expand All @@ -273,7 +268,8 @@ void BP4Reader::OpenFiles()
throw std::runtime_error("ERROR: File " + m_Name +
" was found but has not contained data within "
"the specified timeout of " +
std::to_string(timeoutSeconds) + " seconds.");
std::to_string(timeoutSeconds.count()) +
" seconds.");
}
}

Expand Down Expand Up @@ -443,38 +439,39 @@ bool BP4Reader::CheckWriterActive()
return m_BP4Deserializer.m_WriterIsActive;
}

StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds)
StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
{
/* Do a collective wait for a step within timeout.
Make sure every reader comes to the same conclusion */
StepStatus retval = StepStatus::OK;
bool haveNewStep = false;

if (timeoutSeconds < 0.0)
const bool haveTimeout = timeoutSeconds >= Seconds::zero();
std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
timeoutInstant;
if (haveTimeout)
{
timeoutSeconds = std::numeric_limits<float>::max();
timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds;
}

float pollSecs =
m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs;
if (pollSecs > timeoutSeconds)
auto pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
if (haveTimeout && pollSeconds > timeoutSeconds)
{
pollSecs = timeoutSeconds;
pollSeconds = timeoutSeconds;
}
uint64_t pollTime_ms = static_cast<uint64_t>(pollSecs * 1000.f);

/* Poll */
double waited = 0.0;
double startTime, endTime;

// Hack: processing metadata for multiple new steps only works
// when pretending not to be in streaming mode
const bool saveReadStreaming = m_IO.m_ReadStreaming;

m_IO.m_ReadStreaming = false;
while (waited < timeoutSeconds && m_BP4Deserializer.m_WriterIsActive)
while (m_BP4Deserializer.m_WriterIsActive)
{
startTime = MPI_Wtime();
size_t newIdxSize = UpdateBuffer();
if (newIdxSize > 0)
{
Expand All @@ -487,9 +484,11 @@ StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
std::this_thread::sleep_for(pollSeconds);
if (haveTimeout && std::chrono::steady_clock::now() >= timeoutInstant)
{
break;
}
}

if (!haveNewStep)
Expand Down
6 changes: 5 additions & 1 deletion source/adios2/engine/bp4/BP4Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "adios2/toolkit/format/bp/bp4/BP4Deserializer.h"
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>

namespace adios2
{
namespace core
Expand Down Expand Up @@ -50,6 +52,8 @@ class BP4Reader : public Engine
void PerformGets() final;

private:
typedef std::chrono::duration<double> Seconds;

format::BP4Deserializer m_BP4Deserializer;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
Expand Down Expand Up @@ -93,7 +97,7 @@ class BP4Reader : public Engine
* Used by BeginStep() to get new steps from file when it reaches the
* end of steps in memory.
*/
StepStatus CheckForNewSteps(float timeoutSeconds);
StepStatus CheckForNewSteps(Seconds timeoutSeconds);

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down

0 comments on commit 03a161a

Please sign in to comment.