From 1b7b0a86cf0f7765464cc40fdbc18d6b557ccc6a Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Wed, 20 Jan 2021 12:23:06 -0500 Subject: [PATCH 1/6] added dataman step combination --- .../adios2/engine/dataman/DataManWriter.cpp | 52 ++++++++++++------- source/adios2/engine/dataman/DataManWriter.h | 2 + 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index c115d8da1b..6c63b8f46f 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -36,6 +36,7 @@ DataManWriter::DataManWriter(IO &io, const std::string &name, helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer); helper::GetParameter(m_IO.m_Parameters, "TransportMode", m_TransportMode); helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive); + helper::GetParameter(m_IO.m_Parameters, "CombiningSteps", m_CombiningSteps); if (m_IPAddress.empty()) { @@ -140,36 +141,47 @@ void DataManWriter::EndStep() m_Serializer.PutAttributes(m_IO); } - m_Serializer.AttachAttributesToLocalPack(); - const auto buffer = m_Serializer.GetLocalPack(); - if (buffer->size() > m_SerializerBufferSize) - { - m_SerializerBufferSize = buffer->size(); - } + ++m_CombinedSteps; - if (m_MonitorActive) - { - m_Monitor.BeginTransport(m_CurrentStep); - } - - if (m_DoubleBuffer || m_TransportMode == "reliable") + if (m_CombinedSteps == m_CombiningSteps) { - PushBufferQueue(buffer); + m_CombinedSteps = 0; + m_Serializer.AttachAttributesToLocalPack(); + const auto buffer = m_Serializer.GetLocalPack(); + if (buffer->size() > m_SerializerBufferSize) + { + m_SerializerBufferSize = buffer->size(); + } + if (m_MonitorActive) + { + m_Monitor.BeginTransport(m_CurrentStep); + } + if (m_DoubleBuffer || m_TransportMode == "reliable") + { + PushBufferQueue(buffer); + } + else + { + m_Publisher.Send(buffer); + if (m_MonitorActive) + { + m_Monitor.EndTransport(); + } + } + if (m_MonitorActive) + { + m_Monitor.EndStep(m_CurrentStep); + } } else { - m_Publisher.Send(buffer); if (m_MonitorActive) { - m_Monitor.EndTransport(); + m_Monitor.BeginTransport(m_CurrentStep); + m_Monitor.EndStep(m_CurrentStep); } } - if (m_MonitorActive) - { - m_Monitor.EndStep(m_CurrentStep); - } - if (m_Verbosity >= 10) { std::cout << "DataManWriter::EndStep " << m_CurrentStep << std::endl; diff --git a/source/adios2/engine/dataman/DataManWriter.h b/source/adios2/engine/dataman/DataManWriter.h index a16f8c3b8f..f953e1e774 100644 --- a/source/adios2/engine/dataman/DataManWriter.h +++ b/source/adios2/engine/dataman/DataManWriter.h @@ -49,6 +49,8 @@ class DataManWriter : public Engine bool m_DoubleBuffer = false; std::string m_TransportMode = "fast"; bool m_MonitorActive = false; + int m_CombiningSteps = 1; + int m_CombinedSteps = 0; std::string m_AllAddresses; std::string m_PublisherAddress; From 728d68c388fe515c82f79db387bb48d339b8361b Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Fri, 22 Jan 2021 19:22:11 -0500 Subject: [PATCH 2/6] dataman multi-step per shot worked --- .../adios2/engine/dataman/DataManMonitor.cpp | 4 +-- .../adios2/engine/dataman/DataManReader.cpp | 2 +- .../adios2/engine/dataman/DataManWriter.cpp | 29 ++++++++++++++----- .../format/dataman/DataManSerializer.tcc | 18 ------------ 4 files changed, 24 insertions(+), 29 deletions(-) diff --git a/source/adios2/engine/dataman/DataManMonitor.cpp b/source/adios2/engine/dataman/DataManMonitor.cpp index 86e50c4f9d..1b1119ac20 100644 --- a/source/adios2/engine/dataman/DataManMonitor.cpp +++ b/source/adios2/engine/dataman/DataManMonitor.cpp @@ -108,12 +108,12 @@ void DataManMonitor::EndTransport() { auto latency = std::chrono::duration_cast( (std::chrono::system_clock::now() - - m_TransportTimers.back().second)) + m_TransportTimers.front().second)) .count(); if (m_Verbose) { std::lock_guard l(m_PrintMutex); - std::cout << "Step " << m_TransportTimers.back().first + std::cout << "Step " << m_TransportTimers.front().first << ", Latency milliseconds " << static_cast(latency) / 1000.0 << std::endl; } diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index eeff7dc271..f6cdebdad4 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -166,7 +166,7 @@ StepStatus DataManReader::BeginStep(StepMode stepMode, } m_CurrentStepMetadata = m_Serializer.GetEarliestLatestStep( - m_CurrentStep, m_PublisherAddresses.size(), timeout, true); + m_CurrentStep, m_PublisherAddresses.size(), timeout, false); if (m_CurrentStepMetadata == nullptr) { diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 6c63b8f46f..3e0ee9fa1d 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -27,6 +27,8 @@ DataManWriter::DataManWriter(IO &io, const std::string &name, m_MpiRank = m_Comm.Rank(); m_MpiSize = m_Comm.Size(); + m_Serializer.NewWriterBuffer(m_SerializerBufferSize); + helper::GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress); helper::GetParameter(m_IO.m_Parameters, "Port", m_Port); helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout); @@ -115,7 +117,10 @@ DataManWriter::~DataManWriter() StepStatus DataManWriter::BeginStep(StepMode mode, const float timeout_sec) { ++m_CurrentStep; - m_Serializer.NewWriterBuffer(m_SerializerBufferSize); + if (m_CombiningSteps <= m_CombinedSteps) + { + m_Serializer.NewWriterBuffer(m_SerializerBufferSize); + } if (m_MonitorActive) { @@ -152,10 +157,12 @@ void DataManWriter::EndStep() { m_SerializerBufferSize = buffer->size(); } + if (m_MonitorActive) { m_Monitor.BeginTransport(m_CurrentStep); } + if (m_DoubleBuffer || m_TransportMode == "reliable") { PushBufferQueue(buffer); @@ -165,23 +172,26 @@ void DataManWriter::EndStep() m_Publisher.Send(buffer); if (m_MonitorActive) { - m_Monitor.EndTransport(); + for (int i = 0; i < m_CombiningSteps; ++i) + { + m_Monitor.EndTransport(); + } } } - if (m_MonitorActive) - { - m_Monitor.EndStep(m_CurrentStep); - } } else { if (m_MonitorActive) { m_Monitor.BeginTransport(m_CurrentStep); - m_Monitor.EndStep(m_CurrentStep); } } + if (m_MonitorActive) + { + m_Monitor.EndStep(m_CurrentStep); + } + if (m_Verbosity >= 10) { std::cout << "DataManWriter::EndStep " << m_CurrentStep << std::endl; @@ -280,7 +290,10 @@ void DataManWriter::PublishThread() m_Publisher.Send(buffer); if (m_MonitorActive) { - m_Monitor.EndTransport(); + for (int i = 0; i < m_CombiningSteps; ++i) + { + m_Monitor.EndTransport(); + } } } } diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc index 00355293f8..9c12e1cf88 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc @@ -137,24 +137,6 @@ void DataManSerializer::PutData( metaj["E"] = m_IsLittleEndian; } - for (const auto &op : ops) - { - const auto opName = op.Op->m_Type; - if (opName == "zfp" or opName == "bzip2" or opName == "sz") - { - /* - m_CompressionParams[variable.m_Name]["CompressionMethod"] = - opName; - for (const auto &p : op.Parameters) - { - m_CompressionParams[variable.m_Name] - [opName + ":" + p.first] = p.second; - } - break; - */ - } - } - size_t datasize = 0; bool compressed = false; std::string compressionMethod; From e24b803a565e72d6eaaa899d056189e7a29255fd Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Fri, 22 Jan 2021 20:11:19 -0500 Subject: [PATCH 3/6] fixed missing steps at the end of a stream --- source/adios2/engine/dataman/DataManWriter.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 3e0ee9fa1d..812ef3ce7e 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -148,7 +148,7 @@ void DataManWriter::EndStep() ++m_CombinedSteps; - if (m_CombinedSteps == m_CombiningSteps) + if (m_CombinedSteps >= m_CombiningSteps) { m_CombinedSteps = 0; m_Serializer.AttachAttributesToLocalPack(); @@ -216,6 +216,11 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) void DataManWriter::DoClose(const int transportIndex) { + if (m_CombinedSteps != m_CombiningSteps) + { + m_CombinedSteps = m_CombiningSteps; + EndStep(); + } nlohmann::json endSignal; endSignal["FinalStep"] = static_cast(m_CurrentStep); std::string s = endSignal.dump() + '\0'; From 47c7838dcfa0a3a555a36f706d855f1f4bf73ffd Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Fri, 22 Jan 2021 20:49:41 -0500 Subject: [PATCH 4/6] fixed a bug --- .../adios2/engine/dataman/DataManWriter.cpp | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 812ef3ce7e..d514430605 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -117,7 +117,7 @@ DataManWriter::~DataManWriter() StepStatus DataManWriter::BeginStep(StepMode mode, const float timeout_sec) { ++m_CurrentStep; - if (m_CombiningSteps <= m_CombinedSteps) + if (m_CombinedSteps == 0) { m_Serializer.NewWriterBuffer(m_SerializerBufferSize); } @@ -218,9 +218,30 @@ void DataManWriter::DoClose(const int transportIndex) { if (m_CombinedSteps != m_CombiningSteps) { - m_CombinedSteps = m_CombiningSteps; - EndStep(); + m_Serializer.AttachAttributesToLocalPack(); + const auto buffer = m_Serializer.GetLocalPack(); + if (buffer->size() > m_SerializerBufferSize) + { + m_SerializerBufferSize = buffer->size(); + } + + if (m_DoubleBuffer || m_TransportMode == "reliable") + { + PushBufferQueue(buffer); + } + else + { + m_Publisher.Send(buffer); + if (m_MonitorActive) + { + for (int i = 0; i < m_CombiningSteps; ++i) + { + m_Monitor.EndTransport(); + } + } + } } + nlohmann::json endSignal; endSignal["FinalStep"] = static_cast(m_CurrentStep); std::string s = endSignal.dump() + '\0'; From 76c83dfedae4848fc5c4d236074e358b089277fc Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Sat, 23 Jan 2021 13:55:50 -0500 Subject: [PATCH 5/6] try to satisfy sanitizers --- source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp | 4 ++-- source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp b/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp index eddb269543..5487c1b325 100644 --- a/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp +++ b/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp @@ -121,7 +121,7 @@ void ZmqReqRep::SendReply(const void *reply, const size_t size) } std::shared_ptr> -ZmqReqRep::Request(const void *request, const size_t size, +ZmqReqRep::Request(const char *request, const size_t size, const std::string &address) { auto reply = std::make_shared>(); @@ -183,7 +183,7 @@ ZmqReqRep::Request(const void *request, const size_t size, return reply; } -std::shared_ptr> ZmqReqRep::Request(const void *request, +std::shared_ptr> ZmqReqRep::Request(const char *request, const size_t size) { auto reply = std::make_shared>(); diff --git a/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h b/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h index 351e3bce1b..6afa8d266f 100644 --- a/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h +++ b/source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h @@ -33,8 +33,8 @@ class ZmqReqRep void OpenRequester(const std::string &address, const int timeout, const size_t receiverBufferSize); std::shared_ptr> - Request(const void *request, const size_t size, const std::string &address); - std::shared_ptr> Request(const void *request, + Request(const char *request, const size_t size, const std::string &address); + std::shared_ptr> Request(const char *request, const size_t size); // replier From 9c3d56fbd91288faaeb63d2848337069e78758f8 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 25 Jan 2021 17:32:11 -0500 Subject: [PATCH 6/6] fixed a bug --- source/adios2/engine/dataman/DataManWriter.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index d514430605..a7c990cc00 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -216,7 +216,7 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) void DataManWriter::DoClose(const int transportIndex) { - if (m_CombinedSteps != m_CombiningSteps) + if (m_CombinedSteps < m_CombiningSteps && m_CombinedSteps > 0) { m_Serializer.AttachAttributesToLocalPack(); const auto buffer = m_Serializer.GetLocalPack();