Skip to content

Commit

Permalink
Merge pull request #2622 from JasonRuonanWang/dataman
Browse files Browse the repository at this point in the history
only dataman writer takes TransportMode parameter
  • Loading branch information
JasonRuonanWang authored Feb 16, 2021
2 parents 3455b66 + 5a7bcf8 commit 658ab72
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 14 deletions.
5 changes: 3 additions & 2 deletions docs/user_guide/source/engines/dataman.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ The DataMan engine takes the following parameters:

5. ``Threading``: Default **true** for reader, **false** for writer. Whether to use threads for send and receive operations.
Enabling threading will cause extra overhead for managing threads and buffer queues, but will improve the continuity of data steps for readers, and help overlap data transfers with computations for writers.
The advice for generic use cases is to keep the default values: true for reader and false for writer.

6. ``TransportMode``: Default **fast**. The fast mode is optimized for latency-critical applications.
6. ``TransportMode``: Default **fast**. Only DataMan writers take this parameter.
Readers are automatically synchronized at runtime to match writers' transport mode.
The fast mode is optimized for latency-critical applications.
It forces readers to receive only the latest step.
Therefore, in cases where writers are faster than readers, readers will skip some data steps.
The reliable mode ensures that all steps are received by readers, by sacrificing performance compared to the fast mode.
Expand Down
14 changes: 11 additions & 3 deletions source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ DataManReader::DataManReader(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "TransportMode", m_TransportMode);
helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive);

if (m_IPAddress.empty())
Expand All @@ -56,6 +55,7 @@ DataManReader::DataManReader(IO &io, const std::string &name,
.count();

auto startTime = std::chrono::system_clock::now();

while (reply == nullptr or reply->empty())
{
reply = m_Requester.Request("Handshake", 9);
Expand All @@ -69,15 +69,23 @@ DataManReader::DataManReader(IO &io, const std::string &name,
return;
}
}

nlohmann::json message = nlohmann::json::parse(reply->data());
m_TransportMode = message["Transport"];

if (m_MonitorActive)
{
m_Monitor.SetClockError(roundLatency,
*reinterpret_cast<uint64_t *>(reply->data()));
m_Monitor.SetClockError(roundLatency, message["TimeStamp"]);
m_Monitor.AddTransport(m_TransportMode);
if (m_Threading)
{
m_Monitor.SetReaderThreading();
}
bool writerThreading = message["Threading"];
if (writerThreading)
{
m_Monitor.SetWriterThreading();
}
}

if (m_TransportMode == "fast")
Expand Down
60 changes: 51 additions & 9 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive);
helper::GetParameter(m_IO.m_Parameters, "CombiningSteps", m_CombiningSteps);

m_HandshakeJson["Threading"] = m_Threading;
m_HandshakeJson["Transport"] = m_TransportMode;

if (m_IPAddress.empty())
{
throw(std::invalid_argument("IP address not specified"));
Expand Down Expand Up @@ -73,14 +76,25 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,

m_Replier.OpenReplier(replierAddress, m_Timeout, 64);

if (m_RendezvousReaderCount == 0 || m_TransportMode == "reliable")
if (m_RendezvousReaderCount == 0)
{
m_ReplyThreadActive = true;
m_ReplyThread = std::thread(&DataManWriter::ReplyThread, this);
}
else
{
ReplyThread();
Handshake();
}

if (m_TransportMode == "reliable" && m_RendezvousReaderCount > 0)
{
m_ReplyThreadActive = true;
m_ReplyThread = std::thread(&DataManWriter::ReplyThread, this);
}

if (m_TransportMode == "fast")
{
m_ReplyThreadActive = false;
}

if (m_Threading && m_TransportMode == "fast")
Expand Down Expand Up @@ -221,7 +235,6 @@ void DataManWriter::DoClose(const int transportIndex)
}

m_PublishThreadActive = false;

if (m_ReplyThreadActive)
{
while (m_SentSteps < m_CurrentStep + 2)
Expand Down Expand Up @@ -281,6 +294,38 @@ void DataManWriter::PublishThread()
}
}

/**
 * Serves rendezvous requests on m_Replier in the calling thread and returns
 * once at least m_RendezvousReaderCount "Ready" requests have been answered.
 *
 * Two request strings are recognised:
 *  - "Handshake": m_HandshakeJson is stamped with the current system-clock
 *    time (milliseconds since epoch) under "TimeStamp" and its serialized
 *    form is sent back.
 *  - "Ready": answered with "OK" and counted towards the rendezvous total.
 *
 * Any other non-empty request receives no reply. The exit condition is only
 * evaluated after a non-empty request has been received.
 */
void DataManWriter::Handshake()
{
    int readyReaders = 0;
    for (;;)
    {
        auto incoming = m_Replier.ReceiveRequest();
        if (incoming == nullptr || incoming->size() == 0)
        {
            // Nothing usable was received; poll again.
            continue;
        }

        std::string command(incoming->begin(), incoming->end());
        if (command == "Handshake")
        {
            auto sinceEpoch =
                std::chrono::system_clock::now().time_since_epoch();
            m_HandshakeJson["TimeStamp"] =
                std::chrono::duration_cast<std::chrono::milliseconds>(
                    sinceEpoch)
                    .count();

            // The trailing NUL is part of the payload that gets sent --
            // presumably so the receiver can parse it as a C string.
            std::string payload = m_HandshakeJson.dump() + '\0';
            m_Replier.SendReply(payload.data(), payload.size());
        }
        else if (command == "Ready")
        {
            m_Replier.SendReply("OK", 2);
            ++readyReaders;
        }

        if (readyReaders >= m_RendezvousReaderCount)
        {
            break;
        }
    }
}

void DataManWriter::ReplyThread()
{
int readerCount = 0;
Expand All @@ -292,11 +337,12 @@ void DataManWriter::ReplyThread()
std::string r(request->begin(), request->end());
if (r == "Handshake")
{
uint64_t timeStamp =
m_HandshakeJson["TimeStamp"] =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
m_Replier.SendReply(&timeStamp, sizeof(timeStamp));
std::string js = m_HandshakeJson.dump() + '\0';
m_Replier.SendReply(js.data(), js.size());
}
else if (r == "Ready")
{
Expand All @@ -317,10 +363,6 @@ void DataManWriter::ReplyThread()
}
}
}
if (m_RendezvousReaderCount == readerCount && m_TransportMode == "fast")
{
m_ReplyThreadActive = false;
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class DataManWriter : public Engine
size_t m_SerializerBufferSize = 1024 * 1024;
int64_t m_CurrentStep = -1;
std::atomic<size_t> m_SentSteps;
nlohmann::json m_HandshakeJson;

format::DataManSerializer m_Serializer;

Expand All @@ -76,6 +77,7 @@ class DataManWriter : public Engine
void PushBufferQueue(std::shared_ptr<std::vector<char>> buffer);
std::shared_ptr<std::vector<char>> PopBufferQueue();

void Handshake();
void ReplyThread();
void PublishThread();

Expand Down

0 comments on commit 658ab72

Please sign in to comment.