Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Posix transport should fail if you try to read past EOF #3997

Merged
merged 3 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ std::pair<double, double> BP5Reader::ReadData(adios2::transportman::TransportMan
}
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, m_IO.m_TransportsParameters[0],
/*{{"transport", "File"}},*/ false);
if (!m_WriterIsActive)
{
Params transportParameters;
transportParameters["FailOnEOF"] = "true";
FileManager.SetParameters(transportParameters, -1);
}
}
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);
Expand Down Expand Up @@ -300,6 +306,11 @@ void BP5Reader::PerformLocalGets()
m_WriterMap[m_WriterMapIndex[r2.Timestep]].RankToSubfile[r2.WriterRank]);
};

if (!m_InitialWriterActiveCheckDone)
{
CheckWriterActive();
m_InitialWriterActiveCheckDone = true;
}
// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
m_JSONProfiler.Start("DataRead");
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class BP5Reader : public BP5Engine, public Engine

Minifooter m_Minifooter;

bool m_InitialWriterActiveCheckDone = false;

void Init();
void InitParameters();
void InitTransports();
Expand Down
37 changes: 33 additions & 4 deletions source/adios2/toolkit/transport/file/FilePOSIX.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* FileDescriptor.cpp file I/O using POSIX I/O library
* FilePOSIX.cpp file I/O using POSIX I/O library
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to keep.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the history of the file it went through multiple revisions and it has many authors. I think we should remove the header from all the files since there is a copyright file that is covering the entire repo and includes all the authors (you are included). I'll bring it up next Tuesday and if everyone agrees I can can create a PR.

*/
#include "FilePOSIX.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"

#ifdef ADIOS2_HAVE_O_DIRECT
#ifndef _GNU_SOURCE
Expand All @@ -22,7 +21,8 @@
#include <fcntl.h> // open
#include <sys/stat.h> // open, fstat
#include <sys/types.h> // open
#include <unistd.h> // write, close, ftruncate
#include <thread>
#include <unistd.h> // write, close, ftruncate

/// \cond EXCLUDE_FROM_DOXYGEN
#include <ios> //std::ios_base::failure
Expand Down Expand Up @@ -398,6 +398,7 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start)
void FilePOSIX::Read(char *buffer, size_t size, size_t start)
{
auto lf_Read = [&](char *buffer, size_t size) {
size_t backoff_ns = 20;
while (size > 0)
{
ProfilerStart("read");
Expand All @@ -417,6 +418,25 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start)
"Toolkit", "transport::file::FilePOSIX", "Read",
"couldn't read from file " + m_Name + " " + SysErrMsg());
}
else if (readSize == 0)
{
if (m_FailOnEOF)
{
helper::Throw<std::ios_base::failure>(
"Toolkit", "transport::file::FilePOSIX", "Read",
"Read past end of file on " + m_Name + " " + SysErrMsg());
}
else
{
// read past EOF, but we're to wait for data. Exponential backoff with a limit
// of .5 sec (500,000,000 nanosec)
std::this_thread::sleep_for(std::chrono::nanoseconds(backoff_ns));
constexpr size_t backoff_limit = 500 * 1000 * 1000;
vicentebolea marked this conversation as resolved.
Show resolved Hide resolved
backoff_ns *= 2;
if (backoff_ns > backoff_limit)
backoff_ns = backoff_limit;
}
}

buffer += readSize;
size -= readSize;
Expand Down Expand Up @@ -595,5 +615,14 @@ void FilePOSIX::Truncate(const size_t length)

void FilePOSIX::MkDir(const std::string &fileName) {}

void FilePOSIX::SetParameters(const Params &params)
{
// Parameters are set from config parameters if present
// Otherwise, they are set from environment if present
// Otherwise, they remain at their default value

helper::GetParameter(params, "FailOnEOF", m_FailOnEOF);
}

} // end namespace transport
} // end namespace adios2
5 changes: 3 additions & 2 deletions source/adios2/toolkit/transport/file/FilePOSIX.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* FileDescriptor.h wrapper of POSIX library functions for file I/O
*
* Created on: Oct 6, 2016
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_
Expand Down Expand Up @@ -68,10 +66,13 @@ class FilePOSIX : public Transport

void MkDir(const std::string &fileName) final;

void SetParameters(const Params &params) final;

private:
/** POSIX file handle returned by Open */
int m_FileDescriptor = -1;
int m_Errno = 0;
bool m_FailOnEOF = false; // default to false for historic reasons
bool m_IsOpening = false;
std::future<int> m_OpenFuture;
bool m_DirectIO = false;
Expand Down
22 changes: 20 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.cpp
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#include "TransportMan.h"
Expand Down Expand Up @@ -410,6 +408,26 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,
itTransport->second->Read(buffer, size, start);
}

void TransportMan::SetParameters(const Params &params, const int transportIndex)
{
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
{
auto &transport = transportPair.second;

transport->SetParameters(params);
}
}
else
{
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport,
", in call to SetParameters with index " + std::to_string(transportIndex));
itTransport->second->SetParameters(params);
}
}

void TransportMan::FlushFiles(const int transportIndex)
{
if (transportIndex == -1)
Expand Down
9 changes: 7 additions & 2 deletions source/adios2/toolkit/transportman/TransportMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
*
* TransportMan.h : manages a vector of transports
*
* Created on: May 23, 2017
* Author: William F Godoy [email protected]
*/

#ifndef ADIOS2_TOOLKIT_TRANSPORT_TRANSPORTMANAGER_H_
Expand Down Expand Up @@ -210,6 +208,13 @@ class TransportMan
*/
bool FileExists(const std::string &name, const Params &parameters, const bool profile);

/**
* Set Transport Paramers
* @param params
* @param transportIndex
*/
void SetParameters(const Params &params, const int transportIndex = -1);

protected:
core::IO &m_IO;
helper::Comm const &m_Comm;
Expand Down
129 changes: 129 additions & 0 deletions testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,134 @@ void BZIP2Accuracy1D(const std::string accuracy)
}
}

void BZIP2Accuracy1DLocal(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
// write a Nx 1D array
const std::string fname("BPWR_BZIP2_1D_Local_" + accuracy + ".bp");

int mpiRank = 0;
// Number of rows
const size_t Nx = 1000;

// Number of steps
const size_t NSteps = 1;

std::vector<float> r32s(Nx);
std::vector<double> r64s(Nx);

// range 0 to 999
std::iota(r32s.begin(), r32s.end(), 0.f);
std::iota(r64s.begin(), r64s.end(), 0.);

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
adios2::IO io = adios.DeclareIO("TestIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

const adios2::Dims shape{};
const adios2::Dims start{};
const adios2::Dims count{Nx};

adios2::Variable<float> var_r32 =
io.DefineVariable<float>("r32", shape, start, count, adios2::ConstantDims);
adios2::Variable<double> var_r64 =
io.DefineVariable<double>("r64", shape, start, count, adios2::ConstantDims);

// add operations
adios2::Operator BZIP2Op =
adios.DefineOperator("BZIP2Compressor", adios2::ops::LosslessBZIP2);

var_r32.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});
var_r64.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();
bpWriter.Put<float>("r32", r32s.data());
bpWriter.Put<double>("r64", r64s.data());
bpWriter.EndStep();
}

bpWriter.Close();
}

{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);

unsigned int t = 0;
std::vector<float> decompressedR32s;
std::vector<double> decompressedR64s;

while (bpReader.BeginStep() == adios2::StepStatus::OK)
{
auto var_r32 = io.InquireVariable<float>("r32");
EXPECT_TRUE(var_r32);
ASSERT_EQ(var_r32.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r32.Steps(), NSteps);

auto var_r64 = io.InquireVariable<double>("r64");
EXPECT_TRUE(var_r64);
ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r64.Steps(), NSteps);
auto r32_info = bpReader.BlocksInfo(var_r32, -1);
auto r64_info = bpReader.BlocksInfo(var_r64, -1);

var_r32.SetBlockSelection(mpiRank);
var_r64.SetBlockSelection(mpiRank);

bpReader.Get(var_r32, decompressedR32s);
bpReader.Get(var_r64, decompressedR64s);
bpReader.EndStep();

for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
std::string msg = ss.str();
ASSERT_EQ(decompressedR32s[i], r32s[i]) << msg;
ASSERT_EQ(decompressedR64s[i], r64s[i]) << msg;
}
++t;
}

EXPECT_EQ(t, NSteps);

bpReader.Close();
}
}

void BZIP2Accuracy2D(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
Expand Down Expand Up @@ -830,6 +958,7 @@ class BPWriteReadBZIP2 : public ::testing::TestWithParam<std::string>
};

TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21D) { BZIP2Accuracy1D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DLocal) { BZIP2Accuracy1DLocal(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP22D) { BZIP2Accuracy2D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP23D) { BZIP2Accuracy3D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DSel) { BZIP2Accuracy1DSel(GetParam()); }
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
#------------------------------------------------------------------------------#

gtest_add_tests_helper(ChunkV MPI_NONE "" Unit. "")
if(UNIX)
gtest_add_tests_helper(PosixTransport MPI_NONE "" Unit. "")
endif()

Loading
Loading