Skip to content

Commit

Permalink
Merge pull request #3042 from pnorbert/bp5-filterstep
Browse files Browse the repository at this point in the history
String parameter SelectSteps added to BP5 so that a reader only sees …
  • Loading branch information
pnorbert authored Feb 5, 2022
2 parents fb70532 + 91f3b00 commit 1cefd0c
Show file tree
Hide file tree
Showing 10 changed files with 670 additions and 63 deletions.
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ add_library(adios2_core
helper/adiosXMLUtil.cpp
helper/adiosYAML.cpp
helper/adiosLog.cpp
helper/adiosRangeFilter.cpp

#engine derived classes
engine/bp3/BP3Reader.cpp engine/bp3/BP3Reader.tcc
Expand Down
9 changes: 8 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ class BP5Engine
{
public:
int m_RankMPI = 0;
/* metadata index table*/
/* metadata index table
0: pos in memory for step (after filtered read)
1: size of metadata
2: flush count
3: pos in index where data offsets are enumerated
4: abs. pos in metadata File for step
*/
std::unordered_map<uint64_t, std::vector<uint64_t>> m_MetadataIndexTable;

struct Minifooter
Expand Down Expand Up @@ -129,6 +135,7 @@ class BP5Engine
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(AppendAfterSteps, Int, int, INT_MAX) \
MACRO(SelectSteps, String, std::string, (char *)(intptr_t)0) \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
177 changes: 115 additions & 62 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ void BP5Reader::Init()
ParseParams(m_IO, m_Parameters);
m_ReaderIsRowMajor = (m_IO.m_ArrayOrder == ArrayOrdering::RowMajor);
InitTransports();
if (!m_Parameters.SelectSteps.empty())
{
m_SelectedSteps.ParseSelection(m_Parameters.SelectSteps);
}

/* Do a collective wait for the file(s) to appear within timeout.
Make sure every process comes to the same conclusion */
Expand Down Expand Up @@ -486,8 +490,8 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
const Seconds &pollSeconds,
const Seconds &timeoutSeconds)
{
/* Put all metadata in buffer and parse in random access mode */
size_t newIdxSize = 0;
// Put all metadata in buffer
if (m_Comm.Rank() == 0)
{
/* Read metadata index table into memory */
Expand All @@ -500,7 +504,30 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
"in call to BPFileReader Open");
m_MDIndexFileManager.ReadFile(m_MetadataIndex.m_Buffer.data(),
metadataIndexFileSize);
}
m_MDIndexFileAlreadyReadSize = metadataIndexFileSize;
newIdxSize = metadataIndexFileSize;
}

newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0);

if (newIdxSize > 0)
{
// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer);

/* Parse metadata index table */
ParseMetadataIndex(m_MetadataIndex, 0, true, false);
// now we are sure the index header has been parsed,
// first step parsing done
// m_FilteredMetadataInfo is created
m_IdxHeaderParsed = true;
}

if (newIdxSize > 0)
{
if (m_Comm.Rank() == 0)
{
/* Read metametadata into memory */
const size_t metametadataFileSize =
m_FileMetaMetadataManager.GetFileSize(0);
Expand All @@ -510,34 +537,43 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
m_FileMetaMetadataManager.ReadFile(m_MetaMetadata.m_Buffer.data(),
metametadataFileSize);

size_t fileSize = 0;
fileSize = m_MDFileManager.GetFileSize(0);
#ifdef NOTDEF
size_t fileFilteredSize = 0;
for (auto p : m_FilteredMetadataInfo)
{
fileFilteredSize += p.second;
}

/* Read metadata file into memory but first make sure
* it has the content that the index table refers to */
uint64_t expectedMinFileSize =
MetadataExpectedMinFileSize(m_Name, true);
auto p = m_FilteredMetadataInfo.back();
uint64_t expectedMinFileSize = p.first + p.second;
size_t actualFileSize = 0;
do
{
fileSize = m_MDFileManager.GetFileSize(0);
if (fileSize >= expectedMinFileSize)
actualFileSize = m_MDFileManager.GetFileSize(0);
if (actualFileSize >= expectedMinFileSize)
{
break;
}
} while (SleepOrQuit(timeoutInstant, pollSeconds));

if (fileSize >= expectedMinFileSize)
if (actualFileSize >= expectedMinFileSize)
{
#endif
m_Metadata.Resize(
fileSize,
"allocating metadata buffer, in call to BP5Reader Open");

m_MDFileManager.ReadFile(m_Metadata.m_Buffer.data(), fileSize);
m_MDFileAlreadyReadSize = fileSize;
m_MDIndexFileAlreadyReadSize = metadataIndexFileSize;
newIdxSize = metadataIndexFileSize;
#ifdef NOTDEF
m_Metadata.Resize(fileFilteredSize,
"allocating metadata buffer, "
"in call to BP5Reader Open");

size_t mempos = 0;
for (auto p : m_FilteredMetadataInfo)
{
/*std::cout << "Read metadata pos = " << p.first
<< " size = " << p.second
<< " to mempos = " << mempos << std::endl;*/
m_MDFileManager.ReadFile(
m_Metadata.m_Buffer.data() + mempos, p.second, p.first);
mempos += p.second;
}
m_MDFileAlreadyReadSize = expectedMinFileSize;
}
else
{
Expand All @@ -547,44 +583,28 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant,
"has not contained enough data within "
"the specified timeout of " +
std::to_string(timeoutSeconds.count()) +
" seconds. index size = " +
std::to_string(metadataIndexFileSize) +
" metadata size = " + std::to_string(fileSize) +
" seconds. index size = " + std::to_string(newIdxSize) +
" metadata size = " + std::to_string(actualFileSize) +
" expected size = " + std::to_string(expectedMinFileSize) +
". One reason could be if the reader finds old data while "
". One reason could be if the reader finds old data "
"while "
"the writer is creating the new files.");
}
#endif
}
}

newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0);

if (newIdxSize > 0)
{
// broadcast buffer to all ranks from zero
m_Comm.BroadcastVector(m_Metadata.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer);

// broadcast metadata index buffer to all ranks from zero
m_Comm.BroadcastVector(m_MetaMetadata.m_Buffer);

/* Parse metadata index table */
ParseMetadataIndex(m_MetadataIndex, 0, true, false);
// now we are sure the index header has been parsed, first step parsing
// done

m_BP5Deserializer =
new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor,
(m_OpenMode == Mode::ReadRandomAccess));
m_BP5Deserializer->m_Engine = this;

InstallMetaMetaData(m_MetaMetadata);

m_IdxHeaderParsed = true;

if (m_OpenMode == Mode::ReadRandomAccess)
{
for (size_t Step = 0; Step < m_MetadataIndexTable.size(); Step++)
Expand Down Expand Up @@ -672,9 +692,13 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}

// Read each record now
uint64_t currentStep = 0;
uint64_t absStepInFile = 0;
uint64_t lastMapStep = 0;
uint64_t lastWriterCount = 0;
uint64_t MetadataPosTotalSkip = 0;
m_FilteredMetadataInfo.clear();
uint64_t minfo_pos = 0;
uint64_t minfo_size = 0;
do
{
std::vector<uint64_t> ptrs;
Expand All @@ -687,9 +711,15 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

if (!absStepInFile)
{
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (hasWriterMap)
{
auto p = m_WriterMap.emplace(currentStep, WriterMapStruct());
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
Expand All @@ -705,40 +735,63 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
s.RankToSubfile.push_back(subfileIdx);
}
lastMapStep = currentStep;
lastMapStep = m_StepsCount;
lastWriterCount = s.WriterCount;
}
m_WriterMapIndex.push_back(lastMapStep);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
m_MetadataIndexTable[currentStep] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
if (m_SelectedSteps.IsSelected(absStepInFile))
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
m_WriterMapIndex.push_back(lastMapStep);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
ptrs.push_back(MetadataPos); // absolute pos in file before read
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize << "; ";
std::cout << "loc:" << DataPos << std::endl;
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif
minfo_size += MetadataSize;
m_StepsCount++;
}
else
{
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
}
minfo_pos = MetadataPos;
minfo_size = 0;
}

// skip over the writer -> data file offset records
position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
absStepInFile++;
} while (!oneStepOnly && position < buffer.size());
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(std::make_pair(minfo_pos, minfo_size));
}
}

void BP5Reader::DoGetAbsoluteSteps(const VariableBase &variable,
Expand Down
7 changes: 7 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "adios2/core/Engine.h"
#include "adios2/engine/bp5/BP5Engine.h"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosRangeFilter.h"
#include "adios2/toolkit/format/bp5/BP5Deserializer.h"
#include "adios2/toolkit/transportman/TransportMan.h"

Expand Down Expand Up @@ -96,6 +97,12 @@ class BP5Reader : public BP5Engine, public Engine
bool m_FirstStep = true;
bool m_IdxHeaderParsed = false; // true after first index parsing

/** used to filter steps */
helper::RangeFilter m_SelectedSteps;

// offset/size pairs to read sections of metadata from file in InitBuffer
std::vector<std::pair<uint64_t, uint64_t>> m_FilteredMetadataInfo;

Minifooter m_Minifooter;

void Init();
Expand Down
Loading

0 comments on commit 1cefd0c

Please sign in to comment.