From ea73b47e60707d3918c791d24e17832b6b2505dc Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 2 Oct 2020 11:06:08 -0400 Subject: [PATCH 01/12] Add BP3/BP4 options: NumAggregators (same as SubStreams) and AggregationRatio. Default aggregation is now one per compute node (NumAggregators=0) --- docs/user_guide/source/engines/bp3.rst | 9 +++-- docs/user_guide/source/engines/bp4.rst | 25 ++++++------ source/adios2/engine/bp3/BP3Writer.cpp | 2 + source/adios2/engine/bp4/BP4Writer.cpp | 2 + source/adios2/helper/adiosComm.cpp | 5 +++ source/adios2/helper/adiosComm.h | 9 +++++ source/adios2/helper/adiosCommDummy.cpp | 7 ++++ source/adios2/helper/adiosCommMPI.cpp | 13 +++++++ .../toolkit/aggregator/mpi/MPIAggregator.cpp | 39 +++++++++++++++++++ .../toolkit/aggregator/mpi/MPIAggregator.h | 4 ++ .../toolkit/aggregator/mpi/MPIChain.cpp | 12 +++++- source/adios2/toolkit/format/bp/BPBase.cpp | 39 +++++++++++-------- source/adios2/toolkit/format/bp/BPBase.h | 8 ++++ .../engine/bp/TestBPWriteAggregateRead.cpp | 11 +++++- 14 files changed, 151 insertions(+), 34 deletions(-) diff --git a/docs/user_guide/source/engines/bp3.rst b/docs/user_guide/source/engines/bp3.rst index 7534f1f26a..4f46c7b44e 100644 --- a/docs/user_guide/source/engines/bp3.rst +++ b/docs/user_guide/source/engines/bp3.rst @@ -48,9 +48,11 @@ This engine allows the user to fine tune the buffering operations through the fo 8. **FlushStepsCount**: users can select how often to produce the more expensive collective metadata file in terms of steps: default is 1. Increase to reduce adios2 collective operations footprint, with the trade-off of reducing checkpoint frequency. Buffer size will increase until first steps count if ``MaxBufferSize`` is not set. -9. **SubStreams**: (MPI-only) users can select how many sub-streams (``M`` sub-files) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. If Substream is out of bounds it will pick either 1 (``SubStreams`` < ``1 -> N-to-1``) or ``N`` ((``SubStreams`` > ``N -> N-to-N``) and ADIOS2 will issue a WARNING message. Use for performance tuning. +9. **NumAggregators** (or **SubStreams**): Users can select how many sub-files (``M``) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. Default is 0, which will let adios2 to group processes per shared-memory-access (i.e. one per compute node) and use one process per node as an aggregator. If NumAggregators is larger than the number of processes then it will be set to the number of processes. -10. **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required for using local disk/SSD/NVMe in a cluster. +10. **AggregatorRatio**: An alternative option to NumAggregators to pick every Nth process as aggregator. An integer divider of the number of processes is required, otherwise a runtime exception is thrown. + +11. **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required for using local disk/SSD/NVMe in a cluster. ==================== ===================== =========================================================== **Key** **Value Format** **Default** and Examples @@ -63,7 +65,8 @@ This engine allows the user to fine tune the buffering operations through the fo MaxBufferSize float+units >= 16Kb **at EndStep**, 10Mb, 0.5Gb BufferGrowthFactor float > 1 **1.05**, 1.01, 1.5, 2 FlushStepsCount integer > 1 **1**, 5, 1000, 50000 - SubStreams integer >= 1 **MPI_Size (N-to-N)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1 + NumAggregators integer >= 1 **0 (one file per compute node)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1 + AggregatorRatio integer >= 1 not used unless set, ``MPI_Size``/N must be an integer value Node-Local string On/Off **Off**, On ==================== ===================== =========================================================== diff --git a/docs/user_guide/source/engines/bp4.rst b/docs/user_guide/source/engines/bp4.rst index 7f1b1fe1f8..9391476123 100644 --- a/docs/user_guide/source/engines/bp4.rst +++ b/docs/user_guide/source/engines/bp4.rst @@ -58,25 +58,27 @@ This engine allows the user to fine tune the buffering operations through the fo 7. **FlushStepsCount**: users can select how often to produce the more expensive collective metadata file in terms of steps: default is 1. Increase to reduce adios2 collective operations footprint, with the trade-off of reducing checkpoint frequency. Buffer size will increase until first steps count if ``MaxBufferSize`` is not set. -8. **SubStreams**: (MPI-only) users can select how many sub-streams (``M`` sub-files) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. If Substream is out of bounds it will pick either 1 (``SubStreams`` < ``1 -> N-to-1``) or ``N`` ((``SubStreams`` > ``N -> N-to-N``) and ADIOS2 will issue a WARNING message. Use for performance tuning. +8. **NumAggregators** (or **SubStreams**): Users can select how many sub-files (``M``) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. Default is 0, which will let adios2 to group processes per shared-memory-access (i.e. one per compute node) and use one process per node as an aggregator. If NumAggregators is larger than the number of processes then it will be set to the number of processes. -9. **OpenTimeoutSecs**: (Streaming mode) Reader may want to wait for the creation of the file in ``io.Open()``. By default the Open() function returns with an error if file is not found. +9. **AggregatorRatio**: An alternative option to NumAggregators to pick every Nth process as aggregator. An integer divider of the number of processes is required, otherwise a runtime exception is thrown. -10. **BeginStepPollingFrequencySecs**: (Streaming mode) Reader can set how frequently to check the file (and file system) for new steps. Default is 1 seconds which may be stressful for the file system and unnecessary for the application. +10. **OpenTimeoutSecs**: (Streaming mode) Reader may want to wait for the creation of the file in ``io.Open()``. By default the Open() function returns with an error if file is not found. -11. **StatsLevel**: Turn on/off calculating statistics for every variable (Min/Max). Default is On. It has some cost to generate this metadata so it can be turned off if there is no need for this information. +11. **BeginStepPollingFrequencySecs**: (Streaming mode) Reader can set how frequently to check the file (and file system) for new steps. Default is 1 seconds which may be stressful for the file system and unnecessary for the application. -12. **StatsBlockSize**: Calculate Min/Max for a given size of each process output. Default is one Min/Max per writer. More fine-grained min/max can be useful for querying the data. +12. **StatsLevel**: Turn on/off calculating statistics for every variable (Min/Max). Default is On. It has some cost to generate this metadata so it can be turned off if there is no need for this information. -13. **NodeLocal** or **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required when writing to local disk/SSD/NVMe in a cluster. Note: the BurstBuffer* parameters are newer and should be used for using the local storage as temporary instead of this parameter. +13. **StatsBlockSize**: Calculate Min/Max for a given size of each process output. Default is one Min/Max per writer. More fine-grained min/max can be useful for querying the data. -14. **BurstBufferPath**: Redirect output file to another location and drain it to the original target location in an asynchronous thread. It requires to be able to launch one thread per aggregator (see SubStreams) on the system. This feature can be used on machines that have local NVMe/SSDs on each node to accelerate the output writing speed. On Summit at OLCF, use "/mnt/bb/" for the path where is your user account name. Temporary files on the accelerated storage will be automatically deleted after the application closes the output and ADIOS drains all data to the file system, unless draining is turned off (see the next parameter). Note: at this time, this feature cannot be used to append data to an existing dataset on the target system. +14. **NodeLocal** or **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required when writing to local disk/SSD/NVMe in a cluster. Note: the BurstBuffer* parameters are newer and should be used for using the local storage as temporary instead of this parameter. -15. **BurstBufferDrain**: To write only to the accelerated storage but to not drain it to the target file system, set this flag to false. Data will NOT be deleted from the accelerated storage on close. By default, setting the BurstBufferPath will turn on draining. +15. **BurstBufferPath**: Redirect output file to another location and drain it to the original target location in an asynchronous thread. It requires to be able to launch one thread per aggregator (see SubStreams) on the system. This feature can be used on machines that have local NVMe/SSDs on each node to accelerate the output writing speed. On Summit at OLCF, use "/mnt/bb/" for the path where is your user account name. Temporary files on the accelerated storage will be automatically deleted after the application closes the output and ADIOS drains all data to the file system, unless draining is turned off (see the next parameter). Note: at this time, this feature cannot be used to append data to an existing dataset on the target system. -16. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc). +16. **BurstBufferDrain**: To write only to the accelerated storage but to not drain it to the target file system, set this flag to false. Data will NOT be deleted from the accelerated storage on close. By default, setting the BurstBufferPath will turn on draining. -17. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open(). +17. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc). + +18. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open(). ============================== ===================== =========================================================== **Key** **Value Format** **Default** and Examples @@ -88,7 +90,8 @@ This engine allows the user to fine tune the buffering operations through the fo MaxBufferSize float+units >= 16Kb **at EndStep**, 10Mb, 0.5Gb BufferGrowthFactor float > 1 **1.05**, 1.01, 1.5, 2 FlushStepsCount integer > 1 **1**, 5, 1000, 50000 - SubStreams integer >= 1 **MPI_Size (N-to-N)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1 + NumAggregators integer >= 1 **0 (one file per compute node)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1 + AggregatorRatio integer >= 1 not used unless set, ``MPI_Size``/N must be an integer value OpenTimeoutSecs float **0**, ``10.0``, ``5`` BeginStepPollingFrequencySecs float **1**, ``10.0`` StatsLevel integer, 0 or 1 **1**, ``0`` diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index 9e869c4b64..fda562c5ad 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -118,6 +118,8 @@ void BP3Writer::Flush(const int transportIndex) void BP3Writer::Init() { InitParameters(); + m_BP3Serializer.m_Aggregator.Init( + m_BP3Serializer.m_Parameters.NumAggregators, m_Comm); InitTransports(); InitBPBuffer(); } diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index a50dafb0e8..dec597d598 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -123,6 +123,8 @@ void BP4Writer::Flush(const int transportIndex) void BP4Writer::Init() { InitParameters(); + m_BP4Serializer.m_Aggregator.Init( + m_BP4Serializer.m_Parameters.NumAggregators, m_Comm); InitTransports(); InitBPBuffer(); } diff --git a/source/adios2/helper/adiosComm.cpp b/source/adios2/helper/adiosComm.cpp index 01cd58bb4d..42ce4d7fce 100644 --- a/source/adios2/helper/adiosComm.cpp +++ b/source/adios2/helper/adiosComm.cpp @@ -70,6 +70,11 @@ Comm Comm::World(const std::string &hint) const return Comm(m_Impl->World(hint)); } +Comm Comm::GroupByShm(const std::string &hint) const +{ + return Comm(m_Impl->GroupByShm(hint)); +} + int Comm::Rank() const { return m_Impl->Rank(); } int Comm::Size() const { return m_Impl->Size(); } diff --git a/source/adios2/helper/adiosComm.h b/source/adios2/helper/adiosComm.h index 750f54dcc4..f8e2c7cf9e 100644 --- a/source/adios2/helper/adiosComm.h +++ b/source/adios2/helper/adiosComm.h @@ -118,6 +118,13 @@ class Comm */ Comm World(const std::string &hint = std::string()) const; + /** + * @brief Create a communicator for processes that can share memory + * @param hint Description of std::runtime_error exception on error. + * Useful for grouping processes per compute node + */ + Comm GroupByShm(const std::string &hint = std::string()) const; + int Rank() const; int Size() const; @@ -368,6 +375,8 @@ class CommImpl virtual std::unique_ptr Split(int color, int key, const std::string &hint) const = 0; virtual std::unique_ptr World(const std::string &hint) const = 0; + virtual std::unique_ptr + GroupByShm(const std::string &hint) const = 0; virtual int Rank() const = 0; virtual int Size() const = 0; virtual bool IsMPI() const = 0; diff --git a/source/adios2/helper/adiosCommDummy.cpp b/source/adios2/helper/adiosCommDummy.cpp index d9c431a096..eddf9827d5 100644 --- a/source/adios2/helper/adiosCommDummy.cpp +++ b/source/adios2/helper/adiosCommDummy.cpp @@ -49,6 +49,8 @@ class CommImplDummy : public CommImpl std::unique_ptr Split(int color, int key, const std::string &hint) const override; std::unique_ptr World(const std::string &hint) const override; + virtual std::unique_ptr + GroupByShm(const std::string &hint) const override; int Rank() const override; int Size() const override; @@ -124,6 +126,11 @@ std::unique_ptr CommImplDummy::World(const std::string &) const return std::unique_ptr(new CommImplDummy()); } +std::unique_ptr CommImplDummy::GroupByShm(const std::string &) const +{ + return std::unique_ptr(new CommImplDummy()); +} + int CommImplDummy::Rank() const { return 0; } int CommImplDummy::Size() const { return 1; } diff --git a/source/adios2/helper/adiosCommMPI.cpp b/source/adios2/helper/adiosCommMPI.cpp index 07f33d9abf..eebc4e7251 100644 --- a/source/adios2/helper/adiosCommMPI.cpp +++ b/source/adios2/helper/adiosCommMPI.cpp @@ -122,6 +122,8 @@ class CommImplMPI : public CommImpl std::unique_ptr Split(int color, int key, const std::string &hint) const override; std::unique_ptr World(const std::string &hint) const override; + virtual std::unique_ptr + GroupByShm(const std::string &hint) const override; int Rank() const override; int Size() const override; @@ -224,6 +226,17 @@ std::unique_ptr CommImplMPI::World(const std::string &) const return std::unique_ptr(new CommImplMPI(MPI_COMM_WORLD)); } +std::unique_ptr CommImplMPI::GroupByShm(const std::string &hint) const +{ + MPI_Comm nodeComm; + MPI_Info info; + MPI_Info_create(&info); + CheckMPIReturn(MPI_Comm_split_type(m_MPIComm, MPI_COMM_TYPE_SHARED, 0, info, + &nodeComm), + hint); + return std::unique_ptr(new CommImplMPI(nodeComm)); +} + int CommImplMPI::Rank() const { int rank; diff --git a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp index 8743a6b245..5d00e9741b 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp @@ -96,6 +96,45 @@ void MPIAggregator::InitComm(const size_t subStreams, m_SubStreams = subStreams; } +void MPIAggregator::InitCommOnePerNode(helper::Comm const &parentComm) +{ + m_Comm = parentComm.GroupByShm("creating default aggregator setup at Open"); + m_Rank = m_Comm.Rank(); + m_Size = m_Comm.Size(); + + if (m_Rank != 0) + { + m_IsConsumer = false; + } + + m_IsActive = true; + + /* Determine number of aggregators (= nodes) */ + + /* + * Communicators connecting rank N of each node + * We are only interested in the chain of rank 0s + */ + int color = (m_Rank ? 1 : 0); + helper::Comm onePerNodeComm = + parentComm.Split(color, 0, "creating default aggregator setup at Open"); + + if (!m_Rank) + { + m_SubStreamIndex = static_cast(onePerNodeComm.Rank()); + m_SubStreams = static_cast(onePerNodeComm.Size()); + } + m_SubStreams = m_Comm.BroadcastValue(m_SubStreams, 0); + m_SubStreamIndex = m_Comm.BroadcastValue(m_SubStreamIndex, 0); + + /* Identify parent rank of aggregator process within each group */ + if (!m_Rank) + { + m_ConsumerRank = static_cast(parentComm.Rank()); + } + m_ConsumerRank = m_Comm.BroadcastValue(m_ConsumerRank, 0); +} + void MPIAggregator::HandshakeRank(const int rank) { int message = -1; diff --git a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h index d15d9751b5..73c4bfb2f3 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h +++ b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h @@ -98,6 +98,10 @@ class MPIAggregator * the last rank) */ void InitComm(const size_t subStreams, helper::Comm const &parentComm); + /** A default init function to select one process per node to be aggregator + */ + void InitCommOnePerNode(helper::Comm const &parentComm); + /** handshakes a single rank with the rest of the m_Comm ranks */ void HandshakeRank(const int rank = 0); diff --git a/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp b/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp index 4db96961bb..eea9c50f15 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp @@ -20,8 +20,16 @@ MPIChain::MPIChain() : MPIAggregator() {} void MPIChain::Init(const size_t subStreams, helper::Comm const &parentComm) { - InitComm(subStreams, parentComm); - HandshakeRank(0); + if (subStreams > 0) + { + InitComm(subStreams, parentComm); + HandshakeRank(0); + } + else + { + InitCommOnePerNode(parentComm); + } + HandshakeLinks(); // add a receiving buffer except for the last rank (only sends) diff --git a/source/adios2/toolkit/format/bp/BPBase.cpp b/source/adios2/toolkit/format/bp/BPBase.cpp index 5bcc7593ec..b586186ac5 100644 --- a/source/adios2/toolkit/format/bp/BPBase.cpp +++ b/source/adios2/toolkit/format/bp/BPBase.cpp @@ -49,8 +49,6 @@ void BPBase::Init(const Params ¶meters, const std::string hint, struct Parameters parsedParameters; bool profilePresent = false; bool profileValue; - bool subStreamsPresent = false; - int32_t subStreamsValue; for (const auto ¶meter : parameters) { const std::string key = helper::LowerCase(parameter.first); @@ -155,24 +153,37 @@ void BPBase::Init(const Params ¶meters, const std::string hint, parsedParameters.FlushStepsCount = helper::StringToSizeT( value, " in Parameter key=FlushStepsCount " + hint); } - else if (key == "substreams") + else if (key == "substreams" || key == "numaggregators") { - int subStreams = static_cast(helper::StringTo( - value, " in Parameter key=SubStreams " + hint)); + parsedParameters.NumAggregators = + static_cast(helper::StringTo( + value, " in Parameter key=SubStreams " + hint)); - if (subStreams < 1) + if (parsedParameters.NumAggregators > m_SizeMPI) { - subStreams = 1; + parsedParameters.NumAggregators = m_SizeMPI; } - else if (subStreams > m_SizeMPI) + } + else if (key == "aggregatorratio") + { + unsigned int ratio = static_cast(helper::StringTo( + value, " in Parameter key=AggregatorRatio " + hint)); + parsedParameters.NumAggregators = m_SizeMPI / ratio; + if ((m_SizeMPI % ratio)) { - subStreams = m_SizeMPI; + throw std::invalid_argument( + "ERROR: value for Parameter key=AggregatorRatio must be " + "an integer divisor of the number of processes (" + + std::to_string(m_SizeMPI) + hint); } - if (subStreams < m_SizeMPI) + if (parsedParameters.NumAggregators < 1) { - subStreamsPresent = true; - subStreamsValue = subStreams; + parsedParameters.NumAggregators = 1; + } + else if (parsedParameters.NumAggregators > m_SizeMPI) + { + parsedParameters.NumAggregators = m_SizeMPI; } } else if (key == "node-local" || key == "nodelocal") @@ -232,10 +243,6 @@ void BPBase::Init(const Params ¶meters, const std::string hint, m_Profiler.m_IsActive = profileValue; } m_Parameters = parsedParameters; - if (subStreamsPresent) - { - m_Aggregator.Init(subStreamsValue, m_Comm); - } } // set timers if active if (m_Profiler.m_IsActive) diff --git a/source/adios2/toolkit/format/bp/BPBase.h b/source/adios2/toolkit/format/bp/BPBase.h index 548fc66b95..45e57c08b6 100644 --- a/source/adios2/toolkit/format/bp/BPBase.h +++ b/source/adios2/toolkit/format/bp/BPBase.h @@ -215,6 +215,13 @@ class BPBase * instead of parsing everything available */ bool StreamReader = false; + + /** Number of aggregators. + * Must be a value between 1 and number of MPI ranks + * 0 as default means that the engine must define the number of + * aggregators + */ + unsigned int NumAggregators = 0; }; /** Return type of the ResizeBuffer function. */ @@ -299,6 +306,7 @@ class BPBase */ void Init(const Params ¶meters, const std::string hint, const std::string engineType = ""); + /****************** NEED to check if some are virtual */ /** diff --git a/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp b/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp index 494bdb9e3f..e4a89cc204 100644 --- a/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp @@ -40,7 +40,7 @@ void WriteAggRead1D8(const std::string substreams) if (mpiSize > 1) { - io.SetParameter("Substreams", substreams); + io.SetParameter("NumAggregators", substreams); } // Declare 1D variables (NumOfProcesses * Nx) @@ -69,6 +69,13 @@ void WriteAggRead1D8(const std::string substreams) auto var_r32 = io.DefineVariable("r32", shape, start, count); auto var_r64 = io.DefineVariable("r64", shape, start, count); + + // add operations + adios2::Operator ZFPOp = + adios.DefineOperator("ZFPCompressor", adios2::ops::LossyZFP); + + var_r32.AddOperation(ZFPOp, {{adios2::ops::zfp::key::rate, "32"}}); + var_r64.AddOperation(ZFPOp, {{adios2::ops::zfp::key::rate, "64"}}); } if (!engineName.empty()) @@ -954,7 +961,7 @@ TEST_P(BPWriteAggregateReadTest, ADIOS2BPWriteAggregateRead2D4x2) } INSTANTIATE_TEST_SUITE_P(Substreams, BPWriteAggregateReadTest, - ::testing::Values("1", "2", "3", "4", "5")); + ::testing::Values("1", "2", "3", "4", "5", "0")); int main(int argc, char **argv) { From 8b3390be54655d8345d1d19fa62713b220ae500f Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 2 Oct 2020 12:38:43 -0400 Subject: [PATCH 02/12] remove zfp operator from test that was added accidentally for some manual testing --- testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp b/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp index e4a89cc204..eb9ba1f9d6 100644 --- a/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAggregateRead.cpp @@ -70,12 +70,13 @@ void WriteAggRead1D8(const std::string substreams) auto var_r64 = io.DefineVariable("r64", shape, start, count); - // add operations + /* add operations adios2::Operator ZFPOp = adios.DefineOperator("ZFPCompressor", adios2::ops::LossyZFP); var_r32.AddOperation(ZFPOp, {{adios2::ops::zfp::key::rate, "32"}}); var_r64.AddOperation(ZFPOp, {{adios2::ops::zfp::key::rate, "64"}}); + */ } if (!engineName.empty()) From ab8353a98aa3892f2ce3600d3f8b3854a28c034a Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 2 Oct 2020 14:36:37 -0400 Subject: [PATCH 03/12] Fix sign errors in handling integer parameters --- .../toolkit/aggregator/mpi/MPIAggregator.cpp | 4 +- source/adios2/toolkit/format/bp/BPBase.cpp | 51 +++++++++++-------- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp index 5d00e9741b..7df033f849 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp @@ -130,9 +130,9 @@ void MPIAggregator::InitCommOnePerNode(helper::Comm const &parentComm) /* Identify parent rank of aggregator process within each group */ if (!m_Rank) { - m_ConsumerRank = static_cast(parentComm.Rank()); + m_ConsumerRank = parentComm.Rank(); } - m_ConsumerRank = m_Comm.BroadcastValue(m_ConsumerRank, 0); + m_ConsumerRank = m_Comm.BroadcastValue(m_ConsumerRank, 0); } void MPIAggregator::HandshakeRank(const int rank) diff --git a/source/adios2/toolkit/format/bp/BPBase.cpp b/source/adios2/toolkit/format/bp/BPBase.cpp index b586186ac5..b4788e725c 100644 --- a/source/adios2/toolkit/format/bp/BPBase.cpp +++ b/source/adios2/toolkit/format/bp/BPBase.cpp @@ -155,35 +155,44 @@ void BPBase::Init(const Params ¶meters, const std::string hint, } else if (key == "substreams" || key == "numaggregators") { - parsedParameters.NumAggregators = - static_cast(helper::StringTo( - value, " in Parameter key=SubStreams " + hint)); + int n = static_cast(helper::StringTo( + value, " in Parameter key=" + key + " " + hint)); - if (parsedParameters.NumAggregators > m_SizeMPI) + if (n < 0) { - parsedParameters.NumAggregators = m_SizeMPI; + n = 0; } + if (n > m_SizeMPI) + { + n = m_SizeMPI; + } + parsedParameters.NumAggregators = n; } else if (key == "aggregatorratio") { - unsigned int ratio = static_cast(helper::StringTo( + int ratio = static_cast(helper::StringTo( value, " in Parameter key=AggregatorRatio " + hint)); - parsedParameters.NumAggregators = m_SizeMPI / ratio; - if ((m_SizeMPI % ratio)) - { - throw std::invalid_argument( - "ERROR: value for Parameter key=AggregatorRatio must be " - "an integer divisor of the number of processes (" + - std::to_string(m_SizeMPI) + hint); - } - - if (parsedParameters.NumAggregators < 1) - { - parsedParameters.NumAggregators = 1; - } - else if (parsedParameters.NumAggregators > m_SizeMPI) + if (ratio > 0) { - parsedParameters.NumAggregators = m_SizeMPI; + int n = m_SizeMPI / ratio; + if ((m_SizeMPI % ratio)) + { + throw std::invalid_argument( + "ERROR: value for Parameter key=AggregatorRatio=" + + std::to_string(ratio) + " must be " + + "an integer divisor of the number of processes=" + + std::to_string(m_SizeMPI) + " " + hint); + } + + if (n < 1) + { + n = 1; + } + else if (n > m_SizeMPI) + { + n = m_SizeMPI; + } + parsedParameters.NumAggregators = n; } } else if (key == "node-local" || key == "nodelocal") From 4b8722cf104dc67b21e4ecbd5e15b61b2e928426 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 05:07:53 -0400 Subject: [PATCH 04/12] fix BP3 specific test for reading each subfile alone to make sure it is actually generating the subfiles --- .../adios2/engine/bp/TestBPWriteReadLocalVariables.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp index af211fb376..0d53c817e7 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp @@ -1753,8 +1753,10 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal1DSubFile) const std::string fname("BPWriteReadLocal1DSubFile.bp"); int mpiRank = 0; + int mpiSize = 1; #if ADIOS2_USE_MPI MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); #endif const size_t Nx0 = 3 + static_cast(mpiRank); @@ -1770,6 +1772,12 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal1DSubFile) std::iota(data[0].begin(), data[0].end(), startBlock0); std::iota(data[1].begin(), data[1].end(), startBlock1); + /* This is a test for only BP3 */ + if (engineName != "BP3") + { + return; + } + #if ADIOS2_USE_MPI adios2::ADIOS adios(MPI_COMM_WORLD); #else @@ -1778,6 +1786,7 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal1DSubFile) { adios2::IO io = adios.DeclareIO("TestIO"); io.SetEngine("BP3"); + io.SetParameter("AggregatorRatio", "1"); const adios2::Dims shape{}; const adios2::Dims start{}; const adios2::Dims count{Nx0}; From e0f2567c006a9b77c6a53e746e6386ab31c81e11 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 06:17:15 -0400 Subject: [PATCH 05/12] set aggregation to N-to-N for tests that (wrongly) assume that a block's writerID equals to its writer's rank (it is not, it is the subfile index) --- testing/adios2/engine/bp/TestBPWriteReadBlockInfo.cpp | 2 ++ testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp | 1 + 2 files changed, 3 insertions(+) diff --git a/testing/adios2/engine/bp/TestBPWriteReadBlockInfo.cpp b/testing/adios2/engine/bp/TestBPWriteReadBlockInfo.cpp index 81db013ac2..d957915a60 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadBlockInfo.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadBlockInfo.cpp @@ -137,6 +137,7 @@ TEST_F(BPWriteReadBlockInfo, BPWriteReadBlockInfo1D8) // Create the BP Engine io.SetEngine("BPFile"); } + io.SetParameter("AggregatorRatio", "1"); adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); @@ -459,6 +460,7 @@ TEST_F(BPWriteReadBlockInfo, BPWriteReadBlockInfo2D2x4) // Create the BP Engine io.SetEngine("BPFile"); } + io.SetParameter("AggregatorRatio", "1"); adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); diff --git a/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp b/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp index f16803019a..e8f679ba7c 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadMultiblock.cpp @@ -1286,6 +1286,7 @@ TEST_F(BPWriteReadMultiblockTest, ADIOS2BPWriteReadMultiblock2D4x2) } io.AddTransport("file"); + io.SetParameter("AggregatorRatio", "1"); adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); From 6e310cdc92277b39350fe4b90528301dc9ea147d Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 06:36:51 -0400 Subject: [PATCH 06/12] Set aggregator ratio to 1 for the append test because of the existing bug in #2482 --- testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp index 32731b4695..ef7288f05a 100644 --- a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp @@ -171,6 +171,7 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) io.SetEngine("BP4"); } io.AddTransport("file"); + io.SetParameter("AggregatorRatio", "1"); adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); @@ -322,6 +323,7 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendRead2D2x4) io.SetEngine("BP4"); } io.AddTransport("file"); + io.SetParameter("AggregatorRatio", "1"); adios2::Engine bpAppender = io.Open(fname, adios2::Mode::Append); From 7cfd5fd051c13e0f665e302c91db09a15935529f Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 09:34:18 -0400 Subject: [PATCH 07/12] Set aggregator ratio to 1 for thei time-aggregation test because of the existing bug in #2483 --- testing/adios2/engine/bp/TestBPTimeAggregation.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/adios2/engine/bp/TestBPTimeAggregation.cpp b/testing/adios2/engine/bp/TestBPTimeAggregation.cpp index 8c66874f65..b0ebe23b36 100644 --- a/testing/adios2/engine/bp/TestBPTimeAggregation.cpp +++ b/testing/adios2/engine/bp/TestBPTimeAggregation.cpp @@ -90,6 +90,7 @@ void TimeAggregation1D8(const std::string flushstepscount) } io.AddTransport("file"); + io.SetParameter("AggregatorRatio", "1"); SmallTestData m_TestData; @@ -415,6 +416,7 @@ void TimeAggregation2D4x2(const std::string flushstepscount) } io.AddTransport("file"); + io.SetParameter("AggregatorRatio", "1"); SmallTestData m_TestData; From efd71351054906763cbf41cb866fa911319a8edc Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 09:35:08 -0400 Subject: [PATCH 08/12] Create aggregator only if aggregators < mpi size --- source/adios2/engine/bp3/BP3Writer.cpp | 7 +++++-- source/adios2/engine/bp4/BP4Writer.cpp | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index fda562c5ad..a6c8354f7e 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -118,8 +118,11 @@ void BP3Writer::Flush(const int transportIndex) void BP3Writer::Init() { InitParameters(); - m_BP3Serializer.m_Aggregator.Init( - m_BP3Serializer.m_Parameters.NumAggregators, m_Comm); + if (m_BP3Serializer.m_Parameters.NumAggregators < m_BP3Serializer.m_SizeMPI) + { + m_BP3Serializer.m_Aggregator.Init( + m_BP3Serializer.m_Parameters.NumAggregators, m_Comm); + } InitTransports(); InitBPBuffer(); } diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index dec597d598..0ce90ad7ce 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -123,8 +123,11 @@ void BP4Writer::Flush(const int transportIndex) void BP4Writer::Init() { InitParameters(); - m_BP4Serializer.m_Aggregator.Init( - m_BP4Serializer.m_Parameters.NumAggregators, m_Comm); + if (m_BP4Serializer.m_Parameters.NumAggregators < m_BP4Serializer.m_SizeMPI) + { + m_BP4Serializer.m_Aggregator.Init( + m_BP4Serializer.m_Parameters.NumAggregators, m_Comm); + } InitTransports(); InitBPBuffer(); } From f0ca049da1824e0e2c1f985861c7cbb349e71454 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 09:35:36 -0400 Subject: [PATCH 09/12] replace macro to make aggregation work with complex and double complex types --- source/adios2/toolkit/format/bp/BPSerializer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/adios2/toolkit/format/bp/BPSerializer.cpp b/source/adios2/toolkit/format/bp/BPSerializer.cpp index e5fdead283..3e1093411f 100644 --- a/source/adios2/toolkit/format/bp/BPSerializer.cpp +++ b/source/adios2/toolkit/format/bp/BPSerializer.cpp @@ -336,7 +336,7 @@ void BPSerializer::UpdateOffsetsInMetadata() currentPosition, TypeTraits::type_enum, buffer); \ break; \ } - ADIOS2_FOREACH_ATTRIBUTE_PRIMITIVE_STDTYPE_1ARG(make_case) + ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(make_case) #undef make_case default: From 1172c0938de66586ccc22c71fd6c02da4727b09c Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 10:25:39 -0400 Subject: [PATCH 10/12] signed/unsigned fix --- source/adios2/engine/bp3/BP3Writer.cpp | 3 ++- source/adios2/engine/bp4/BP4Writer.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index a6c8354f7e..76be495611 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -118,7 +118,8 @@ void BP3Writer::Flush(const int transportIndex) void BP3Writer::Init() { InitParameters(); - if (m_BP3Serializer.m_Parameters.NumAggregators < m_BP3Serializer.m_SizeMPI) + if (m_BP3Serializer.m_Parameters.NumAggregators < + static_cast(m_BP3Serializer.m_SizeMPI)) { m_BP3Serializer.m_Aggregator.Init( m_BP3Serializer.m_Parameters.NumAggregators, m_Comm); diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index 0ce90ad7ce..34c85cf4af 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -123,7 +123,8 @@ void BP4Writer::Flush(const int transportIndex) void BP4Writer::Init() { InitParameters(); - if (m_BP4Serializer.m_Parameters.NumAggregators < m_BP4Serializer.m_SizeMPI) + if (m_BP4Serializer.m_Parameters.NumAggregators < + static_cast(m_BP4Serializer.m_SizeMPI)) { m_BP4Serializer.m_Aggregator.Init( m_BP4Serializer.m_Parameters.NumAggregators, m_Comm); From 9ea61008a357a2e3ad0c91751fec5cd37b564b60 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 10:57:11 -0400 Subject: [PATCH 11/12] remove unused variable --- testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp index 0d53c817e7..f50e97a15a 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp @@ -1753,10 +1753,8 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal1DSubFile) const std::string fname("BPWriteReadLocal1DSubFile.bp"); int mpiRank = 0; - int mpiSize = 1; #if ADIOS2_USE_MPI MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); - MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); #endif const size_t Nx0 = 3 + static_cast(mpiRank); From ce36fa3a08567ddf85f232f703f7c0e324799f22 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 6 Oct 2020 13:36:57 -0400 Subject: [PATCH 12/12] Change size_t to uint64_t for storing sub-block-minmax info and fix update function to read 8 bytes instead of 4. --- source/adios2/toolkit/format/bp/BPBase.tcc | 3 ++- source/adios2/toolkit/format/bp/BPSerializer.tcc | 2 +- source/adios2/toolkit/format/bp/bp4/BP4Serializer.tcc | 10 ++++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source/adios2/toolkit/format/bp/BPBase.tcc b/source/adios2/toolkit/format/bp/BPBase.tcc index 54430fd7be..6ad056796b 100644 --- a/source/adios2/toolkit/format/bp/BPBase.tcc +++ b/source/adios2/toolkit/format/bp/BPBase.tcc @@ -301,7 +301,8 @@ inline void BPBase::ParseCharacteristics(const std::vector &buffer, helper::ReadValue(buffer, position, isLittleEndian)); characteristics.Statistics.SubBlockInfo.SubBlockSize = - helper::ReadValue(buffer, position, isLittleEndian); + static_cast(helper::ReadValue( + buffer, position, isLittleEndian)); characteristics.Statistics.SubBlockInfo.Div.resize( dimensionsSize); diff --git a/source/adios2/toolkit/format/bp/BPSerializer.tcc b/source/adios2/toolkit/format/bp/BPSerializer.tcc index 9f5f7f6739..8414fa6eb5 100644 --- a/source/adios2/toolkit/format/bp/BPSerializer.tcc +++ b/source/adios2/toolkit/format/bp/BPSerializer.tcc @@ -167,7 +167,7 @@ void BPSerializer::UpdateIndexOffsetsCharacteristics(size_t ¤tPosition, currentPosition += 2 * sizeof(T); // block min/max if (M > 1) { - currentPosition += 1 + 4; // method, blockSize + currentPosition += 1 + 8; // method (byte), blockSize (uint64_t) currentPosition += dimensionsSize * sizeof(uint16_t); // N-dim division currentPosition += 2 * M * sizeof(T); // M * min/max diff --git a/source/adios2/toolkit/format/bp/bp4/BP4Serializer.tcc b/source/adios2/toolkit/format/bp/bp4/BP4Serializer.tcc index c60924c2bd..b3b4296033 100644 --- a/source/adios2/toolkit/format/bp/bp4/BP4Serializer.tcc +++ b/source/adios2/toolkit/format/bp/bp4/BP4Serializer.tcc @@ -620,8 +620,9 @@ void BP4Serializer::PutBoundsRecord(const bool singleValue, uint8_t method = static_cast(stats.SubBlockInfo.DivisionMethod); helper::InsertToBuffer(buffer, &method); - helper::InsertToBuffer(buffer, - &stats.SubBlockInfo.SubBlockSize); + uint64_t subBlockSize = + static_cast(stats.SubBlockInfo.SubBlockSize); + helper::InsertToBuffer(buffer, &subBlockSize); const uint16_t N = static_cast(stats.SubBlockInfo.Div.size()); @@ -673,8 +674,9 @@ void BP4Serializer::PutBoundsRecord(const bool singleValue, uint8_t method = static_cast(stats.SubBlockInfo.DivisionMethod); helper::CopyToBuffer(buffer, position, &method); - helper::CopyToBuffer(buffer, position, - &stats.SubBlockInfo.SubBlockSize); + uint64_t subBlockSize = + static_cast(stats.SubBlockInfo.SubBlockSize); + helper::CopyToBuffer(buffer, position, &subBlockSize); const uint16_t N = static_cast(stats.SubBlockInfo.Div.size());