Skip to content

Commit

Permalink
Add test for local operator, tweak posix transport to fail on EOF whe…
Browse files Browse the repository at this point in the history
…n desired, add a unit test
  • Loading branch information
eisenhauer committed Jan 29, 2024
1 parent b2da76e commit ae245c6
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 10 deletions.
11 changes: 11 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ std::pair<double, double> BP5Reader::ReadData(adios2::transportman::TransportMan
}
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, m_IO.m_TransportsParameters[0],
/*{{"transport", "File"}},*/ false);
if (!m_WriterIsActive)
{
Params transportParameters;
transportParameters["FailOnEOF"] = "true";
FileManager.SetParameters(transportParameters, -1);
}
}
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);
Expand Down Expand Up @@ -300,6 +306,11 @@ void BP5Reader::PerformLocalGets()
m_WriterMap[m_WriterMapIndex[r2.Timestep]].RankToSubfile[r2.WriterRank]);
};

if (!m_InitialWriterActiveCheckDone)
{
CheckWriterActive();
m_InitialWriterActiveCheckDone = true;
}
// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
m_JSONProfiler.Start("DataRead");
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class BP5Reader : public BP5Engine, public Engine

Minifooter m_Minifooter;

bool m_InitialWriterActiveCheckDone = false;

void Init();
void InitParameters();
void InitTransports();
Expand Down
37 changes: 33 additions & 4 deletions source/adios2/toolkit/transport/file/FilePOSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* FileDescriptor.cpp file I/O using POSIX I/O library
* FilePOSIX.cpp file I/O using POSIX I/O library
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
*/
#include "FilePOSIX.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"

#ifdef ADIOS2_HAVE_O_DIRECT
#ifndef _GNU_SOURCE
Expand All @@ -22,7 +21,8 @@
#include <fcntl.h> // open
#include <sys/stat.h> // open, fstat
#include <sys/types.h> // open
#include <unistd.h> // write, close, ftruncate
#include <thread>
#include <unistd.h> // write, close, ftruncate

/// \cond EXCLUDE_FROM_DOXYGEN
#include <ios> //std::ios_base::failure
Expand Down Expand Up @@ -398,6 +398,7 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start)
void FilePOSIX::Read(char *buffer, size_t size, size_t start)
{
auto lf_Read = [&](char *buffer, size_t size) {
size_t backoff_ns = 20;
while (size > 0)
{
ProfilerStart("read");
Expand All @@ -417,6 +418,25 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
"Toolkit", "transport::file::FilePOSIX", "Read",
"couldn't read from file " + m_Name + " " + SysErrMsg());
}
else if (readSize == 0)
{
if (m_FailOnEOF)
{
helper::Throw<std::ios_base::failure>(
"Toolkit", "transport::file::FilePOSIX", "Read",
"Read past end of file on " + m_Name + " " + SysErrMsg());
}
else
{
// read past EOF, but we're to wait for data. Exponential backoff with a limit
// of .5 sec (500,000,000 nanosec)
std::this_thread::sleep_for(std::chrono::nanoseconds(backoff_ns));
constexpr size_t backoff_limit = 500 * 1000 * 1000;
backoff_ns *= 2;
if (backoff_ns > backoff_limit)
backoff_ns = backoff_limit;
}
}

buffer += readSize;
size -= readSize;
Expand Down Expand Up @@ -595,5 +615,14 @@ void FilePOSIX::Truncate(const size_t length)

void FilePOSIX::MkDir(const std::string &fileName) {}

void FilePOSIX::SetParameters(const Params &params)
{
// Parameters are set from config parameters if present
// Otherwise, they are set from environment if present
// Otherwise, they remain at their default value

helper::GetParameter(params, "FailOnEOF", m_FailOnEOF);
}

} // end namespace transport
} // end namespace adios2
5 changes: 3 additions & 2 deletions source/adios2/toolkit/transport/file/FilePOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* FileDescriptor.h wrapper of POSIX library functions for file I/O
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
Expand Down Expand Up @@ -68,10 +66,13 @@ class FilePOSIX : public Transport

void MkDir(const std::string &fileName) final;

void SetParameters(const Params &params) final;

private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
int m_Errno = 0;
bool m_FailOnEOF = false; // default to false for historic reasons
bool m_IsOpening = false;
std::future<int> m_OpenFuture;
bool m_DirectIO = false;
Expand Down
22 changes: 20 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.cpp
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#include "TransportMan.h"
Expand Down Expand Up @@ -410,6 +408,26 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,
itTransport->second->Read(buffer, size, start);
}

void TransportMan::SetParameters(const Params &params, const int transportIndex)
{
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
{
auto &transport = transportPair.second;

transport->SetParameters(params);
}
}
else
{
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport,
", in call to SetParameters with index " + std::to_string(transportIndex));
itTransport->second->SetParameters(params);
}
}

void TransportMan::FlushFiles(const int transportIndex)
{
if (transportIndex == -1)
Expand Down
9 changes: 7 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.h : manages a vector of transports
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_TRANSPORTMANAGER_H_
Expand Down Expand Up @@ -210,6 +208,13 @@ class TransportMan
*/
bool FileExists(const std::string &name, const Params &parameters, const bool profile);

/**
* Set Transport Paramers
* @param params
* @param transportIndex
*/
void SetParameters(const Params &params, const int transportIndex = -1);

protected:
core::IO &m_IO;
helper::Comm const &m_Comm;
Expand Down
129 changes: 129 additions & 0 deletions testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,134 @@ void BZIP2Accuracy1D(const std::string accuracy)
}
}

void BZIP2Accuracy1DLocal(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
// write a Nx 1D array
const std::string fname("BPWR_BZIP2_1D_Local_" + accuracy + ".bp");

int mpiRank = 0;
// Number of rows
const size_t Nx = 1000;

// Number of steps
const size_t NSteps = 1;

std::vector<float> r32s(Nx);
std::vector<double> r64s(Nx);

// range 0 to 999
std::iota(r32s.begin(), r32s.end(), 0.f);
std::iota(r64s.begin(), r64s.end(), 0.);

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
adios2::IO io = adios.DeclareIO("TestIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

const adios2::Dims shape{};
const adios2::Dims start{};
const adios2::Dims count{Nx};

adios2::Variable<float> var_r32 =
io.DefineVariable<float>("r32", shape, start, count, adios2::ConstantDims);
adios2::Variable<double> var_r64 =
io.DefineVariable<double>("r64", shape, start, count, adios2::ConstantDims);

// add operations
adios2::Operator BZIP2Op =
adios.DefineOperator("BZIP2Compressor", adios2::ops::LosslessBZIP2);

var_r32.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});
var_r64.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();
bpWriter.Put<float>("r32", r32s.data());
bpWriter.Put<double>("r64", r64s.data());
bpWriter.EndStep();
}

bpWriter.Close();
}

{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);

unsigned int t = 0;
std::vector<float> decompressedR32s;
std::vector<double> decompressedR64s;

while (bpReader.BeginStep() == adios2::StepStatus::OK)
{
auto var_r32 = io.InquireVariable<float>("r32");
EXPECT_TRUE(var_r32);
ASSERT_EQ(var_r32.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r32.Steps(), NSteps);

auto var_r64 = io.InquireVariable<double>("r64");
EXPECT_TRUE(var_r64);
ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r64.Steps(), NSteps);
auto r32_info = bpReader.BlocksInfo(var_r32, -1);
auto r64_info = bpReader.BlocksInfo(var_r64, -1);

var_r32.SetBlockSelection(mpiRank);
var_r64.SetBlockSelection(mpiRank);

bpReader.Get(var_r32, decompressedR32s);
bpReader.Get(var_r64, decompressedR64s);
bpReader.EndStep();

for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
std::string msg = ss.str();
ASSERT_EQ(decompressedR32s[i], r32s[i]) << msg;
ASSERT_EQ(decompressedR64s[i], r64s[i]) << msg;
}
++t;
}

EXPECT_EQ(t, NSteps);

bpReader.Close();
}
}

void BZIP2Accuracy2D(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
Expand Down Expand Up @@ -830,6 +958,7 @@ class BPWriteReadBZIP2 : public ::testing::TestWithParam<std::string>
};

TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21D) { BZIP2Accuracy1D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DLocal) { BZIP2Accuracy1DLocal(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP22D) { BZIP2Accuracy2D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP23D) { BZIP2Accuracy3D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DSel) { BZIP2Accuracy1DSel(GetParam()); }
Expand Down
1 change: 1 addition & 0 deletions testing/adios2/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
#------------------------------------------------------------------------------#

gtest_add_tests_helper(ChunkV MPI_NONE "" Unit. "")
gtest_add_tests_helper(PosixTransport MPI_NONE "" Unit. "")

Loading

0 comments on commit ae245c6

Please sign in to comment.