Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add threading into ssc writer to optimize first step metadata manipulation #2584

Merged
merged 5 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ The SSC engine takes the following parameters:

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. When this parameter is set to **true**, SSC uses threads to hide the time cost of metadata manipulation and data transfer. SSC checks whether MPI was initialized with full multi-thread support (``MPI_THREAD_MULTIPLE``); if it was not, SSC forces this parameter to **false**.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
=============================== ================== ================================================
OpenTimeoutSecs integer **10**, 2, 20, 200
OpenTimeoutSecs integer **10**, 2, 20, 200
MpiMode string **TwoSided**, OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, OneSidedPostPull
Threading bool **false**, true
=============================== ================== ================================================


159 changes: 109 additions & 50 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,26 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
m_OpenTimeoutSecs);

int providedMpiMode;
MPI_Query_thread(&providedMpiMode);
if (providedMpiMode != MPI_THREAD_MULTIPLE)
{
if (m_Threading == true)
{
m_Threading = false;
if (m_WriterRank == 0)
{
std::cout << "SSC Threading disabled as MPI is not initialized "
"with multi-threads"
<< std::endl;
}
}
}

SyncMpiPattern();
m_WriterRank = m_Comm.Rank();
m_WriterSize = m_Comm.Size();
Expand All @@ -42,6 +59,11 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();

if (m_Threading && m_EndStepThread.joinable())
{
m_EndStepThread.join();
}

++m_CurrentStep;

if (m_Verbosity >= 5)
Expand Down Expand Up @@ -80,6 +102,73 @@ size_t SscWriter::CurrentStep() const { return m_CurrentStep; }

/**
 * PerformPuts does no work in this engine beyond opening the scoped TAU
 * profiling timer for the duration of the call.
 */
void SscWriter::PerformPuts()
{
    TAU_SCOPED_TIMER_FUNC();
}

void SscWriter::EndStepFirst()
{
TAU_SCOPED_TIMER_FUNC();

SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}

void SscWriter::EndStepConsequentFixed()
{
TAU_SCOPED_TIMER_FUNC();
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
}
}

/**
 * End-of-step work that runs SyncWritePattern() and then creates an MPI
 * window (m_MpiWin) over m_Buffer on m_StreamComm. The window is not freed
 * in this function.
 */
void SscWriter::EndStepConsequentFlexible()
{
    TAU_SCOPED_TIMER_FUNC();

    SyncWritePattern();

    // Read the buffer address and size only after SyncWritePattern() has
    // returned, exactly where the original call evaluated them.
    auto *windowBase = m_Buffer.data();
    const auto windowBytes = m_Buffer.size();
    MPI_Win_create(windowBase, windowBytes, 1, MPI_INFO_NULL, m_StreamComm,
                   &m_MpiWin);
}

void SscWriter::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -93,68 +182,33 @@ void SscWriter::EndStep()

if (m_CurrentStep == 0)
{
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
if (m_Threading)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
m_EndStepThread = std::thread(&SscWriter::EndStepFirst, this);
}
else
{
EndStepFirst();
}
}
else
{
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(),
static_cast<int>(m_Buffer.size()), MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR,
m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_MpiAllReadersGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR,
m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
EndStepConsequentFixed();
}
else
{
if (m_Threading)
{
MPI_Win_fence(0, m_MpiWin);
m_EndStepThread =
std::thread(&SscWriter::EndStepConsequentFlexible, this);
}
else if (m_MpiMode == "onesidedpostpull")
else
{
MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin);
EndStepConsequentFlexible();
}
}
else
{
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}
}

Expand Down Expand Up @@ -376,6 +430,11 @@ void SscWriter::DoClose(const int transportIndex)
<< ", Writer Rank " << m_WriterRank << std::endl;
}

if (m_Threading && m_EndStepThread.joinable())
{
m_EndStepThread.join();
}

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_CurrentStep > 0)
Expand Down
7 changes: 6 additions & 1 deletion source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class SscWriter : public Engine
MPI_Win m_MpiWin;
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
std::thread m_EndStepThread;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -64,6 +64,9 @@ class SscWriter : public Engine
void SyncWritePattern(bool finalStep = false);
void SyncReadPattern();
void MpiWait();
void EndStepFirst();
void EndStepConsequentFixed();
void EndStepConsequentFlexible();

#define declare_type(T) \
void DoPutSync(Variable<T> &, const T *) final; \
Expand All @@ -82,6 +85,8 @@ class SscWriter : public Engine

int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
Loading