diff --git a/docs/user_guide/source/engines/bp4.rst b/docs/user_guide/source/engines/bp4.rst index e43a51a714..7f1b1fe1f8 100644 --- a/docs/user_guide/source/engines/bp4.rst +++ b/docs/user_guide/source/engines/bp4.rst @@ -76,7 +76,7 @@ This engine allows the user to fine tune the buffering operations through the fo 16. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc). - +17. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open(). ============================== ===================== =========================================================== **Key** **Value Format** **Default** and Examples @@ -98,6 +98,7 @@ This engine allows the user to fine tune the buffering operations through the fo BurstBufferPath string **""**, /mnt/bb/norbert, /ssd BurstBufferDrain string On/Off **On**, Off BurstBufferVerbose integer, 0-2 **0**, ``1``, ``2`` + StreamReader string On/Off On, **Off** ============================== ===================== =========================================================== diff --git a/docs/user_guide/source/engines/virtual_engines.rst b/docs/user_guide/source/engines/virtual_engines.rst index e532c62a25..4aa2a11c30 100644 --- a/docs/user_guide/source/engines/virtual_engines.rst +++ b/docs/user_guide/source/engines/virtual_engines.rst @@ -55,6 +55,7 @@ These are the actual settings in ADIOS when a virtual engine is selected. 
The pa ============================== ===================== =========================================================== OpenTimeoutSecs float **3600** (wait for up to an hour) BeginStepPollingFrequencySecs float **1** (poll the file system with 1 second frequency + StreamReader bool **On** (process metadata in streaming mode) ============================== ===================== =========================================================== 3. ``InSituAnalysis``. The engine is ``SST``. The parameters are set to: diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index d844a6b2e2..4877f62b86 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -224,6 +224,7 @@ void IO::SetEngine(const std::string engineType) noexcept { finalEngineType = "BP4"; lf_InsertParam("OpenTimeoutSecs", "3600"); + lf_InsertParam("StreamReader", "true"); } /* "file" is handled entirely in IO::Open() as it needs the name */ else diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index 17173aeb88..0a8cf74fdf 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -157,24 +157,22 @@ void BP4Reader::Init() const Seconds timeoutSeconds = Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs); - // set poll to 1/100 of timeout - Seconds pollSeconds = timeoutSeconds / 100.0; - static const auto pollSecondsMin = Seconds(1.0); - if (pollSeconds < pollSecondsMin) - { - pollSeconds = pollSecondsMin; - } - static const auto pollSecondsMax = Seconds(10.0); - if (pollSeconds > pollSecondsMax) + Seconds pollSeconds = + Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs); + if (pollSeconds > timeoutSeconds) { - pollSeconds = pollSecondsMax; + pollSeconds = timeoutSeconds; } TimePoint timeoutInstant = std::chrono::steady_clock::now() + timeoutSeconds; OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); - InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + if 
(!m_BP4Deserializer.m_Parameters.StreamReader) + { + /* non-stream reader gets as many steps as are available now */ + InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + } } bool BP4Reader::SleepOrQuit(const TimePoint &timeoutInstant, @@ -317,6 +315,71 @@ void BP4Reader::InitTransports() } } +/* Count index records to minimum 1 and maximum of N records so that + * expected metadata size is less than a predetermined constant + */ +void MetadataCalculateMinFileSize( + const format::BP4Deserializer &m_BP4Deserializer, + const std::string &IdxFileName, char *buf, size_t idxsize, bool hasHeader, + const size_t mdStartPos, size_t &newIdxSize, size_t &expectedMinFileSize) +{ + newIdxSize = 0; + expectedMinFileSize = 0; + + if (hasHeader && idxsize < m_BP4Deserializer.m_IndexRecordSize) + { + return; + } + + /* eliminate header for now for only calculating with records */ + if (hasHeader) + { + buf += m_BP4Deserializer.m_IndexRecordSize; + idxsize -= m_BP4Deserializer.m_IndexRecordSize; + } + + if (idxsize % m_BP4Deserializer.m_IndexRecordSize != 0) + { + throw std::runtime_error( + "FATAL CODING ERROR: ADIOS Index file " + IdxFileName + + " is assumed to always contain n*" + + std::to_string(m_BP4Deserializer.m_IndexRecordSize) + + " byte-length records. 
" + "Right now the length of index buffer is " + + std::to_string(idxsize) + " bytes."); + } + + const size_t nTotalRecords = idxsize / m_BP4Deserializer.m_IndexRecordSize; + if (nTotalRecords == 0) + { + // no (new) step entry in the index, so no metadata is expected + newIdxSize = 0; + expectedMinFileSize = 0; + return; + } + + size_t nRecords = 1; + expectedMinFileSize = *(uint64_t *)&( + buf[nRecords * m_BP4Deserializer.m_IndexRecordSize - 24]); + while (nRecords < nTotalRecords) + { + const size_t n = nRecords + 1; + const uint64_t mdEndPos = + *(uint64_t *)&(buf[n * m_BP4Deserializer.m_IndexRecordSize - 24]); + if (mdEndPos - mdStartPos > 16777216) + { + break; + } + expectedMinFileSize = mdEndPos; + ++nRecords; + } + newIdxSize = nRecords * m_BP4Deserializer.m_IndexRecordSize; + if (hasHeader) + { + newIdxSize += m_BP4Deserializer.m_IndexRecordSize; + } +} + uint64_t MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer, const std::string &IdxFileName, bool hasHeader) @@ -330,7 +393,9 @@ MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer, "The file size now is " + std::to_string(idxsize) + " bytes."); } - if ((hasHeader && idxsize < 128) || idxsize < 64) + if ((hasHeader && idxsize < m_BP4Deserializer.m_IndexHeaderSize + + m_BP4Deserializer.m_IndexRecordSize) || + idxsize < m_BP4Deserializer.m_IndexRecordSize) { // no (new) step entry in the index, so no metadata is expected return 0; @@ -450,7 +515,21 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0); if (idxFileSize > m_MDIndexFileProcessedSize) { - const size_t newIdxSize = idxFileSize - m_MDIndexFileProcessedSize; + const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize; + std::vector idxbuf(maxIdxSize); + m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize, + m_MDIndexFileProcessedSize); + size_t newIdxSize; + size_t expectedMinFileSize; + char *buf = 
idxbuf.data(); + + MetadataCalculateMinFileSize( + m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed, + m_MDFileProcessedSize, newIdxSize, expectedMinFileSize); + + // const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize( + // m_BP4Deserializer, m_Name, !m_IdxHeaderParsed); + if (m_BP4Deserializer.m_MetadataIndex.m_Buffer.size() < newIdxSize) { m_BP4Deserializer.m_MetadataIndex.Resize( @@ -458,15 +537,13 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, "call to BP4Reader::BeginStep/UpdateBuffer"); } m_BP4Deserializer.m_MetadataIndex.m_Position = 0; - m_MDIndexFileManager.ReadFile( - m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(), newIdxSize, - m_MDIndexFileProcessedSize); + std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize, + m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin()); /* Wait until as much metadata arrives in the file as much * is indicated by the existing index entries */ - uint64_t expectedMinFileSize = MetadataExpectedMinFileSize( - m_BP4Deserializer, m_Name, !m_IdxHeaderParsed); + size_t fileSize = 0; do { @@ -486,7 +563,8 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, * the buffer now. 
*/ const size_t fileSize = m_MDFileManager.GetFileSize(0); - const size_t newMDSize = fileSize - m_MDFileProcessedSize; + const size_t newMDSize = + expectedMinFileSize - m_MDFileProcessedSize; if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize) { m_BP4Deserializer.m_Metadata.Resize( @@ -549,9 +627,6 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize) { m_MDIndexFileProcessedSize += newIdxSize; } - size_t idxsize = m_BP4Deserializer.m_MetadataIndex.m_Buffer.size(); - uint64_t lastpos = *(uint64_t *)&( - m_BP4Deserializer.m_MetadataIndex.m_Buffer[idxsize - 24]); } bool BP4Reader::CheckWriterActive() @@ -559,8 +634,9 @@ bool BP4Reader::CheckWriterActive() size_t flag = 0; if (m_BP4Deserializer.m_RankMPI == 0) { - std::vector header(64, '\0'); - m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0); + std::vector header(m_BP4Deserializer.m_IndexHeaderSize, '\0'); + m_MDIndexFileManager.ReadFile( + header.data(), m_BP4Deserializer.m_IndexHeaderSize, 0, 0); bool active = m_BP4Deserializer.ReadActiveFlag(header); flag = (active ? 
1 : 0); } diff --git a/source/adios2/toolkit/format/bp/BPBase.cpp b/source/adios2/toolkit/format/bp/BPBase.cpp index 10a9ddf079..5bcc7593ec 100644 --- a/source/adios2/toolkit/format/bp/BPBase.cpp +++ b/source/adios2/toolkit/format/bp/BPBase.cpp @@ -196,6 +196,11 @@ void BPBase::Init(const Params &parameters, const std::string hint, static_cast(helper::StringTo( value, " in Parameter key=BurstBufferVerbose " + hint)); } + else if (key == "streamreader") + { + parsedParameters.StreamReader = helper::StringTo( + value, " in Parameter key=StreamReader " + hint); + } } if (!engineType.empty()) { diff --git a/source/adios2/toolkit/format/bp/BPBase.h b/source/adios2/toolkit/format/bp/BPBase.h index c1285df588..548fc66b95 100644 --- a/source/adios2/toolkit/format/bp/BPBase.h +++ b/source/adios2/toolkit/format/bp/BPBase.h @@ -210,6 +210,11 @@ class BPBase bool BurstBufferDrain = true; /** Verbose level for burst buffer draining thread */ int BurstBufferVerbose = 0; + + /** Stream reader flag: process metadata step-by-step + * instead of parsing everything available + */ + bool StreamReader = false; }; /** Return type of the ResizeBuffer function. 
*/ diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Base.h b/source/adios2/toolkit/format/bp/bp4/BP4Base.h index 054161a765..c5fb29bb4d 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Base.h +++ b/source/adios2/toolkit/format/bp/bp4/BP4Base.h @@ -56,6 +56,8 @@ class BP4Base : virtual public BPBase BufferSTL m_MetadataIndex; /** Positions of flags in Index Table Header that Reader uses */ + static constexpr size_t m_IndexHeaderSize = 64; + static constexpr size_t m_IndexRecordSize = 64; static constexpr size_t m_EndianFlagPosition = 36; static constexpr size_t m_BPVersionPosition = 37; static constexpr size_t m_ActiveFlagPosition = 38; diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index 60ed8f65a4..c83a49a6ed 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -5,8 +5,10 @@ set(BP3_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp3) set(BP4_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp4) +set(FS_DIR ${CMAKE_CURRENT_BINARY_DIR}/filestream) file(MAKE_DIRECTORY ${BP3_DIR}) file(MAKE_DIRECTORY ${BP4_DIR}) +file(MAKE_DIRECTORY ${FS_DIR}) macro(bp3_bp4_gtest_add_tests_helper testname mpi) gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .BP3 @@ -68,3 +70,11 @@ gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .BP4 WORKING_DIRECTORY ${BP4_DIR} EXTRA_ARGS "BP4" ) +# FileStream is BP4 + StreamReader=true +gtest_add_tests_helper(StepsInSituGlobalArray MPI_ALLOW BP Engine.BP. .FileStream + WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream" +) +gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. 
.FileStream + WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream" +) + diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index a2b8d343d6..319bf12de1 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -12,6 +12,7 @@ if(ADIOS2_HAVE_SST) gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS") gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP") gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".BP4_stream" EXTRA_ARGS "BP4" "OpenTimeoutSecs=5") + gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".FileStream" EXTRA_ARGS "FileStream") endif() foreach(helper @@ -230,7 +231,7 @@ endif() if(NOT MSVC) # not on windows # BP4 streaming tests start with all the simple tests, but with a timeout added on open LIST (APPEND BP4_STREAM_TESTS ${ALL_SIMPLE_TESTS} ${SPECIAL_TESTS}) - MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10" "${BP4_STREAM_TESTS}") + MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${BP4_STREAM_TESTS}") # SharedVars fail with BP4_streaming* list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*SharedVar.BPS$") # Discard not a feature of BP4 @@ -268,6 +269,37 @@ if(NOT MSVC) # not on windows endif() +# +# Setup streaming tests for FileStream virtual engine (BP4+StreamReader=true) +# +if(NOT MSVC) # not on windows + # FileStream streaming tests start with all the simple tests, but with a timeout added on open + LIST (APPEND FILESTREAM_TESTS ${SIMPLE_TESTS} ${SIMPLE_MPI_TESTS}) + MutateTestSet( FILESTREAM_TESTS "FS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${FILESTREAM_TESTS}") + # SharedVars fail with file_streaming* + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$") + # SharedVars fail with 
file_streaming* + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$") + # Local fail with file_streaming* + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*Local.FS$") + # The nobody-writes-data-in-a-timestep tests don't work for any BP-file based engine + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoData.FS$") + # Don't need to repeat tests that are identical for BP4 and FileStream + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$") + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*TimeoutOnOpen.FS$") + list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$") + + foreach(test ${FILESTREAM_TESTS}) + add_common_test(${test} FileStream) + endforeach() + + MutateTestSet( FileStream_BBSTREAM_TESTS "BB" writer "BurstBufferPath=bb,BurstBufferVerbose=2" "${FILESTREAM_TESTS}") + foreach(test ${FileStream_BBSTREAM_TESTS}) + add_common_test(${test} FileStream) + endforeach() + +endif() + # # Setup tests for HDF5 engine #