diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index 073c7101ac..e6dc1d0f76 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -13,8 +13,8 @@ #include "adios2/toolkit/profiling/taustubs/tautimer.hpp" +#include #include -#include namespace adios2 { @@ -71,7 +71,7 @@ StepStatus BP4Reader::BeginStep(StepMode mode, const float timeoutSeconds) if (m_CurrentStep >= m_BP4Deserializer.m_MetadataSet.StepsCount) { - status = CheckForNewSteps(timeoutSeconds); + status = CheckForNewSteps(Seconds(timeoutSeconds)); } // This should be after getting new steps @@ -150,31 +150,31 @@ void BP4Reader::OpenFiles() { /* Do a collective wait for the file(s) to appear within timeout. Make sure every process comes to the same conclusion */ - const float timeoutSeconds = m_BP4Deserializer.m_Parameters.OpenTimeoutSecs; + const auto timeoutSeconds = + Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs); // set poll to 1/100 of timeout - uint64_t pollTime_ms = - static_cast((timeoutSeconds * 1000.0f) / 100); - if (pollTime_ms < 1000) + auto pollSeconds = timeoutSeconds / 100.0; + static const auto pollSecondsMin = Seconds(1.0); + if (pollSeconds < pollSecondsMin) { - pollTime_ms = 1000; // min 1 second polling time + pollSeconds = pollSecondsMin; } - if (pollTime_ms > 10000) + static const auto pollSecondsMax = Seconds(10.0); + if (pollSeconds > pollSecondsMax) { - pollTime_ms = 10000; // max 10 seconds polling time + pollSeconds = pollSecondsMin; } /* Poll */ - double waited = 0.0; - double startTime, endTime; size_t flag = 1; // 0 = OK, opened file, 1 = timeout, 2 = error std::string lasterrmsg; + auto timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds; if (m_BP4Deserializer.m_RankMPI == 0) { - while (waited <= timeoutSeconds) + do { - startTime = MPI_Wtime(); try { errno = 0; @@ -209,10 +209,8 @@ void BP4Reader::OpenFiles() } } - std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms)); - endTime = MPI_Wtime(); - waited += endTime - startTime; - } + std::this_thread::sleep_for(pollSeconds); + } while (std::chrono::steady_clock::now() < timeoutInstant); } flag = m_Comm.BroadcastValue(flag, 0); @@ -252,19 +250,16 @@ void BP4Reader::OpenFiles() flag = 1; // timeout if (m_BP4Deserializer.m_RankMPI == 0) { - while (waited <= timeoutSeconds) + do { - startTime = MPI_Wtime(); const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0); if (idxFileSize > 63) { flag = 0; // we have data break; } - std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms)); - endTime = MPI_Wtime(); - waited += endTime - startTime; - } + std::this_thread::sleep_for(pollSeconds); + } while (std::chrono::steady_clock::now() < timeoutInstant); } flag = m_Comm.BroadcastValue(flag, 0); @@ -273,7 +268,8 @@ void BP4Reader::OpenFiles() throw std::runtime_error("ERROR: File " + m_Name + " was found but has not contained data within " "the specified timeout of " + - std::to_string(timeoutSeconds) + " seconds."); + std::to_string(timeoutSeconds.count()) + + " seconds."); } } @@ -443,38 +439,39 @@ bool BP4Reader::CheckWriterActive() return m_BP4Deserializer.m_WriterIsActive; } -StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds) +StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds) { /* Do a collective wait for a step within timeout. Make sure every reader comes to the same conclusion */ StepStatus retval = StepStatus::OK; bool haveNewStep = false; - if (timeoutSeconds < 0.0) + const bool haveTimeout = timeoutSeconds >= Seconds::zero(); + std::chrono::time_point< + std::chrono::steady_clock, + std::chrono::duration> + timeoutInstant; + if (haveTimeout) { - timeoutSeconds = std::numeric_limits::max(); + timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds; } - float pollSecs = - m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs; - if (pollSecs > timeoutSeconds) + auto pollSeconds = + Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs); + if (haveTimeout && pollSeconds > timeoutSeconds) { - pollSecs = timeoutSeconds; + pollSeconds = timeoutSeconds; } - uint64_t pollTime_ms = static_cast(pollSecs * 1000.f); /* Poll */ - double waited = 0.0; - double startTime, endTime; // Hack: processing metadata for multiple new steps only works // when pretending not to be in streaming mode const bool saveReadStreaming = m_IO.m_ReadStreaming; m_IO.m_ReadStreaming = false; - while (waited < timeoutSeconds && m_BP4Deserializer.m_WriterIsActive) + while (m_BP4Deserializer.m_WriterIsActive) { - startTime = MPI_Wtime(); size_t newIdxSize = UpdateBuffer(); if (newIdxSize > 0) { @@ -487,9 +484,11 @@ StepStatus BP4Reader::CheckForNewSteps(float timeoutSeconds) { break; } - std::this_thread::sleep_for(std::chrono::milliseconds(pollTime_ms)); - endTime = MPI_Wtime(); - waited += endTime - startTime; + std::this_thread::sleep_for(pollSeconds); + if (haveTimeout && std::chrono::steady_clock::now() >= timeoutInstant) + { + break; + } } if (!haveNewStep) diff --git a/source/adios2/engine/bp4/BP4Reader.h b/source/adios2/engine/bp4/BP4Reader.h index 7b21ea5cb1..5cc5474c66 100644 --- a/source/adios2/engine/bp4/BP4Reader.h +++ b/source/adios2/engine/bp4/BP4Reader.h @@ -17,6 +17,8 @@ #include "adios2/toolkit/format/bp/bp4/BP4Deserializer.h" #include "adios2/toolkit/transportman/TransportMan.h" +#include + namespace adios2 { namespace core @@ -50,6 +52,8 @@ class BP4Reader : public Engine void PerformGets() final; private: + typedef std::chrono::duration Seconds; + format::BP4Deserializer m_BP4Deserializer; /* transport manager for metadata file */ transportman::TransportMan m_MDFileManager; @@ -93,7 +97,7 @@ class BP4Reader : public Engine * Used by BeginStep() to get new steps from file when it reaches the * end of steps in memory. */ - StepStatus CheckForNewSteps(float timeoutSeconds); + StepStatus CheckForNewSteps(Seconds timeoutSeconds); #define declare_type(T) \ void DoGetSync(Variable &, T *) final; \