From 0db6db3299c71714c0ad298e49d4160258925c46 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 15 Feb 2021 20:46:47 -0500 Subject: [PATCH 1/3] only dataman writer takes TransportMode parameter --- .../adios2/engine/dataman/DataManReader.cpp | 9 ++- .../adios2/engine/dataman/DataManWriter.cpp | 60 ++++++++++++++++--- source/adios2/engine/dataman/DataManWriter.h | 2 + 3 files changed, 59 insertions(+), 12 deletions(-) diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index bb8141c141..4eef6cff0d 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -32,7 +32,6 @@ DataManReader::DataManReader(IO &io, const std::string &name, helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout); helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading); - helper::GetParameter(m_IO.m_Parameters, "TransportMode", m_TransportMode); helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive); if (m_IPAddress.empty()) @@ -56,6 +55,7 @@ DataManReader::DataManReader(IO &io, const std::string &name, .count(); auto startTime = std::chrono::system_clock::now(); + while (reply == nullptr or reply->empty()) { reply = m_Requester.Request("Handshake", 9); @@ -69,10 +69,13 @@ DataManReader::DataManReader(IO &io, const std::string &name, return; } } + + nlohmann::json message = nlohmann::json::parse(reply->data()); + m_TransportMode = message["Transport"]; + if (m_MonitorActive) { - m_Monitor.SetClockError(roundLatency, - *reinterpret_cast(reply->data())); + m_Monitor.SetClockError(roundLatency, message["TimeStamp"]); m_Monitor.AddTransport(m_TransportMode); if (m_Threading) { diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 8dfe4114e9..d987e482c2 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -44,6 +44,9 @@ DataManWriter::DataManWriter(IO &io, const std::string &name, helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive); helper::GetParameter(m_IO.m_Parameters, "CombiningSteps", m_CombiningSteps); + m_HandshakeJson["Threading"] = m_Threading; + m_HandshakeJson["Transport"] = m_TransportMode; + if (m_IPAddress.empty()) { throw(std::invalid_argument("IP address not specified")); @@ -73,14 +76,25 @@ DataManWriter::DataManWriter(IO &io, const std::string &name, m_Replier.OpenReplier(replierAddress, m_Timeout, 64); - if (m_RendezvousReaderCount == 0 || m_TransportMode == "reliable") + if (m_RendezvousReaderCount == 0) { m_ReplyThreadActive = true; m_ReplyThread = std::thread(&DataManWriter::ReplyThread, this); } else { - ReplyThread(); + Handshake(); + } + + if (m_TransportMode == "reliable" && m_RendezvousReaderCount > 0) + { + m_ReplyThreadActive = true; + m_ReplyThread = std::thread(&DataManWriter::ReplyThread, this); + } + + if (m_TransportMode == "fast") + { + m_ReplyThreadActive = false; } if (m_Threading && m_TransportMode == "fast") @@ -221,7 +235,6 @@ void DataManWriter::DoClose(const int transportIndex) } m_PublishThreadActive = false; - if (m_ReplyThreadActive) { while (m_SentSteps < m_CurrentStep + 2) @@ -281,6 +294,38 @@ void DataManWriter::PublishThread() } } +void DataManWriter::Handshake() +{ + int readerCount = 0; + while (true) + { + auto request = m_Replier.ReceiveRequest(); + if (request != nullptr && request->size() > 0) + { + std::string r(request->begin(), request->end()); + if (r == "Handshake") + { + m_HandshakeJson["TimeStamp"] = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + std::string js = m_HandshakeJson.dump() + '\0'; + m_Replier.SendReply(js.data(), js.size()); + } + else if (r == "Ready") + { + m_Replier.SendReply("OK", 2); + ++readerCount; + } + + if (readerCount >= m_RendezvousReaderCount) + { + break; + } + } + } +} + void DataManWriter::ReplyThread() { int readerCount = 0; @@ -292,11 +337,12 @@ void DataManWriter::ReplyThread() std::string r(request->begin(), request->end()); if (r == "Handshake") { - uint64_t timeStamp = + m_HandshakeJson["TimeStamp"] = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); - m_Replier.SendReply(&timeStamp, sizeof(timeStamp)); + std::string js = m_HandshakeJson.dump() + '\0'; + m_Replier.SendReply(js.data(), js.size()); } else if (r == "Ready") { @@ -317,10 +363,6 @@ void DataManWriter::ReplyThread() } } } - if (m_RendezvousReaderCount == readerCount && m_TransportMode == "fast") - { - m_ReplyThreadActive = false; - } } } diff --git a/source/adios2/engine/dataman/DataManWriter.h b/source/adios2/engine/dataman/DataManWriter.h index d316d2131a..fc423ad130 100644 --- a/source/adios2/engine/dataman/DataManWriter.h +++ b/source/adios2/engine/dataman/DataManWriter.h @@ -57,6 +57,7 @@ class DataManWriter : public Engine size_t m_SerializerBufferSize = 1024 * 1024; int64_t m_CurrentStep = -1; std::atomic m_SentSteps; + nlohmann::json m_HandshakeJson; format::DataManSerializer m_Serializer; @@ -76,6 +77,7 @@ class DataManWriter : public Engine void PushBufferQueue(std::shared_ptr> buffer); std::shared_ptr> PopBufferQueue(); + void Handshake(); void ReplyThread(); void PublishThread(); From 4924090e99aa3ab1d81c469c429f6eacc5ffae03 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 15 Feb 2021 21:00:29 -0500 Subject: [PATCH 2/3] updated dataman doc --- docs/user_guide/source/engines/dataman.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/user_guide/source/engines/dataman.rst b/docs/user_guide/source/engines/dataman.rst index 62ec66b02c..2750511ef3 100644 --- a/docs/user_guide/source/engines/dataman.rst +++ b/docs/user_guide/source/engines/dataman.rst @@ -26,9 +26,10 @@ The DataMan engine takes the following parameters: 5. ``Threading``: Default **true** for reader, **false** for writer. Whether to use threads for send and receive operations. Enabling threading will cause extra overhead for managing threads and buffer queues, but will improve the continuity of data steps for readers, and help overlap data transfers with computations for writers. - Advice for generic uses cases is to keep the default values, true for reader and false for writer. -6. ``TransportMode``: Default **fast**. The fast mode is optimized for latency-critical applications. +6. ``TransportMode``: Default **fast**. Only DataMan writers take this parameter. + Readers are automatically synchronized at runtime to match writers' transport mode. + The fast mode is optimized for latency-critical applications. It enforces readers to only receive the latest step. Therefore, in cases where writers are faster than readers, readers will skip some data steps. The reliable mode ensures that all steps are received by readers, by sacrificing performance compared to the fast mode. From 5a7bcf8b2d999cdd0d0f6e7f6965562470964d08 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 15 Feb 2021 21:19:54 -0500 Subject: [PATCH 3/3] pass writer threading to reader for monitoring --- source/adios2/engine/dataman/DataManReader.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 4eef6cff0d..134dc1370d 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -81,6 +81,11 @@ DataManReader::DataManReader(IO &io, const std::string &name, { m_Monitor.SetReaderThreading(); } + bool writerThreading = message["Threading"]; + if (writerThreading) + { + m_Monitor.SetWriterThreading(); + } } if (m_TransportMode == "fast")