Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bp4: Replace MPI_Wtime with std::chrono for poll timeouts #1761

Merged
merged 1 commit into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 38 additions & 39 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"

#include <chrono>
#include <errno.h>
#include <limits>

namespace adios2
{
Expand Down Expand Up @@ -71,7 +71,7 @@ StepStatus BP4Reader::BeginStep(StepMode mode, const float timeoutSeconds)

if (m_CurrentStep >= m_BP4Deserializer.m_MetadataSet.StepsCount)
{
status = CheckForNewSteps(timeoutSeconds);
status = CheckForNewSteps(Seconds(timeoutSeconds));
}

// This should be after getting new steps
Expand Down Expand Up @@ -150,31 +150,31 @@ void BP4Reader::OpenFiles()
{
/* Do a collective wait for the file(s) to appear within timeout.
Make sure every process comes to the same conclusion */
const float timeoutSeconds = m_BP4Deserializer.m_Parameters.OpenTimeoutSecs;
const auto timeoutSeconds =
Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);

// set poll to 1/100 of timeout
uint64_t pollTime_ms =
static_cast<uint64_t>((timeoutSeconds * 1000.0f) / 100);
if (pollTime_ms < 1000)
auto pollSeconds = timeoutSeconds / 100.0;
static const auto pollSecondsMin = Seconds(1.0);
if (pollSeconds < pollSecondsMin)
{
pollTime_ms = 1000; // min 1 second polling time
pollSeconds = pollSecondsMin;
}
if (pollTime_ms > 10000)
static const auto pollSecondsMax = Seconds(10.0);
if (pollSeconds > pollSecondsMax)
{
pollTime_ms = 10000; // max 10 seconds polling time
pollSeconds = pollSecondsMin;
}

/* Poll */
double waited = 0.0;
double startTime, endTime;
size_t flag = 1; // 0 = OK, opened file, 1 = timeout, 2 = error
std::string lasterrmsg;
auto timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds;

if (m_BP4Deserializer.m_RankMPI == 0)
{
while (waited <= timeoutSeconds)
do
{
startTime = MPI_Wtime();
try
{
errno = 0;
Expand Down Expand Up @@ -209,10 +209,8 @@ void BP4Reader::OpenFiles()
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
}
std::this_thread::sleep_for(pollSeconds);
} while (std::chrono::steady_clock::now() < timeoutInstant);
}

flag = m_Comm.BroadcastValue(flag, 0);
Expand Down Expand Up @@ -252,19 +250,16 @@ void BP4Reader::OpenFiles()
flag = 1; // timeout
if (m_BP4Deserializer.m_RankMPI == 0)
{
while (waited <= timeoutSeconds)
do
{
startTime = MPI_Wtime();
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > 63)
{
flag = 0; // we have data
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
}
std::this_thread::sleep_for(pollSeconds);
} while (std::chrono::steady_clock::now() < timeoutInstant);
}

flag = m_Comm.BroadcastValue(flag, 0);
Expand All @@ -273,7 +268,8 @@ void BP4Reader::OpenFiles()
throw std::runtime_error("ERROR: File " + m_Name +
" was found but has not contained data within "
"the specified timeout of " +
std::to_string(timeoutSeconds) + " seconds.");
std::to_string(timeoutSeconds.count()) +
" seconds.");
}
}

Expand Down Expand Up @@ -443,38 +439,39 @@ bool BP4Reader::CheckWriterActive()
return m_BP4Deserializer.m_WriterIsActive;
}

StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds)
StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
{
/* Do a collective wait for a step within timeout.
Make sure every reader comes to the same conclusion */
StepStatus retval = StepStatus::OK;
bool haveNewStep = false;

if (timeoutSeconds < 0.0)
const bool haveTimeout = timeoutSeconds >= Seconds::zero();
std::chrono::time_point<
std::chrono::steady_clock,
std::chrono::duration<double, std::chrono::steady_clock::period>>
timeoutInstant;
if (haveTimeout)
{
timeoutSeconds = std::numeric_limits<float>::max();
timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds;
}

float pollSecs =
m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs;
if (pollSecs > timeoutSeconds)
auto pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
if (haveTimeout && pollSeconds > timeoutSeconds)
{
pollSecs = timeoutSeconds;
pollSeconds = timeoutSeconds;
}
uint64_t pollTime_ms = static_cast<uint64_t>(pollSecs * 1000.f);

/* Poll */
double waited = 0.0;
double startTime, endTime;

// Hack: processing metadata for multiple new steps only works
// when pretending not to be in streaming mode
const bool saveReadStreaming = m_IO.m_ReadStreaming;

m_IO.m_ReadStreaming = false;
while (waited < timeoutSeconds && m_BP4Deserializer.m_WriterIsActive)
while (m_BP4Deserializer.m_WriterIsActive)
{
startTime = MPI_Wtime();
size_t newIdxSize = UpdateBuffer();
if (newIdxSize > 0)
{
Expand All @@ -487,9 +484,11 @@ StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms));
endTime = MPI_Wtime();
waited += endTime - startTime;
std::this_thread::sleep_for(pollSeconds);
if (haveTimeout && std::chrono::steady_clock::now() >= timeoutInstant)
{
break;
}
}

if (!haveNewStep)
Expand Down
6 changes: 5 additions & 1 deletion source/adios2/engine/bp4/BP4Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "adios2/toolkit/format/bp/bp4/BP4Deserializer.h"
#include "adios2/toolkit/transportman/TransportMan.h"

#include <chrono>

namespace adios2
{
namespace core
Expand Down Expand Up @@ -50,6 +52,8 @@ class BP4Reader : public Engine
void PerformGets() final;

private:
typedef std::chrono::duration<double> Seconds;

format::BP4Deserializer m_BP4Deserializer;
/* transport manager for metadata file */
transportman::TransportMan m_MDFileManager;
Expand Down Expand Up @@ -93,7 +97,7 @@ class BP4Reader : public Engine
* Used by BeginStep() to get new steps from file when it reaches the
* end of steps in memory.
*/
StepStatus CheckForNewSteps(float timeoutSeconds);
StepStatus CheckForNewSteps(Seconds timeoutSeconds);

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down