Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream reading mode for BP4 metadata processing. Read and parse o… #2397

Merged
merged 4 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/user_guide/source/engines/bp4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ This engine allows the user to fine tune the buffering operations through the fo

16. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc).


17. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open().

============================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
Expand All @@ -98,6 +98,7 @@ This engine allows the user to fine tune the buffering operations through the fo
BurstBufferPath string **""**, /mnt/bb/norbert, /ssd
BurstBufferDrain string On/Off **On**, Off
BurstBufferVerbose integer, 0-2 **0**, ``1``, ``2``
StreamReader string On/Off On, **Off**
============================== ===================== ===========================================================


Expand Down
1 change: 1 addition & 0 deletions docs/user_guide/source/engines/virtual_engines.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ These are the actual settings in ADIOS when a virtual engine is selected. The pa
============================== ===================== ===========================================================
OpenTimeoutSecs float **3600** (wait for up to an hour)
BeginStepPollingFrequencySecs float **1** (poll the file system with 1 second frequency
StreamReader bool **On** (process metadata in streaming mode)
============================== ===================== ===========================================================

3. ``InSituAnalysis``. The engine is ``SST``. The parameters are set to:
Expand Down
1 change: 1 addition & 0 deletions source/adios2/core/IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ void IO::SetEngine(const std::string engineType) noexcept
{
finalEngineType = "BP4";
lf_InsertParam("OpenTimeoutSecs", "3600");
lf_InsertParam("StreamReader", "true");
}
/* "file" is handled entirely in IO::Open() as it needs the name */
else
Expand Down
124 changes: 100 additions & 24 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,22 @@ void BP4Reader::Init()
const Seconds timeoutSeconds =
Seconds(m_BP4Deserializer.m_Parameters.OpenTimeoutSecs);

// set poll to 1/100 of timeout
Seconds pollSeconds = timeoutSeconds / 100.0;
static const auto pollSecondsMin = Seconds(1.0);
if (pollSeconds < pollSecondsMin)
{
pollSeconds = pollSecondsMin;
}
static const auto pollSecondsMax = Seconds(10.0);
if (pollSeconds > pollSecondsMax)
Seconds pollSeconds =
Seconds(m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs);
if (pollSeconds > timeoutSeconds)
{
pollSeconds = pollSecondsMax;
pollSeconds = timeoutSeconds;
}

TimePoint timeoutInstant =
std::chrono::steady_clock::now() + timeoutSeconds;

OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds);
InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds);
if (!m_BP4Deserializer.m_Parameters.StreamReader)
{
/* non-stream reader gets as much steps as available now */
InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds);
}
}

bool BP4Reader::SleepOrQuit(const TimePoint &timeoutInstant,
Expand Down Expand Up @@ -317,6 +315,71 @@ void BP4Reader::InitTransports()
}
}

/* Count index records to minimum 1 and maximum of N records so that
* expected metadata size is less then a predetermined constant
*/
void MetadataCalculateMinFileSize(
const format::BP4Deserializer &m_BP4Deserializer,
const std::string &IdxFileName, char *buf, size_t idxsize, bool hasHeader,
const size_t mdStartPos, size_t &newIdxSize, size_t &expectedMinFileSize)
{
newIdxSize = 0;
expectedMinFileSize = 0;

if (hasHeader && idxsize < m_BP4Deserializer.m_IndexRecordSize)
{
return;
}

/* eliminate header for now for only calculating with records */
if (hasHeader)
{
buf += m_BP4Deserializer.m_IndexRecordSize;
idxsize -= m_BP4Deserializer.m_IndexRecordSize;
}

if (idxsize % m_BP4Deserializer.m_IndexRecordSize != 0)
{
throw std::runtime_error(
"FATAL CODING ERROR: ADIOS Index file " + IdxFileName +
" is assumed to always contain n*" +
std::to_string(m_BP4Deserializer.m_IndexRecordSize) +
" byte-length records. "
"Right now the length of index buffer is " +
std::to_string(idxsize) + " bytes.");
}

const size_t nTotalRecords = idxsize / m_BP4Deserializer.m_IndexRecordSize;
if (nTotalRecords == 0)
{
// no (new) step entry in the index, so no metadata is expected
newIdxSize = 0;
expectedMinFileSize = 0;
return;
}

size_t nRecords = 1;
expectedMinFileSize = *(uint64_t *)&(
buf[nRecords * m_BP4Deserializer.m_IndexRecordSize - 24]);
while (nRecords < nTotalRecords)
{
const size_t n = nRecords + 1;
const uint64_t mdEndPos =
*(uint64_t *)&(buf[n * m_BP4Deserializer.m_IndexRecordSize - 24]);
if (mdEndPos - mdStartPos > 16777216)
{
break;
}
expectedMinFileSize = mdEndPos;
++nRecords;
}
newIdxSize = nRecords * m_BP4Deserializer.m_IndexRecordSize;
if (hasHeader)
{
newIdxSize += m_BP4Deserializer.m_IndexRecordSize;
}
}

uint64_t
MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer,
const std::string &IdxFileName, bool hasHeader)
Expand All @@ -330,7 +393,9 @@ MetadataExpectedMinFileSize(const format::BP4Deserializer &m_BP4Deserializer,
"The file size now is " +
std::to_string(idxsize) + " bytes.");
}
if ((hasHeader && idxsize < 128) || idxsize < 64)
if ((hasHeader && idxsize < m_BP4Deserializer.m_IndexHeaderSize +
m_BP4Deserializer.m_IndexRecordSize) ||
idxsize < m_BP4Deserializer.m_IndexRecordSize)
{
// no (new) step entry in the index, so no metadata is expected
return 0;
Expand Down Expand Up @@ -450,23 +515,35 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
const size_t idxFileSize = m_MDIndexFileManager.GetFileSize(0);
if (idxFileSize > m_MDIndexFileProcessedSize)
{
const size_t newIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
const size_t maxIdxSize = idxFileSize - m_MDIndexFileProcessedSize;
std::vector<char> idxbuf(maxIdxSize);
m_MDIndexFileManager.ReadFile(idxbuf.data(), maxIdxSize,
m_MDIndexFileProcessedSize);
size_t newIdxSize;
size_t expectedMinFileSize;
char *buf = idxbuf.data();

MetadataCalculateMinFileSize(
m_BP4Deserializer, m_Name, buf, maxIdxSize, !m_IdxHeaderParsed,
m_MDFileProcessedSize, newIdxSize, expectedMinFileSize);

// const uint64_t expectedMinFileSize = MetadataExpectedMinFileSize(
// m_BP4Deserializer, m_Name, !m_IdxHeaderParsed);

if (m_BP4Deserializer.m_MetadataIndex.m_Buffer.size() < newIdxSize)
{
m_BP4Deserializer.m_MetadataIndex.Resize(
newIdxSize, "re-allocating metadata index buffer, in "
"call to BP4Reader::BeginStep/UpdateBuffer");
}
m_BP4Deserializer.m_MetadataIndex.m_Position = 0;
m_MDIndexFileManager.ReadFile(
m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(), newIdxSize,
m_MDIndexFileProcessedSize);
std::copy(idxbuf.begin(), idxbuf.begin() + newIdxSize,
m_BP4Deserializer.m_MetadataIndex.m_Buffer.begin());

/* Wait until as much metadata arrives in the file as much
* is indicated by the existing index entries
*/
uint64_t expectedMinFileSize = MetadataExpectedMinFileSize(
m_BP4Deserializer, m_Name, !m_IdxHeaderParsed);

size_t fileSize = 0;
do
{
Expand All @@ -486,7 +563,8 @@ size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant,
* the buffer now.
*/
const size_t fileSize = m_MDFileManager.GetFileSize(0);
const size_t newMDSize = fileSize - m_MDFileProcessedSize;
const size_t newMDSize =
expectedMinFileSize - m_MDFileProcessedSize;
if (m_BP4Deserializer.m_Metadata.m_Buffer.size() < newMDSize)
{
m_BP4Deserializer.m_Metadata.Resize(
Expand Down Expand Up @@ -549,18 +627,16 @@ void BP4Reader::ProcessMetadataForNewSteps(const size_t newIdxSize)
{
m_MDIndexFileProcessedSize += newIdxSize;
}
size_t idxsize = m_BP4Deserializer.m_MetadataIndex.m_Buffer.size();
uint64_t lastpos = *(uint64_t *)&(
m_BP4Deserializer.m_MetadataIndex.m_Buffer[idxsize - 24]);
}

bool BP4Reader::CheckWriterActive()
{
size_t flag = 0;
if (m_BP4Deserializer.m_RankMPI == 0)
{
std::vector<char> header(64, '\0');
m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0);
std::vector<char> header(m_BP4Deserializer.m_IndexHeaderSize, '\0');
m_MDIndexFileManager.ReadFile(
header.data(), m_BP4Deserializer.m_IndexHeaderSize, 0, 0);
bool active = m_BP4Deserializer.ReadActiveFlag(header);
flag = (active ? 1 : 0);
}
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ void BPBase::Init(const Params &parameters, const std::string hint,
static_cast<int>(helper::StringTo<int32_t>(
value, " in Parameter key=BurstBufferVerbose " + hint));
}
else if (key == "streamreader")
{
parsedParameters.StreamReader = helper::StringTo<bool>(
value, " in Parameter key=StreamReader " + hint);
}
}
if (!engineType.empty())
{
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ class BPBase
bool BurstBufferDrain = true;
/** Verbose level for burst buffer draining thread */
int BurstBufferVerbose = 0;

/** Stream reader flag: process metadata step-by-step
* instead of parsing everything available
*/
bool StreamReader = false;
};

/** Return type of the ResizeBuffer function. */
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class BP4Base : virtual public BPBase
BufferSTL m_MetadataIndex;

/** Positions of flags in Index Table Header that Reader uses */
static constexpr size_t m_IndexHeaderSize = 64;
static constexpr size_t m_IndexRecordSize = 64;
static constexpr size_t m_EndianFlagPosition = 36;
static constexpr size_t m_BPVersionPosition = 37;
static constexpr size_t m_ActiveFlagPosition = 38;
Expand Down
10 changes: 10 additions & 0 deletions testing/adios2/engine/bp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

set(BP3_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp3)
set(BP4_DIR ${CMAKE_CURRENT_BINARY_DIR}/bp4)
set(FS_DIR ${CMAKE_CURRENT_BINARY_DIR}/filestream)
file(MAKE_DIRECTORY ${BP3_DIR})
file(MAKE_DIRECTORY ${BP4_DIR})
file(MAKE_DIRECTORY ${FS_DIR})

macro(bp3_bp4_gtest_add_tests_helper testname mpi)
gtest_add_tests_helper(${testname} ${mpi} BP Engine.BP. .BP3
Expand Down Expand Up @@ -68,3 +70,11 @@ gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .BP4
WORKING_DIRECTORY ${BP4_DIR} EXTRA_ARGS "BP4"
)

# FileStream is BP4 + StreamReader=true
gtest_add_tests_helper(StepsInSituGlobalArray MPI_ALLOW BP Engine.BP. .FileStream
WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream"
)
gtest_add_tests_helper(StepsInSituLocalArray MPI_ALLOW BP Engine.BP. .FileStream
WORKING_DIRECTORY ${FS_DIR} EXTRA_ARGS "FileStream"
)

34 changes: 33 additions & 1 deletion testing/adios2/engine/staging-common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ if(ADIOS2_HAVE_SST)
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.FFS" EXTRA_ARGS "SST" "MarshalMethod=FFS")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".SST.BP" EXTRA_ARGS "SST" "MarshalMethod=BP")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".BP4_stream" EXTRA_ARGS "BP4" "OpenTimeoutSecs=5")
gtest_add_tests_helper(Threads MPI_NONE "" Engine.Staging. ".FileStream" EXTRA_ARGS "FileStream")
endif()

foreach(helper
Expand Down Expand Up @@ -230,7 +231,7 @@ endif()
if(NOT MSVC) # not on windows
# BP4 streaming tests start with all the simple tests, but with a timeout added on open
LIST (APPEND BP4_STREAM_TESTS ${ALL_SIMPLE_TESTS} ${SPECIAL_TESTS})
MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10" "${BP4_STREAM_TESTS}")
MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${BP4_STREAM_TESTS}")
# SharedVars fail with BP4_streaming*
list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*SharedVar.BPS$")
# Discard not a feature of BP4
Expand Down Expand Up @@ -268,6 +269,37 @@ if(NOT MSVC) # not on windows

endif()

#
# Setup streaming tests for FileStream virtual engine (BP4+StreamReader=true)
#
if(NOT MSVC) # not on windows
# FileStream streaming tests start with all the simple tests, but with a timeout added on open
LIST (APPEND FILESTREAM_TESTS ${SIMPLE_TESTS} ${SIMPLE_MPI_TESTS})
MutateTestSet( FILESTREAM_TESTS "FS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${FILESTREAM_TESTS}")
# SharedVars fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$")
# SharedVars fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*SharedVar.FS$")
# Local fail with file_streaming*
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*Local.FS$")
# The nobody-writes-data-in-a-timestep tests don't work for any BP-file based engine
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoData.FS$")
# Don't need to repeat tests that are identical for BP4 and FileStream
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$")
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*TimeoutOnOpen.FS$")
list (FILTER FILESTREAM_TESTS EXCLUDE REGEX ".*NoReaderNoWait.FS$")

foreach(test ${FILESTREAM_TESTS})
add_common_test(${test} FileStream)
endforeach()

MutateTestSet( FileStream_BBSTREAM_TESTS "BB" writer "BurstBufferPath=bb,BurstBufferVerbose=2" "${FILESTREAM_TESTS}")
foreach(test ${FileStream_BBSTREAM_TESTS})
add_common_test(${test} FileStream)
endforeach()

endif()

#
# Setup tests for HDF5 engine
#
Expand Down