Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Dataman step combination #2593

Merged
merged 6 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions source/adios2/engine/dataman/DataManMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ void DataManMonitor::EndTransport()
{
auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
(std::chrono::system_clock::now() -
m_TransportTimers.back().second))
m_TransportTimers.front().second))
.count();
if (m_Verbose)
{
std::lock_guard<std::mutex> l(m_PrintMutex);
std::cout << "Step " << m_TransportTimers.back().first
std::cout << "Step " << m_TransportTimers.front().first
<< ", Latency milliseconds "
<< static_cast<double>(latency) / 1000.0 << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ StepStatus DataManReader::BeginStep(StepMode stepMode,
}

m_CurrentStepMetadata = m_Serializer.GetEarliestLatestStep(
m_CurrentStep, m_PublisherAddresses.size(), timeout, true);
m_CurrentStep, m_PublisherAddresses.size(), timeout, false);

if (m_CurrentStepMetadata == nullptr)
{
Expand Down
83 changes: 67 additions & 16 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
m_MpiRank = m_Comm.Rank();
m_MpiSize = m_Comm.Size();

m_Serializer.NewWriterBuffer(m_SerializerBufferSize);

helper::GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress);
helper::GetParameter(m_IO.m_Parameters, "Port", m_Port);
helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
Expand All @@ -36,6 +38,7 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);
helper::GetParameter(m_IO.m_Parameters, "TransportMode", m_TransportMode);
helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive);
helper::GetParameter(m_IO.m_Parameters, "CombiningSteps", m_CombiningSteps);

if (m_IPAddress.empty())
{
Expand Down Expand Up @@ -114,7 +117,10 @@ DataManWriter::~DataManWriter()
StepStatus DataManWriter::BeginStep(StepMode mode, const float timeout_sec)
{
++m_CurrentStep;
m_Serializer.NewWriterBuffer(m_SerializerBufferSize);
if (m_CombinedSteps == 0)
{
m_Serializer.NewWriterBuffer(m_SerializerBufferSize);
}

if (m_MonitorActive)
{
Expand All @@ -140,28 +146,44 @@ void DataManWriter::EndStep()
m_Serializer.PutAttributes(m_IO);
}

m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}
++m_CombinedSteps;

if (m_MonitorActive)
if (m_CombinedSteps >= m_CombiningSteps)
{
m_Monitor.BeginTransport(m_CurrentStep);
}
m_CombinedSteps = 0;
m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
if (m_MonitorActive)
{
m_Monitor.BeginTransport(m_CurrentStep);
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
m_Monitor.EndTransport();
m_Monitor.BeginTransport(m_CurrentStep);
}
}

Expand Down Expand Up @@ -194,6 +216,32 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)

void DataManWriter::DoClose(const int transportIndex)
{
if (m_CombinedSteps < m_CombiningSteps && m_CombinedSteps > 0)
{
m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}

nlohmann::json endSignal;
endSignal["FinalStep"] = static_cast<int64_t>(m_CurrentStep);
std::string s = endSignal.dump() + '\0';
Expand Down Expand Up @@ -268,7 +316,10 @@ void DataManWriter::PublishThread()
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
m_Monitor.EndTransport();
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class DataManWriter : public Engine
bool m_DoubleBuffer = false;
std::string m_TransportMode = "fast";
bool m_MonitorActive = false;
int m_CombiningSteps = 1;
int m_CombinedSteps = 0;

std::string m_AllAddresses;
std::string m_PublisherAddress;
Expand Down
18 changes: 0 additions & 18 deletions source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,6 @@ void DataManSerializer::PutData(
metaj["E"] = m_IsLittleEndian;
}

for (const auto &op : ops)
{
const auto opName = op.Op->m_Type;
if (opName == "zfp" or opName == "bzip2" or opName == "sz")
{
/*
m_CompressionParams[variable.m_Name]["CompressionMethod"] =
opName;
for (const auto &p : op.Parameters)
{
m_CompressionParams[variable.m_Name]
[opName + ":" + p.first] = p.second;
}
break;
*/
}
}

size_t datasize = 0;
bool compressed = false;
std::string compressionMethod;
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void ZmqReqRep::SendReply(const void *reply, const size_t size)
}

std::shared_ptr<std::vector<char>>
ZmqReqRep::Request(const void *request, const size_t size,
ZmqReqRep::Request(const char *request, const size_t size,
const std::string &address)
{
auto reply = std::make_shared<std::vector<char>>();
Expand Down Expand Up @@ -183,7 +183,7 @@ ZmqReqRep::Request(const void *request, const size_t size,
return reply;
}

std::shared_ptr<std::vector<char>> ZmqReqRep::Request(const void *request,
std::shared_ptr<std::vector<char>> ZmqReqRep::Request(const char *request,
const size_t size)
{
auto reply = std::make_shared<std::vector<char>>();
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class ZmqReqRep
void OpenRequester(const std::string &address, const int timeout,
const size_t receiverBufferSize);
std::shared_ptr<std::vector<char>>
Request(const void *request, const size_t size, const std::string &address);
std::shared_ptr<std::vector<char>> Request(const void *request,
Request(const char *request, const size_t size, const std::string &address);
std::shared_ptr<std::vector<char>> Request(const char *request,
const size_t size);

// replier
Expand Down