Skip to content

Commit

Permalink
WIP: Posix transport should fail if you try to read past EOF (ornladi…
Browse files Browse the repository at this point in the history
…os#3997)

* Add test for local operator, tweak posix transport to fail on EOF when desired, add a unit test

* Disable posix test on Windows
  • Loading branch information
eisenhauer authored and vicentebolea committed Feb 6, 2024
1 parent 7a63268 commit c1367bd
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 10 deletions.
11 changes: 11 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ std::pair<double, double> BP5Reader::ReadData(adios2::transportman::TransportMan
}
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, m_IO.m_TransportsParameters[0],
/*{{"transport", "File"}},*/ false);
if (!m_WriterIsActive)
{
Params transportParameters;
transportParameters["FailOnEOF"] = "true";
FileManager.SetParameters(transportParameters, -1);
}
}
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);
Expand Down Expand Up @@ -300,6 +306,11 @@ void BP5Reader::PerformLocalGets()
m_WriterMap[m_WriterMapIndex[r2.Timestep]].RankToSubfile[r2.WriterRank]);
};

if (!m_InitialWriterActiveCheckDone)
{
CheckWriterActive();
m_InitialWriterActiveCheckDone = true;
}
// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
m_JSONProfiler.Start("DataRead");
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class BP5Reader : public BP5Engine, public Engine

Minifooter m_Minifooter;

bool m_InitialWriterActiveCheckDone = false;

void Init();
void InitParameters();
void InitTransports();
Expand Down
37 changes: 33 additions & 4 deletions source/adios2/toolkit/transport/file/FilePOSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* FileDescriptor.cpp file I/O using POSIX I/O library
* FilePOSIX.cpp file I/O using POSIX I/O library
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
*/
#include "FilePOSIX.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"

#ifdef ADIOS2_HAVE_O_DIRECT
#ifndef _GNU_SOURCE
Expand All @@ -22,7 +21,8 @@
#include <fcntl.h> // open
#include <sys/stat.h> // open, fstat
#include <sys/types.h> // open
#include <unistd.h> // write, close, ftruncate
#include <thread>
#include <unistd.h> // write, close, ftruncate

/// \cond EXCLUDE_FROM_DOXYGEN
#include <ios> //std::ios_base::failure
Expand Down Expand Up @@ -398,6 +398,7 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start)
void FilePOSIX::Read(char *buffer, size_t size, size_t start)
{
auto lf_Read = [&](char *buffer, size_t size) {
size_t backoff_ns = 20;
while (size > 0)
{
ProfilerStart("read");
Expand All @@ -417,6 +418,25 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
"Toolkit", "transport::file::FilePOSIX", "Read",
"couldn't read from file " + m_Name + " " + SysErrMsg());
}
else if (readSize == 0)
{
if (m_FailOnEOF)
{
helper::Throw<std::ios_base::failure>(
"Toolkit", "transport::file::FilePOSIX", "Read",
"Read past end of file on " + m_Name + " " + SysErrMsg());
}
else
{
// read past EOF, but we're to wait for data. Exponential backoff with a limit
// of .5 sec (500,000,000 nanosec)
std::this_thread::sleep_for(std::chrono::nanoseconds(backoff_ns));
constexpr size_t backoff_limit = 500 * 1000 * 1000;
backoff_ns *= 2;
if (backoff_ns > backoff_limit)
backoff_ns = backoff_limit;
}
}

buffer += readSize;
size -= readSize;
Expand Down Expand Up @@ -595,5 +615,14 @@ void FilePOSIX::Truncate(const size_t length)

void FilePOSIX::MkDir(const std::string &fileName) {}

void FilePOSIX::SetParameters(const Params &params)
{
// Parameters are set from config parameters if present
// Otherwise, they are set from environment if present
// Otherwise, they remain at their default value

helper::GetParameter(params, "FailOnEOF", m_FailOnEOF);
}

} // end namespace transport
} // end namespace adios2
5 changes: 3 additions & 2 deletions source/adios2/toolkit/transport/file/FilePOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* FileDescriptor.h wrapper of POSIX library functions for file I/O
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
Expand Down Expand Up @@ -68,10 +66,13 @@ class FilePOSIX : public Transport

void MkDir(const std::string &fileName) final;

void SetParameters(const Params &params) final;

private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
int m_Errno = 0;
bool m_FailOnEOF = false; // default to false for historic reasons
bool m_IsOpening = false;
std::future<int> m_OpenFuture;
bool m_DirectIO = false;
Expand Down
22 changes: 20 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.cpp
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#include "TransportMan.h"
Expand Down Expand Up @@ -410,6 +408,26 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,
itTransport->second->Read(buffer, size, start);
}

void TransportMan::SetParameters(const Params &params, const int transportIndex)
{
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
{
auto &transport = transportPair.second;

transport->SetParameters(params);
}
}
else
{
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport,
", in call to SetParameters with index " + std::to_string(transportIndex));
itTransport->second->SetParameters(params);
}
}

void TransportMan::FlushFiles(const int transportIndex)
{
if (transportIndex == -1)
Expand Down
9 changes: 7 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.h : manages a vector of transports
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_TRANSPORTMANAGER_H_
Expand Down Expand Up @@ -210,6 +208,13 @@ class TransportMan
*/
bool FileExists(const std::string &name, const Params &parameters, const bool profile);

/**
* Set Transport Paramers
* @param params
* @param transportIndex
*/
void SetParameters(const Params &params, const int transportIndex = -1);

protected:
core::IO &m_IO;
helper::Comm const &m_Comm;
Expand Down
129 changes: 129 additions & 0 deletions testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,134 @@ void BZIP2Accuracy1D(const std::string accuracy)
}
}

void BZIP2Accuracy1DLocal(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
// write a Nx 1D array
const std::string fname("BPWR_BZIP2_1D_Local_" + accuracy + ".bp");

int mpiRank = 0;
// Number of rows
const size_t Nx = 1000;

// Number of steps
const size_t NSteps = 1;

std::vector<float> r32s(Nx);
std::vector<double> r64s(Nx);

// range 0 to 999
std::iota(r32s.begin(), r32s.end(), 0.f);
std::iota(r64s.begin(), r64s.end(), 0.);

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
adios2::IO io = adios.DeclareIO("TestIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

const adios2::Dims shape{};
const adios2::Dims start{};
const adios2::Dims count{Nx};

adios2::Variable<float> var_r32 =
io.DefineVariable<float>("r32", shape, start, count, adios2::ConstantDims);
adios2::Variable<double> var_r64 =
io.DefineVariable<double>("r64", shape, start, count, adios2::ConstantDims);

// add operations
adios2::Operator BZIP2Op =
adios.DefineOperator("BZIP2Compressor", adios2::ops::LosslessBZIP2);

var_r32.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});
var_r64.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();
bpWriter.Put<float>("r32", r32s.data());
bpWriter.Put<double>("r64", r64s.data());
bpWriter.EndStep();
}

bpWriter.Close();
}

{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);

unsigned int t = 0;
std::vector<float> decompressedR32s;
std::vector<double> decompressedR64s;

while (bpReader.BeginStep() == adios2::StepStatus::OK)
{
auto var_r32 = io.InquireVariable<float>("r32");
EXPECT_TRUE(var_r32);
ASSERT_EQ(var_r32.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r32.Steps(), NSteps);

auto var_r64 = io.InquireVariable<double>("r64");
EXPECT_TRUE(var_r64);
ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r64.Steps(), NSteps);
auto r32_info = bpReader.BlocksInfo(var_r32, -1);
auto r64_info = bpReader.BlocksInfo(var_r64, -1);

var_r32.SetBlockSelection(mpiRank);
var_r64.SetBlockSelection(mpiRank);

bpReader.Get(var_r32, decompressedR32s);
bpReader.Get(var_r64, decompressedR64s);
bpReader.EndStep();

for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
std::string msg = ss.str();
ASSERT_EQ(decompressedR32s[i], r32s[i]) << msg;
ASSERT_EQ(decompressedR64s[i], r64s[i]) << msg;
}
++t;
}

EXPECT_EQ(t, NSteps);

bpReader.Close();
}
}

void BZIP2Accuracy2D(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
Expand Down Expand Up @@ -830,6 +958,7 @@ class BPWriteReadBZIP2 : public ::testing::TestWithParam<std::string>
};

TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21D) { BZIP2Accuracy1D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DLocal) { BZIP2Accuracy1DLocal(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP22D) { BZIP2Accuracy2D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP23D) { BZIP2Accuracy3D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DSel) { BZIP2Accuracy1DSel(GetParam()); }
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
#------------------------------------------------------------------------------#

gtest_add_tests_helper(ChunkV MPI_NONE "" Unit. "")
if(UNIX)
gtest_add_tests_helper(PosixTransport MPI_NONE "" Unit. "")
endif()

Loading

0 comments on commit c1367bd

Please sign in to comment.