Skip to content

Commit

Permalink
Merge pull request #2584 from JasonRuonanWang/ssc-threading
Browse files Browse the repository at this point in the history
Add threading into ssc writer to optimize first step metadata manipulation
  • Loading branch information
JasonRuonanWang authored Jan 15, 2021
2 parents 7bd5fec + 5ee3007 commit 5948ca8
Show file tree
Hide file tree
Showing 4 changed files with 408 additions and 52 deletions.
5 changes: 4 additions & 1 deletion docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ The SSC engine takes the following parameters:

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. SSC will use threads to hide the time cost of metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
=============================== ================== ================================================
OpenTimeoutSecs integer **10**, 2, 20, 200
OpenTimeoutSecs integer **10**, 2, 20, 200
MpiMode string **TwoSided**, OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, OneSidedPostPull
Threading bool **false**, true
=============================== ================== ================================================


159 changes: 109 additions & 50 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,26 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
m_OpenTimeoutSecs);

int providedMpiMode;
MPI_Query_thread(&providedMpiMode);
if (providedMpiMode != MPI_THREAD_MULTIPLE)
{
if (m_Threading == true)
{
m_Threading = false;
if (m_WriterRank == 0)
{
std::cout << "SSC Threading disabled as MPI is not initialized "
"with multi-threads"
<< std::endl;
}
}
}

SyncMpiPattern();
m_WriterRank = m_Comm.Rank();
m_WriterSize = m_Comm.Size();
Expand All @@ -42,6 +59,11 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();

if (m_Threading && m_EndStepThread.joinable())
{
m_EndStepThread.join();
}

++m_CurrentStep;

if (m_Verbosity >= 5)
Expand Down Expand Up @@ -80,6 +102,73 @@ size_t SscWriter::CurrentStep() const { return m_CurrentStep; }

void SscWriter::PerformPuts() { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::EndStepFirst()
{
TAU_SCOPED_TIMER_FUNC();

SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}

void SscWriter::EndStepConsequentFixed()
{
TAU_SCOPED_TIMER_FUNC();
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
}
}

void SscWriter::EndStepConsequentFlexible()
{
TAU_SCOPED_TIMER_FUNC();
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}

void SscWriter::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -93,68 +182,33 @@ void SscWriter::EndStep()

if (m_CurrentStep == 0)
{
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
if (m_Threading)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
m_EndStepThread = std::thread(&SscWriter::EndStepFirst, this);
}
else
{
EndStepFirst();
}
}
else
{
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(),
static_cast<int>(m_Buffer.size()), MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR,
m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR,
m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
EndStepConsequentFixed();
}
else
{
if (m_Threading)
{
MPI_Win_fence(0, m_MpiWin);
m_EndStepThread =
std::thread(&SscWriter::EndStepConsequentFlexible, this);
}
else if (m_MpiMode == "onesidedpostpull")
else
{
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
EndStepConsequentFlexible();
}
}
else
{
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}
}

Expand Down Expand Up @@ -376,6 +430,11 @@ void SscWriter::DoClose(const int transportIndex)
<< ", Writer Rank " << m_WriterRank << std::endl;
}

if (m_Threading && m_EndStepThread.joinable())
{
m_EndStepThread.join();
}

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_CurrentStep > 0)
Expand Down
7 changes: 6 additions & 1 deletion source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class SscWriter : public Engine
MPI_Win m_MpiWin;
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
std::thread m_EndStepThread;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -64,6 +64,9 @@ class SscWriter : public Engine
void SyncWritePattern(bool finalStep = false);
void SyncReadPattern();
void MpiWait();
void EndStepFirst();
void EndStepConsequentFixed();
void EndStepConsequentFlexible();

#define declare_type(T) \
void DoPutSync(Variable<T> &, const T *) final; \
Expand All @@ -82,6 +85,8 @@ class SscWriter : public Engine

int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
Loading

0 comments on commit 5948ca8

Please sign in to comment.