Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an issue when reading blocks in streaming mode #4332

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ BP5Deserializer::GenerateReadRequests(const bool doAllocTempBuffers, size_t *max
MetaArrayRecOperator *writer_meta_base =
(MetaArrayRecOperator *)GetMetadataBase((struct BP5VarRec *)Req->VarRec,
Step, WriterRank);
if (!writer_meta_base)
if (!writer_meta_base || !writer_meta_base->BlockCount)
{
continue; // Not writen on this step
}
Expand Down
194 changes: 194 additions & 0 deletions testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,200 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal2DChangeCount)
}
}

template <class T>
void print_vector(std::vector<T> &v)
{
std::cout << "{ ";
for (auto &e : v)
{
std::cout << e << " ";
}
std::cout << "}" << std::endl;
}

template <class T, size_t N>
void print_array(std::array<T, N> &a)
{
std::cout << "{ ";
for (auto &e : a)
{
std::cout << e << " ";
}
std::cout << "}" << std::endl;
}

TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocalVaryingNumberOfBlocks)
{
/* Write different number of blocks per step, skipping some rank in some steps.
Each block size is different too. Test reading each block properly.
Only the first 8 MPI ranks do work, the others just loop empty.
*/
constexpr int NSTEPS = 3;
constexpr int MAXMPISIZE = 8;
const int blocks[NSTEPS][MAXMPISIZE] = {
{1, 1, 1, 1, 1, 1, 1, 1},
{1, 0, 1, 0, 1, 0, 1, 0},
{0, 1, 0, 1, 0, 1, 0, 1},
};
const size_t blocksizes[NSTEPS][MAXMPISIZE] = {
{9, 8, 7, 6, 5, 6, 7, 3},
{2, 0, 4, 0, 6, 0, 9, 0},
{0, 1, 0, 4, 0, 7, 0, 5},
};

int mpiRank = 0;
int mpiSize = 1;
#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + "_MPI.bp");
#else
const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + ".bp");
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
adios2::IO io = adios.DeclareIO("TestIO");
const adios2::Dims shape{};
const adios2::Dims start{};
const adios2::Dims count{(size_t)1};

auto var_r32 = io.DefineVariable<float>("r32", shape, start, count);

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (int step = 0; step < NSTEPS; ++step)
{
std::cout << "--- generateNewSmallTestData Step " << step << " rank " << mpiRank
<< " nprocs " << mpiSize << " ---" << std::endl;
SmallTestData currentTestData =
generateNewSmallTestData(m_TestData, static_cast<int>(step), mpiRank, mpiSize);
print_array(currentTestData.R32);

int bpos = (mpiRank < MAXMPISIZE ? mpiRank : -1);

bpWriter.BeginStep();
if (bpos >= 0 && blocks[step][bpos])
{
size_t nx = blocksizes[step][bpos];
var_r32.SetSelection({{}, {nx}});
bpWriter.Put(var_r32, currentTestData.R32.data());
}
bpWriter.EndStep();
}
bpWriter.Close();
}
#if ADIOS2_USE_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

if (!mpiRank)
{
adios2::IO io = adios.DeclareIO("ReaderIO");
#if ADIOS2_USE_MPI
adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read, MPI_COMM_SELF);
#else
adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);
#endif

for (int step = 0; step < NSTEPS; ++step)
{
std::cout << "====== Read Step " << step << " ==========" << std::endl;
bpReader.BeginStep();
auto var_r32 = io.InquireVariable<float>("r32");

size_t nblocks = 0;
int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE);
for (size_t b = 0; b < static_cast<size_t>(nproc); ++b)
{
if (blocks[step][b] == 0)
{
continue;
}

std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank
<< " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---"
<< std::endl;
SmallTestData currentTestData = generateNewSmallTestData(
m_TestData, static_cast<int>(step), static_cast<int>(b), mpiSize);
print_array(currentTestData.R32);

var_r32.SetBlockSelection(nblocks);
std::vector<float> dataIn;
bpReader.Get(var_r32, dataIn, adios2::Mode::Sync);
EXPECT_EQ(dataIn.size(), blocksizes[step][b]);

for (size_t i = 0; i < dataIn.size(); ++i)
{
EXPECT_EQ(dataIn[i], currentTestData.R32[i]);
}

++nblocks;
}
bpReader.EndStep();
}
bpReader.Close();
}
#if ADIOS2_USE_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

if (!mpiRank)
{
adios2::IO io = adios.DeclareIO("ReaderIORRA");
#if ADIOS2_USE_MPI
adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess, MPI_COMM_SELF);
#else
adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess);
#endif

auto var_r32 = io.InquireVariable<float>("r32");

for (int step = 0; step < NSTEPS; ++step)
{
std::cout << "====== Read Step " << step << " ==========" << std::endl;
var_r32.SetStepSelection({step, 1});

size_t nblocks = 0;
int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE);
for (size_t b = 0; b < static_cast<size_t>(nproc); ++b)
{
if (blocks[step][b] == 0)
{
continue;
}

std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank
<< " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---"
<< std::endl;
SmallTestData currentTestData = generateNewSmallTestData(
m_TestData, static_cast<int>(step), static_cast<int>(b), mpiSize);
print_array(currentTestData.R32);

var_r32.SetBlockSelection(nblocks);
std::vector<float> dataIn;
bpReader.Get(var_r32, dataIn, adios2::Mode::Sync);
EXPECT_EQ(dataIn.size(), blocksizes[step][b]);

for (size_t i = 0; i < dataIn.size(); ++i)
{
EXPECT_EQ(dataIn[i], currentTestData.R32[i]);
}

++nblocks;
}
}
bpReader.Close();
}
#if ADIOS2_USE_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif
}

int main(int argc, char **argv)
{
#if ADIOS2_USE_MPI
Expand Down