From c1367bdfd7b18e2eded09365c5c76ab6f2cb2aed Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Tue, 30 Jan 2024 15:35:56 -0500 Subject: [PATCH] WIP: Posix transport should fail if you try to read past EOF (#3997) * Add test for local operator, tweak posix transport to fail on EOF when desired, add a unit test * Disable posix test on Windows --- source/adios2/engine/bp5/BP5Reader.cpp | 11 ++ source/adios2/engine/bp5/BP5Reader.h | 2 + .../toolkit/transport/file/FilePOSIX.cpp | 37 ++++- .../adios2/toolkit/transport/file/FilePOSIX.h | 5 +- .../toolkit/transportman/TransportMan.cpp | 22 ++- .../toolkit/transportman/TransportMan.h | 9 +- .../bp/operations/TestBPWriteReadBZIP2.cpp | 129 ++++++++++++++++++ testing/adios2/unit/CMakeLists.txt | 3 + testing/adios2/unit/TestPosixTransport.cpp | 101 ++++++++++++++ 9 files changed, 309 insertions(+), 10 deletions(-) create mode 100644 testing/adios2/unit/TestPosixTransport.cpp diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 00209ff1f4..cb0b9aa570 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -225,6 +225,12 @@ std::pair BP5Reader::ReadData(adios2::transportman::TransportMan } FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, m_IO.m_TransportsParameters[0], /*{{"transport", "File"}},*/ false); + if (!m_WriterIsActive) + { + Params transportParameters; + transportParameters["FailOnEOF"] = "true"; + FileManager.SetParameters(transportParameters, -1); + } } TP endSubfile = NOW(); double timeSubfile = DURATION(startSubfile, endSubfile); @@ -300,6 +306,11 @@ void BP5Reader::PerformLocalGets() m_WriterMap[m_WriterMapIndex[r2.Timestep]].RankToSubfile[r2.WriterRank]); }; + if (!m_InitialWriterActiveCheckDone) + { + CheckWriterActive(); + m_InitialWriterActiveCheckDone = true; + } // TP start = NOW(); PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets"); m_JSONProfiler.Start("DataRead"); diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index 3c45b1db2c..cc73944c2f 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -113,6 +113,8 @@ class BP5Reader : public BP5Engine, public Engine Minifooter m_Minifooter; + bool m_InitialWriterActiveCheckDone = false; + void Init(); void InitParameters(); void InitTransports(); diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index 20f98cb179..fedb934bf3 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -2,13 +2,12 @@ * Distributed under the OSI-approved Apache License, Version 2.0. See * accompanying file Copyright.txt for details. * - * FileDescriptor.cpp file I/O using POSIX I/O library + * FilePOSIX.cpp file I/O using POSIX I/O library * - * Created on: Oct 6, 2016 - * Author: William F Godoy godoywf@ornl.gov */ #include "FilePOSIX.h" #include "adios2/helper/adiosLog.h" +#include "adios2/helper/adiosString.h" #ifdef ADIOS2_HAVE_O_DIRECT #ifndef _GNU_SOURCE @@ -22,7 +21,8 @@ #include // open #include // open, fstat #include // open -#include // write, close, ftruncate +#include +#include // write, close, ftruncate /// \cond EXCLUDE_FROM_DOXYGEN #include //std::ios_base::failure @@ -398,6 +398,7 @@ void FilePOSIX::WriteV(const core::iovec *iov, const int iovcnt, size_t start) void FilePOSIX::Read(char *buffer, size_t size, size_t start) { auto lf_Read = [&](char *buffer, size_t size) { + size_t backoff_ns = 20; while (size > 0) { ProfilerStart("read"); @@ -417,6 +418,25 @@ void FilePOSIX::Read(char *buffer, size_t size, size_t start) "Toolkit", "transport::file::FilePOSIX", "Read", "couldn't read from file " + m_Name + " " + SysErrMsg()); } + else if (readSize == 0) + { + if (m_FailOnEOF) + { + helper::Throw( + "Toolkit", "transport::file::FilePOSIX", "Read", + "Read past end of file on " + m_Name + " " + SysErrMsg()); + } + else + { + // read past EOF, but we're to wait for data. Exponential backoff with a limit + // of .5 sec (500,000,000 nanosec) + std::this_thread::sleep_for(std::chrono::nanoseconds(backoff_ns)); + constexpr size_t backoff_limit = 500 * 1000 * 1000; + backoff_ns *= 2; + if (backoff_ns > backoff_limit) + backoff_ns = backoff_limit; + } + } buffer += readSize; size -= readSize; @@ -595,5 +615,14 @@ void FilePOSIX::Truncate(const size_t length) void FilePOSIX::MkDir(const std::string &fileName) {} +void FilePOSIX::SetParameters(const Params ¶ms) +{ + // Parameters are set from config parameters if present + // Otherwise, they are set from environment if present + // Otherwise, they remain at their default value + + helper::GetParameter(params, "FailOnEOF", m_FailOnEOF); +} + } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h index 394aa9ea15..667d63cd0a 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.h +++ b/source/adios2/toolkit/transport/file/FilePOSIX.h @@ -4,8 +4,6 @@ * * FileDescriptor.h wrapper of POSIX library functions for file I/O * - * Created on: Oct 6, 2016 - * Author: William F Godoy godoywf@ornl.gov */ #ifndef ADIOS2_TOOLKIT_TRANSPORT_FILE_FILEDESCRIPTOR_H_ @@ -68,10 +66,13 @@ class FilePOSIX : public Transport void MkDir(const std::string &fileName) final; + void SetParameters(const Params ¶ms) final; + private: /** POSIX file handle returned by Open */ int m_FileDescriptor = -1; int m_Errno = 0; + bool m_FailOnEOF = false; // default to false for historic reasons bool m_IsOpening = false; std::future m_OpenFuture; bool m_DirectIO = false; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index bb7ff8666d..2e35e73f8b 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -4,8 +4,6 @@ * * TransportMan.cpp * - * Created on: May 23, 2017 - * Author: William F Godoy godoywf@ornl.gov */ #include "TransportMan.h" @@ -410,6 +408,26 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start, itTransport->second->Read(buffer, size, start); } +void TransportMan::SetParameters(const Params ¶ms, const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { + auto &transport = transportPair.second; + + transport->SetParameters(params); + } + } + else + { + auto itTransport = m_Transports.find(transportIndex); + CheckFile(itTransport, + ", in call to SetParameters with index " + std::to_string(transportIndex)); + itTransport->second->SetParameters(params); + } +} + void TransportMan::FlushFiles(const int transportIndex) { if (transportIndex == -1) diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index acf3ac1cd2..adc1d46d13 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -4,8 +4,6 @@ * * TransportMan.h : manages a vector of transports * - * Created on: May 23, 2017 - * Author: William F Godoy godoywf@ornl.gov */ #ifndef ADIOS2_TOOLKIT_TRANSPORT_TRANSPORTMANAGER_H_ @@ -210,6 +208,13 @@ class TransportMan */ bool FileExists(const std::string &name, const Params ¶meters, const bool profile); + /** + * Set Transport Paramers + * @param params + * @param transportIndex + */ + void SetParameters(const Params ¶ms, const int transportIndex = -1); + protected: core::IO &m_IO; helper::Comm const &m_Comm; diff --git a/testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp b/testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp index 4e1f7f86fb..dcbafcccf0 100644 --- a/testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp +++ b/testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp @@ -148,6 +148,134 @@ void BZIP2Accuracy1D(const std::string accuracy) } } +void BZIP2Accuracy1DLocal(const std::string accuracy) +{ + // Each process would write a 1x8 array and all processes would + // write a Nx 1D array + const std::string fname("BPWR_BZIP2_1D_Local_" + accuracy + ".bp"); + + int mpiRank = 0; + // Number of rows + const size_t Nx = 1000; + + // Number of steps + const size_t NSteps = 1; + + std::vector r32s(Nx); + std::vector r64s(Nx); + + // range 0 to 999 + std::iota(r32s.begin(), r32s.end(), 0.f); + std::iota(r64s.begin(), r64s.end(), 0.); + +#if ADIOS2_USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); +#endif + +#if ADIOS2_USE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD); +#else + adios2::ADIOS adios; +#endif + { + adios2::IO io = adios.DeclareIO("TestIO"); + + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + else + { + // Create the BP Engine + io.SetEngine("BPFile"); + } + + const adios2::Dims shape{}; + const adios2::Dims start{}; + const adios2::Dims count{Nx}; + + adios2::Variable var_r32 = + io.DefineVariable("r32", shape, start, count, adios2::ConstantDims); + adios2::Variable var_r64 = + io.DefineVariable("r64", shape, start, count, adios2::ConstantDims); + + // add operations + adios2::Operator BZIP2Op = + adios.DefineOperator("BZIP2Compressor", adios2::ops::LosslessBZIP2); + + var_r32.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}}); + var_r64.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}}); + + adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); + + for (size_t step = 0; step < NSteps; ++step) + { + bpWriter.BeginStep(); + bpWriter.Put("r32", r32s.data()); + bpWriter.Put("r64", r64s.data()); + bpWriter.EndStep(); + } + + bpWriter.Close(); + } + + { + adios2::IO io = adios.DeclareIO("ReadIO"); + + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + else + { + // Create the BP Engine + io.SetEngine("BPFile"); + } + + adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read); + + unsigned int t = 0; + std::vector decompressedR32s; + std::vector decompressedR64s; + + while (bpReader.BeginStep() == adios2::StepStatus::OK) + { + auto var_r32 = io.InquireVariable("r32"); + EXPECT_TRUE(var_r32); + ASSERT_EQ(var_r32.ShapeID(), adios2::ShapeID::LocalArray); + ASSERT_EQ(var_r32.Steps(), NSteps); + + auto var_r64 = io.InquireVariable("r64"); + EXPECT_TRUE(var_r64); + ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::LocalArray); + ASSERT_EQ(var_r64.Steps(), NSteps); + auto r32_info = bpReader.BlocksInfo(var_r32, -1); + auto r64_info = bpReader.BlocksInfo(var_r64, -1); + + var_r32.SetBlockSelection(mpiRank); + var_r64.SetBlockSelection(mpiRank); + + bpReader.Get(var_r32, decompressedR32s); + bpReader.Get(var_r64, decompressedR64s); + bpReader.EndStep(); + + for (size_t i = 0; i < Nx; ++i) + { + std::stringstream ss; + ss << "t=" << t << " i=" << i << " rank=" << mpiRank; + std::string msg = ss.str(); + ASSERT_EQ(decompressedR32s[i], r32s[i]) << msg; + ASSERT_EQ(decompressedR64s[i], r64s[i]) << msg; + } + ++t; + } + + EXPECT_EQ(t, NSteps); + + bpReader.Close(); + } +} + void BZIP2Accuracy2D(const std::string accuracy) { // Each process would write a 1x8 array and all processes would @@ -830,6 +958,7 @@ class BPWriteReadBZIP2 : public ::testing::TestWithParam }; TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21D) { BZIP2Accuracy1D(GetParam()); } +TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DLocal) { BZIP2Accuracy1DLocal(GetParam()); } TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP22D) { BZIP2Accuracy2D(GetParam()); } TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP23D) { BZIP2Accuracy3D(GetParam()); } TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DSel) { BZIP2Accuracy1DSel(GetParam()); } diff --git a/testing/adios2/unit/CMakeLists.txt b/testing/adios2/unit/CMakeLists.txt index de65ea8510..c38e0d300f 100644 --- a/testing/adios2/unit/CMakeLists.txt +++ b/testing/adios2/unit/CMakeLists.txt @@ -4,4 +4,7 @@ #------------------------------------------------------------------------------# gtest_add_tests_helper(ChunkV MPI_NONE "" Unit. "") +if(UNIX) + gtest_add_tests_helper(PosixTransport MPI_NONE "" Unit. "") +endif() diff --git a/testing/adios2/unit/TestPosixTransport.cpp b/testing/adios2/unit/TestPosixTransport.cpp new file mode 100644 index 0000000000..c2d565087e --- /dev/null +++ b/testing/adios2/unit/TestPosixTransport.cpp @@ -0,0 +1,101 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + */ +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "adios2/helper/adiosLog.h" +#include "adios2/helper/adiosString.h" +#include +#include +#include +#include + +#include + +namespace adios2 +{ +namespace format +{ + +TEST(FileTransport, FailOnEOF) +{ + { + std::vector b(256, 0xef); + helper::Comm comm = helper::CommDummy(); + std::unique_ptr w = + std::unique_ptr(new transport::FilePOSIX(comm)); + + w->Open("FailOnEOF", Mode::Write); + w->Write((char *)b.data(), b.size()); + w->Close(); + } + { + std::vector b(256); + helper::Comm comm = helper::CommDummy(); + std::unique_ptr r = + std::unique_ptr(new transport::FilePOSIX(comm)); + + r->Open("FailOnEOF", Mode::Read); + Params p = {{"FailOnEOF", "true"}}; + r->SetParameters(p); + EXPECT_THROW(r->Read((char *)b.data(), b.size() * 2), std::ios_base::failure); + r->Close(); + } +} + +TEST(FileTransport, WaitForData) +{ + constexpr int size = 256; + std::vector b(size, 0xef); + helper::Comm comm = helper::CommDummy(); + std::unique_ptr w = + std::unique_ptr(new transport::FilePOSIX(comm)); + + w->Open("FailOnEOF", Mode::Write); + w->Write((char *)b.data(), b.size()); + { + auto lf_WriteMore = [&](const transport::FilePOSIX *) { + std::vector b2(size, 0xfe); + std::this_thread::sleep_for(std::chrono::seconds(2)); + w->Write((char *)b2.data(), size); + std::cout << "Wrote data" << std::endl; + }; + + // write more data soon + auto h = std::async(std::launch::async, lf_WriteMore, w.get()); + + std::vector b(size * 2); + helper::Comm comm = helper::CommDummy(); + std::unique_ptr r = + std::unique_ptr(new transport::FilePOSIX(comm)); + + r->Open("FailOnEOF", Mode::Read); + r->Read((char *)b.data(), size * 2); + ASSERT_EQ(b[0], 0xef); + ASSERT_EQ(b[size], 0xfe); + r->Close(); + } + w->Close(); +} +} +} + +int main(int argc, char **argv) +{ + + int result; + ::testing::InitGoogleTest(&argc, argv); + result = RUN_ALL_TESTS(); + + return result; +}