Add BP3/BP4 options: NumAggregators (same as SubStreams) and Aggregat… #2477

Merged: 12 commits, Oct 7, 2020
docs/user_guide/source/engines/bp3.rst (6 additions, 3 deletions)
@@ -48,9 +48,11 @@

8. **FlushStepsCount**: users can select how often to produce the more expensive collective metadata file, in terms of steps: the default is 1. Increase it to reduce the adios2 collective operations footprint, with the trade-off of reducing checkpoint frequency. The buffer size will keep growing until the first flush step if ``MaxBufferSize`` is not set.

9. **SubStreams**: (MPI-only) users can select how many sub-streams (``M`` sub-files) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. If Substream is out of bounds it will pick either 1 (``SubStreams`` < ``1 -> N-to-1``) or ``N`` (``SubStreams`` > ``N -> N-to-N``) and ADIOS2 will issue a WARNING message. Use for performance tuning.
9. **NumAggregators** (or **SubStreams**): Users can select how many sub-files (``M``) are produced during a run, ranging between 1 and the number of MPI processes from ``MPI_Size`` (``N``); adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. The default is 0, which lets adios2 group processes by shared-memory access (i.e. one group per compute node) and use one process per node as an aggregator. If ``NumAggregators`` is larger than the number of processes, it is set to the number of processes. See the example after the table below.

10. **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required for using local disk/SSD/NVMe in a cluster.
10. **AggregatorRatio**: An alternative to ``NumAggregators``, picking every Nth process as an aggregator. The ratio must be an integer divisor of the number of processes, otherwise a runtime exception is thrown.

11. **Node-Local**: For distributed file systems. Every writer process must make sure the ``.bp/`` directory is created on the local file system. Required when using local disk/SSD/NVMe in a cluster.

==================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
@@ -63,7 +65,8 @@
MaxBufferSize float+units >= 16Kb **at EndStep**, 10Mb, 0.5Gb
BufferGrowthFactor float > 1 **1.05**, 1.01, 1.5, 2
FlushStepsCount integer > 1 **1**, 5, 1000, 50000
SubStreams integer >= 1 **MPI_Size (N-to-N)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1
NumAggregators integer >= 0 **0 (one file per compute node)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1
AggregatorRatio integer >= 1 not used unless set; ``MPI_Size``/``AggregatorRatio`` must be an integer
Node-Local string On/Off **Off**, On
==================== ===================== ===========================================================
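A minimal sketch of setting these aggregation parameters through the C++ API; the IO name, file name, and parameter values here are illustrative, not taken from this PR:

```cpp
#include <adios2.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    {
        adios2::ADIOS adios(MPI_COMM_WORLD);
        adios2::IO io = adios.DeclareIO("Output");
        io.SetEngine("BP3");

        // N-to-4 aggregation: all process buffers are written into 4 sub-files
        io.SetParameter("NumAggregators", "4");

        // Alternative: one aggregator per 8 ranks; 8 must divide the number
        // of processes or a runtime exception is thrown
        // io.SetParameter("AggregatorRatio", "8");

        adios2::Engine writer = io.Open("output.bp", adios2::Mode::Write);
        writer.Close();
    }
    MPI_Finalize();
    return 0;
}
```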

docs/user_guide/source/engines/bp4.rst (14 additions, 11 deletions)
@@ -58,25 +58,27 @@

7. **FlushStepsCount**: users can select how often to produce the more expensive collective metadata file, in terms of steps: the default is 1. Increase it to reduce the adios2 collective operations footprint, with the trade-off of reducing checkpoint frequency. The buffer size will keep growing until the first flush step if ``MaxBufferSize`` is not set.

8. **SubStreams**: (MPI-only) users can select how many sub-streams (``M`` sub-files) are produced during a run, ranges between 1 and the number of mpi processes from ``MPI_Size`` (``N``), adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. If Substream is out of bounds it will pick either 1 (``SubStreams`` < ``1 -> N-to-1``) or ``N`` (``SubStreams`` > ``N -> N-to-N``) and ADIOS2 will issue a WARNING message. Use for performance tuning.
8. **NumAggregators** (or **SubStreams**): Users can select how many sub-files (``M``) are produced during a run, ranging between 1 and the number of MPI processes from ``MPI_Size`` (``N``); adios2 will internally aggregate data buffers (``N-to-M``) to output the required number of sub-files. The default is 0, which lets adios2 group processes by shared-memory access (i.e. one group per compute node) and use one process per node as an aggregator. If ``NumAggregators`` is larger than the number of processes, it is set to the number of processes.

9. **OpenTimeoutSecs**: (Streaming mode) Reader may want to wait for the creation of the file in ``io.Open()``. By default the Open() function returns with an error if file is not found.
9. **AggregatorRatio**: An alternative to ``NumAggregators``, picking every Nth process as an aggregator. The ratio must be an integer divisor of the number of processes, otherwise a runtime exception is thrown.

10. **BeginStepPollingFrequencySecs**: (Streaming mode) Reader can set how frequently to check the file (and file system) for new steps. Default is 1 seconds which may be stressful for the file system and unnecessary for the application.
10. **OpenTimeoutSecs**: (Streaming mode) Reader may want to wait for the creation of the file in ``io.Open()``. By default, the ``Open()`` function returns with an error if the file is not found. See the sketch after this list.

11. **StatsLevel**: Turn on/off calculating statistics for every variable (Min/Max). Default is On. It has some cost to generate this metadata so it can be turned off if there is no need for this information.
11. **BeginStepPollingFrequencySecs**: (Streaming mode) Reader can set how frequently to check the file (and file system) for new steps. The default is 1 second, which may be stressful for the file system and unnecessary for the application.

12. **StatsBlockSize**: Calculate Min/Max for a given size of each process output. Default is one Min/Max per writer. More fine-grained min/max can be useful for querying the data.
12. **StatsLevel**: Turn on/off calculating statistics (Min/Max) for every variable. Default is On. Generating this metadata has some cost, so it can be turned off if there is no need for this information.

13. **NodeLocal** or **Node-Local**: For distributed file system. Every writer process must make sure the .bp/ directory is created on the local file system. Required when writing to local disk/SSD/NVMe in a cluster. Note: the BurstBuffer* parameters are newer and should be used for using the local storage as temporary instead of this parameter.
13. **StatsBlockSize**: Calculate Min/Max for blocks of a given size within each process output. Default is one Min/Max per writer. More fine-grained min/max values can be useful for querying the data.

14. **BurstBufferPath**: Redirect output file to another location and drain it to the original target location in an asynchronous thread. It requires to be able to launch one thread per aggregator (see SubStreams) on the system. This feature can be used on machines that have local NVMe/SSDs on each node to accelerate the output writing speed. On Summit at OLCF, use "/mnt/bb/<username>" for the path where <username> is your user account name. Temporary files on the accelerated storage will be automatically deleted after the application closes the output and ADIOS drains all data to the file system, unless draining is turned off (see the next parameter). Note: at this time, this feature cannot be used to append data to an existing dataset on the target system.
14. **NodeLocal** or **Node-Local**: For distributed file systems. Every writer process must make sure the ``.bp/`` directory is created on the local file system. Required when writing to local disk/SSD/NVMe in a cluster. Note: the BurstBuffer* parameters are newer and should be preferred over this parameter for using local storage as temporary space.

15. **BurstBufferDrain**: To write only to the accelerated storage but to not drain it to the target file system, set this flag to false. Data will NOT be deleted from the accelerated storage on close. By default, setting the BurstBufferPath will turn on draining.
15. **BurstBufferPath**: Redirect the output file to another location and drain it to the original target location in an asynchronous thread. It requires being able to launch one thread per aggregator (see ``NumAggregators``) on the system. This feature can be used on machines that have local NVMe/SSDs on each node to accelerate the output writing speed. On Summit at OLCF, use "/mnt/bb/<username>" for the path, where <username> is your user account name. Temporary files on the accelerated storage will be deleted automatically after the application closes the output and ADIOS drains all data to the file system, unless draining is turned off (see the next parameter). Note: at this time, this feature cannot be used to append data to an existing dataset on the target system.

16. **BurstBufferVerbose**: Verbose level 1 will cause each draining thread to print a one line report at the end (to standard output) about where it has spent its time and the number of bytes moved. Verbose level 2 will cause each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc).
16. **BurstBufferDrain**: To write only to the accelerated storage without draining it to the target file system, set this flag to false. Data will NOT be deleted from the accelerated storage on close. By default, setting the BurstBufferPath turns on draining.

17. **StreamReader**: By default the BP4 engine parses all available metadata in Open(). An application may turn this flag on to parse a limited number of steps at once, and update metadata when those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work as there will be zero steps processed in Open().
17. **BurstBufferVerbose**: Verbose level 1 causes each draining thread to print a one-line report at the end (to standard output) about where it spent its time and the number of bytes moved. Verbose level 2 causes each thread to print a line for each draining operation (file creation, copy block, write block from memory, etc.).

18. **StreamReader**: By default the BP4 engine parses all available metadata in ``Open()``. An application may turn this flag on to parse a limited number of steps at once and update the metadata once those steps have been processed. If the flag is ON, reading only works in streaming mode (using BeginStep/EndStep); file reading mode will not work, as zero steps are processed in ``Open()``.
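A sketch of the streaming-mode reader parameters in use, assuming some writer is concurrently producing steps into ``output.bp``; the parameter values are illustrative:

```cpp
#include <adios2.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);
    {
        adios2::ADIOS adios(MPI_COMM_WORLD);
        adios2::IO io = adios.DeclareIO("Input");
        io.SetEngine("BP4");

        // wait up to 60 seconds for the writer to create the file ...
        io.SetParameter("OpenTimeoutSecs", "60.0");
        // ... then check the file system for new steps every 5 seconds
        io.SetParameter("BeginStepPollingFrequencySecs", "5");
        // parse metadata incrementally instead of all at once in Open()
        io.SetParameter("StreamReader", "On");

        adios2::Engine reader = io.Open("output.bp", adios2::Mode::Read);
        while (reader.BeginStep() == adios2::StepStatus::OK)
        {
            // inquire variables and schedule Get() calls here
            reader.EndStep();
        }
        reader.Close();
    }
    MPI_Finalize();
    return 0;
}
```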

============================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
@@ -88,7 +90,8 @@
MaxBufferSize float+units >= 16Kb **at EndStep**, 10Mb, 0.5Gb
BufferGrowthFactor float > 1 **1.05**, 1.01, 1.5, 2
FlushStepsCount integer > 1 **1**, 5, 1000, 50000
SubStreams integer >= 1 **MPI_Size (N-to-N)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1
NumAggregators integer >= 0 **0 (one file per compute node)**, ``MPI_Size``/2, ... , 2, (N-to-1) 1
AggregatorRatio integer >= 1 not used unless set; ``MPI_Size``/``AggregatorRatio`` must be an integer
OpenTimeoutSecs float **0**, ``10.0``, ``5``
BeginStepPollingFrequencySecs float **1**, ``10.0``
StatsLevel integer, 0 or 1 **1**, ``0``
source/adios2/engine/bp3/BP3Writer.cpp (6 additions, 0 deletions)
@@ -118,6 +118,12 @@ void BP3Writer::Flush(const int transportIndex)
void BP3Writer::Init()
{
InitParameters();
if (m_BP3Serializer.m_Parameters.NumAggregators <
static_cast<unsigned int>(m_BP3Serializer.m_SizeMPI))
{
m_BP3Serializer.m_Aggregator.Init(
m_BP3Serializer.m_Parameters.NumAggregators, m_Comm);
}
InitTransports();
InitBPBuffer();
}
Expand Down
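For orientation: the guard above initializes the aggregator whenever fewer sub-files than ranks are requested, and since the default ``NumAggregators`` is 0 the default also passes the check and flows into the one-per-node setup in ``MPIChain::Init`` (shown further down). A hypothetical summary of the regimes, not code from this PR:

```cpp
// Hypothetical summary (m = NumAggregators, n = MPI size); illustrative only.
void DescribeAggregation(unsigned int m, unsigned int n)
{
    if (m == 0)
    {
        // default: one aggregator, and one sub-file, per compute node
    }
    else if (m < n)
    {
        // explicit N-to-M aggregation into m sub-files
    }
    else
    {
        // m >= n: Aggregator.Init is skipped, every rank writes its own sub-file
    }
}
```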
source/adios2/engine/bp4/BP4Writer.cpp (6 additions, 0 deletions)
@@ -123,6 +123,12 @@ void BP4Writer::Flush(const int transportIndex)
void BP4Writer::Init()
{
InitParameters();
if (m_BP4Serializer.m_Parameters.NumAggregators <
static_cast<unsigned int>(m_BP4Serializer.m_SizeMPI))
{
m_BP4Serializer.m_Aggregator.Init(
m_BP4Serializer.m_Parameters.NumAggregators, m_Comm);
}
InitTransports();
InitBPBuffer();
}
source/adios2/helper/adiosComm.cpp (5 additions, 0 deletions)
@@ -70,6 +70,11 @@ Comm Comm::World(const std::string &hint) const
return Comm(m_Impl->World(hint));
}

Comm Comm::GroupByShm(const std::string &hint) const
{
return Comm(m_Impl->GroupByShm(hint));
}

int Comm::Rank() const { return m_Impl->Rank(); }

int Comm::Size() const { return m_Impl->Size(); }
source/adios2/helper/adiosComm.h (9 additions, 0 deletions)
@@ -118,6 +118,13 @@ class Comm
*/
Comm World(const std::string &hint = std::string()) const;

/**
* @brief Create a communicator for processes that can share memory.
* Useful for grouping processes per compute node.
* @param hint Description of std::runtime_error exception on error.
*/
Comm GroupByShm(const std::string &hint = std::string()) const;

int Rank() const;
int Size() const;

@@ -368,6 +375,8 @@ class CommImpl
virtual std::unique_ptr<CommImpl> Split(int color, int key,
const std::string &hint) const = 0;
virtual std::unique_ptr<CommImpl> World(const std::string &hint) const = 0;
virtual std::unique_ptr<CommImpl>
GroupByShm(const std::string &hint) const = 0;
virtual int Rank() const = 0;
virtual int Size() const = 0;
virtual bool IsMPI() const = 0;
source/adios2/helper/adiosCommDummy.cpp (7 additions, 0 deletions)
@@ -49,6 +49,8 @@ class CommImplDummy : public CommImpl
std::unique_ptr<CommImpl> Split(int color, int key,
const std::string &hint) const override;
std::unique_ptr<CommImpl> World(const std::string &hint) const override;
virtual std::unique_ptr<CommImpl>
GroupByShm(const std::string &hint) const override;

int Rank() const override;
int Size() const override;
@@ -124,6 +126,11 @@ std::unique_ptr<CommImpl> CommImplDummy::World(const std::string &) const
return std::unique_ptr<CommImpl>(new CommImplDummy());
}

std::unique_ptr<CommImpl> CommImplDummy::GroupByShm(const std::string &) const
{
return std::unique_ptr<CommImpl>(new CommImplDummy());
}

int CommImplDummy::Rank() const { return 0; }

int CommImplDummy::Size() const { return 1; }
source/adios2/helper/adiosCommMPI.cpp (13 additions, 0 deletions)
@@ -122,6 +122,8 @@ class CommImplMPI : public CommImpl
std::unique_ptr<CommImpl> Split(int color, int key,
const std::string &hint) const override;
std::unique_ptr<CommImpl> World(const std::string &hint) const override;
virtual std::unique_ptr<CommImpl>
GroupByShm(const std::string &hint) const override;

int Rank() const override;
int Size() const override;
@@ -224,6 +226,17 @@ std::unique_ptr<CommImpl> CommImplMPI::World(const std::string &) const
return std::unique_ptr<CommImpl>(new CommImplMPI(MPI_COMM_WORLD));
}

std::unique_ptr<CommImpl> CommImplMPI::GroupByShm(const std::string &hint) const
{
MPI_Comm nodeComm;
MPI_Info info;
MPI_Info_create(&info);
CheckMPIReturn(MPI_Comm_split_type(m_MPIComm, MPI_COMM_TYPE_SHARED, 0, info,
&nodeComm),
hint);
return std::unique_ptr<CommImpl>(new CommImplMPI(nodeComm));
}

int CommImplMPI::Rank() const
{
int rank;
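The node-level grouping above hinges on ``MPI_Comm_split_type`` with ``MPI_COMM_TYPE_SHARED``. A self-contained sketch of the same pattern outside ADIOS, with illustrative names (and ``MPI_INFO_NULL`` in place of the empty info object the PR creates):

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int worldRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);

    // group ranks that can share memory, i.e. ranks on the same node
    MPI_Comm nodeComm;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
                        MPI_INFO_NULL, &nodeComm);

    int nodeRank, nodeSize;
    MPI_Comm_rank(nodeComm, &nodeRank);
    MPI_Comm_size(nodeComm, &nodeSize);
    std::printf("world rank %d is rank %d of %d on its node\n", worldRank,
                nodeRank, nodeSize);

    MPI_Comm_free(&nodeComm);
    MPI_Finalize();
    return 0;
}
```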
source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp (39 additions, 0 deletions)
@@ -96,6 +96,45 @@ void MPIAggregator::InitComm(const size_t subStreams,
m_SubStreams = subStreams;
}

void MPIAggregator::InitCommOnePerNode(helper::Comm const &parentComm)
{
m_Comm = parentComm.GroupByShm("creating default aggregator setup at Open");
m_Rank = m_Comm.Rank();
m_Size = m_Comm.Size();

if (m_Rank != 0)
{
m_IsConsumer = false;
}

m_IsActive = true;

/* Determine number of aggregators (= nodes) */

/*
* Communicators connecting rank N of each node
* We are only interested in the chain of rank 0s
*/
int color = (m_Rank ? 1 : 0);
helper::Comm onePerNodeComm =
parentComm.Split(color, 0, "creating default aggregator setup at Open");

if (!m_Rank)
{
m_SubStreamIndex = static_cast<size_t>(onePerNodeComm.Rank());
m_SubStreams = static_cast<size_t>(onePerNodeComm.Size());
}
m_SubStreams = m_Comm.BroadcastValue<size_t>(m_SubStreams, 0);
m_SubStreamIndex = m_Comm.BroadcastValue<size_t>(m_SubStreamIndex, 0);

/* Identify parent rank of aggregator process within each group */
if (!m_Rank)
{
m_ConsumerRank = parentComm.Rank();
}
m_ConsumerRank = m_Comm.BroadcastValue<int>(m_ConsumerRank, 0);
}

void MPIAggregator::HandshakeRank(const int rank)
{
int message = -1;
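To make the bookkeeping in ``InitCommOnePerNode`` concrete, a hedged walkthrough under an assumed layout of 8 ranks spread over 2 nodes (4 ranks per node):

```cpp
// Assumed layout: world ranks 0-3 on node A, world ranks 4-7 on node B.
//
// GroupByShm          -> two 4-rank node communicators (m_Rank is 0..3 in each)
// Split(color, 0, ..) -> node-local rank 0s (world ranks 0 and 4) take color 0;
//                        their onePerNodeComm has Size() == 2
// on world rank 0     -> m_SubStreamIndex = 0, m_SubStreams = 2, m_ConsumerRank = 0
// on world rank 4     -> m_SubStreamIndex = 1, m_SubStreams = 2, m_ConsumerRank = 4
// BroadcastValue      -> every rank learns its node's sub-stream index and
//                        consumer rank, so the run produces 2 sub-files
```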
source/adios2/toolkit/aggregator/mpi/MPIAggregator.h (4 additions, 0 deletions)
@@ -98,6 +98,10 @@
* the last rank) */
void InitComm(const size_t subStreams, helper::Comm const &parentComm);

/** A default init function selecting one process per node as the aggregator
*/
void InitCommOnePerNode(helper::Comm const &parentComm);

/** handshakes a single rank with the rest of the m_Comm ranks */
void HandshakeRank(const int rank = 0);

source/adios2/toolkit/aggregator/mpi/MPIChain.cpp (10 additions, 2 deletions)
@@ -20,8 +20,16 @@ MPIChain::MPIChain() : MPIAggregator() {}

void MPIChain::Init(const size_t subStreams, helper::Comm const &parentComm)
{
InitComm(subStreams, parentComm);
HandshakeRank(0);
if (subStreams > 0)
{
InitComm(subStreams, parentComm);
HandshakeRank(0);
}
else
{
InitCommOnePerNode(parentComm);
}

HandshakeLinks();

// add a receiving buffer except for the last rank (only sends)
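The net effect of this dispatch from the application side, assuming an ``adios2::ADIOS`` instance named ``adios``; parameter values are illustrative:

```cpp
adios2::IO io = adios.DeclareIO("Output");

// Explicit request: the engine passes subStreams = 4, so Init() takes the
// InitComm branch and splits the ranks into 4 chained groups (4 sub-files)
io.SetParameter("NumAggregators", "4");

// Leaving the parameter at its default 0 still reaches Init(), because
// 0 < MPI size in the writer's guard, and falls through to
// InitCommOnePerNode: one aggregator, and one sub-file, per compute node
```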