Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make adios2_remote_server multithreaded and compress responses using … #4407

Merged
merged 2 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bindings/Python/py11Variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ size_t Variable::SelectionSize() const
return size;
}

/**
 * Request an accuracy for reads of this variable.
 *
 * The wrapped core variable is first validated with helper::CheckForNullptr,
 * then the request is forwarded to it unchanged.
 *
 * @param a accuracy descriptor (error, norm, relative) to request
 */
void Variable::SetAccuracy(const adios2::Accuracy &a)
{
    const std::string hint("in call to Variable::SetAccuracy");
    helper::CheckForNullptr(m_VariableBase, hint);
    m_VariableBase->SetAccuracy(a);
}

/**
 * Return the accuracy reported by the wrapped core variable's GetAccuracy().
 *
 * The wrapped core variable is validated with helper::CheckForNullptr
 * before it is dereferenced.
 *
 * @return accuracy descriptor (error, norm, relative)
 */
adios2::Accuracy Variable::GetAccuracy() const
{
    helper::CheckForNullptr(m_VariableBase, "in call to Variable::GetAccuracy");
    adios2::Accuracy accuracy = m_VariableBase->GetAccuracy();
    return accuracy;
}

/**
 * Return the accuracy reported by the wrapped core variable's
 * GetAccuracyRequested() (i.e. what was asked for, as opposed to GetAccuracy()).
 *
 * The wrapped core variable is validated with helper::CheckForNullptr
 * before it is dereferenced.
 *
 * @return accuracy descriptor (error, norm, relative)
 */
adios2::Accuracy Variable::GetAccuracyRequested() const
{
    helper::CheckForNullptr(m_VariableBase, "in call to Variable::GetAccuracyRequested");
    adios2::Accuracy requested = m_VariableBase->GetAccuracyRequested();
    return requested;
}

size_t Variable::AddOperation(const Operator op, const Params &parameters)
{
helper::CheckForNullptr(m_VariableBase, "in call to Variable::AddOperation");
Expand Down
4 changes: 4 additions & 0 deletions bindings/Python/py11Variable.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Variable

size_t SelectionSize() const;

void SetAccuracy(const adios2::Accuracy &a);
adios2::Accuracy GetAccuracy() const;
adios2::Accuracy GetAccuracyRequested() const;

std::string Name() const;

std::string Type() const;
Expand Down
18 changes: 18 additions & 0 deletions bindings/Python/py11glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
m.attr("is_built_with_mpi") = false;
#endif

m.attr("L2_norm") = adios2::L2_norm;
m.attr("Linf_norm") = adios2::Linf_norm;

// enum classes
pybind11::enum_<adios2::Mode>(m, "Mode")
.value("Write", adios2::Mode::Write)
Expand Down Expand Up @@ -131,6 +134,18 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
.value("StoreData", adios2::DerivedVarType::StoreData)
.export_values();

pybind11::class_<adios2::Accuracy>(m, "Accuracy")
.def(pybind11::init<double, double, bool>())
.def_readwrite("error", &adios2::Accuracy::error)
.def_readwrite("norm", &adios2::Accuracy::norm)
.def_readwrite("relative", &adios2::Accuracy::relative)

.def("__repr__", [](const adios2::Accuracy &self) {
std::ostringstream _stream;
_stream << "(" << self.error << ", " << self.norm << ", " << self.relative << ")";
return _stream.str();
});

pybind11::class_<adios2::py11::ADIOS>(m, "ADIOS")
// Python 2
.def("__nonzero__",
Expand Down Expand Up @@ -373,6 +388,9 @@ PYBIND11_MODULE(ADIOS2_PYTHON_MODULE_NAME, m)
.def("SetBlockSelection", &adios2::py11::Variable::SetBlockSelection)
.def("SetSelection", &adios2::py11::Variable::SetSelection)
.def("SetStepSelection", &adios2::py11::Variable::SetStepSelection)
.def("SetAccuracy", &adios2::py11::Variable::SetAccuracy)
.def("GetAccuracy", &adios2::py11::Variable::GetAccuracy)
.def("GetAccuracyRequested", &adios2::py11::Variable::GetAccuracyRequested)
.def("SelectionSize", &adios2::py11::Variable::SelectionSize)
.def("Name", &adios2::py11::Variable::Name)
.def("Type", &adios2::py11::Variable::Type)
Expand Down
23 changes: 23 additions & 0 deletions python/adios2/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
accompanying file Copyright.txt for details.
"""

from adios2 import bindings


class Variable:
"""High level representation of the Variable class in the adios2.bindings"""
Expand Down Expand Up @@ -88,6 +90,18 @@ def set_step_selection(self, step_selection):
"""
self.impl.SetStepSelection(step_selection)

def set_accuracy(self, error, norm, relative):
    """
    Request an accuracy for (remote) reading of this variable.

    The three values are packed into an adios2.bindings.Accuracy object,
    which is then handed to the underlying variable implementation.

    Args:
        error: floating point value
        norm: floating point value (presumably one of bindings.L2_norm /
            bindings.Linf_norm -- confirm against the bindings module)
        relative: True or False
    """
    self.impl.SetAccuracy(bindings.Accuracy(error, norm, relative))

def shape(self, step=None):
"""
Get the shape assigned to the given step for this variable.
Expand Down Expand Up @@ -177,6 +191,15 @@ def name(self):
"""
return self.impl.Name()

def get_accuracy(self):
    """
    Get the accuracy of the variable (of its last read)

    Returns:
        adios2.bindings.Accuracy struct (with error, norm and relative fields),
        exactly as returned by the underlying implementation's GetAccuracy().
    """
    accuracy = self.impl.GetAccuracy()
    return accuracy

def add_operation_string(self, name, params={}):
"""
Add an operation (operator) as a string
Expand Down
71 changes: 65 additions & 6 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,14 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
{
auto &Req = GetRequests[req_seq];
const DataType varType = m_IO.InquireVariableType(Req.VarName);
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);

std::string keyPrefix = m_Fingerprint + "|" + Req.VarName + std::to_string(Req.RelStep);
if (Req.BlockID != std::numeric_limits<std::size_t>::max())
{
MinVarInfo *minBlocksInfo = nullptr;
if (MinBlocksInfoMap.find(keyPrefix) == MinBlocksInfoMap.end())
{
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);
minBlocksInfo = MinBlocksInfo(*VB, Req.RelStep);
MinBlocksInfoMap[keyPrefix] = minBlocksInfo;
}
Expand Down Expand Up @@ -584,7 +584,7 @@ void BP5Reader::PerformRemoteGetsWithKVCache()
box.StartToVector(start);
box.CountToVector(count);
auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, count, start,
ReqInfo.Data);
VB->m_AccuracyRequested, ReqInfo.Data);
handles.push_back(handle);
remoteRequestsInfo.push_back(ReqInfo);
}
Expand Down Expand Up @@ -656,13 +656,72 @@ void BP5Reader::PerformRemoteGets()
std::vector<Remote::GetHandle> handles;
for (auto &Req : GetRequests)
{
auto handle =
m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
VariableBase *VB = m_BP5Deserializer->GetVariableBaseFromBP5VarRec(Req.VarRec);
auto handle = m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start,
VB->m_AccuracyRequested, Req.Data);
handles.push_back(handle);
}
for (auto &handle : handles)

size_t nHandles = handles.size();
// TP endGenerate = NOW();
// double generateTime = DURATION(startGenerate, endGenerate);

size_t nextHandle = 0;
std::mutex mutexReadRequests;

// Claim the next unprocessed index into `handles`.
// Returns MaxSizeT once all nHandles indices have been handed out.
// The lock on mutexReadRequests guards the read-and-increment of nextHandle.
auto lf_GetNextHandle = [&]() -> size_t {
    std::lock_guard<std::mutex> lockGuard(mutexReadRequests);
    if (nextHandle >= nHandles)
    {
        return MaxSizeT;
    }
    return nextHandle++;
};

// Worker body: repeatedly claim a handle index from lf_GetNextHandle() and
// wait on that remote Get until no indices are left.
// threadID is only used by the (commented-out) debug trace below.
// Always returns true.
auto lf_WaitForGet = [&](const size_t threadID) -> bool {
    while (true)
    {
        const auto reqidx = lf_GetNextHandle();
        // Valid indices are [0, nHandles). Use >= so that both the MaxSizeT
        // "no more work" sentinel and an index equal to nHandles stop the
        // loop instead of indexing past the end of `handles`.
        if (reqidx >= nHandles)
        {
            break;
        }
        m_Remote->WaitForGet(handles[reqidx]);
        // std::cout << "BP5Reader::PerformRemoteGets: thread " << threadID
        // << " done with response " << reqidx << std::endl;
    }
    return true;
};

if (m_Threads > 1 && nHandles > 1)
{
m_Remote->WaitForGet(handle);
size_t nThreads = (m_Threads < nHandles ? m_Threads : nHandles);
std::vector<std::future<bool>> futures(nThreads - 1);

// launch nThreads-1 async workers; each worker repeatedly claims the
// next pending handle, and the main thread joins in as one more worker
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
futures[tid] = std::async(std::launch::async, lf_WaitForGet, tid + 1);
}

// main thread also claims and waits on pending handles until none are left
lf_WaitForGet(0);

// wait for all async threads
for (auto &f : futures)
{
f.get();
}
}
else
{
for (auto &handle : handles)
{
m_Remote->WaitForGet(handle);
}
}
}

Expand Down
10 changes: 6 additions & 4 deletions source/adios2/engine/campaign/CampaignReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ void CampaignReader::PerformGets()
{
std::cout << "Campaign Reader " << m_ReaderRank << " PerformGets()\n";
}
for (auto ep : m_Engines)
{
ep->PerformGets();
}
m_NeedPerformGets = false;
}

Expand Down Expand Up @@ -451,10 +455,8 @@ std::string CampaignReader::VariableExprStr(const VariableBase &Var)
void CampaignReader::DoGetDeferred(Variable<T> &variable, T *data) \
{ \
PERFSTUBS_SCOPED_TIMER("CampaignReader::Get"); \
auto it = m_VarInternalInfo.find(variable.m_Name); \
Variable<T> *v = reinterpret_cast<Variable<T> *>(it->second.originalVar); \
Engine *e = m_Engines[it->second.engineIdx]; \
e->Get(*v, data, adios2::Mode::Deferred); \
auto p = TranslateToActualVariable(variable); \
p.second->Get(*p.first, data, adios2::Mode::Deferred); \
} \
\
std::map<size_t, std::vector<typename Variable<T>::BPInfo>> \
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/engine/campaign/CampaignReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ inline Variable<T> CampaignReader::DuplicateVariable(Variable<T> *variable, IO &
v.m_StepsCount = variable->m_StepsCount;
v.m_Start = variable->m_Start;
v.m_Count = variable->m_Count;
v.m_AccuracyRequested = variable->m_AccuracyRequested;
v.m_AccuracyProvided = variable->m_AccuracyProvided;

v.m_Engine = this; // Variable::Shape() uses this member to call engine
vii.originalVar = static_cast<void *>(variable);
Expand Down Expand Up @@ -81,6 +83,7 @@ CampaignReader::TranslateToActualVariable(Variable<T> &variable)
v->m_MemoryStart = variable.m_MemoryStart;
v->m_MemoryCount = variable.m_MemoryCount;
v->m_MemSpace = variable.m_MemSpace;
v->m_AccuracyRequested = variable.m_AccuracyRequested;
return std::make_pair(v, e);
}

Expand Down
1 change: 0 additions & 1 deletion source/adios2/operator/compress/CompressMGARD.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ size_t CompressMGARD::Operate(const char *dataIn, const Dims &blockStart, const
mgard_x::compress(mgardDim, mgardType, mgardCount, tolerance, s, errorBoundType, dataIn,
compressedData, sizeOut, config, true);
bufferOutOffset += sizeOut;

return bufferOutOffset;
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/operator/refactor/RefactorMDR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ RefactorMDR::RefactorMDR(const Params &parameters)
// config.block_size = 64;

config.dev_type = mgard_x::device_type::AUTO;
config.prefetch = false;
// config.prefetch = false;
// config.max_memory_footprint = max_memory_footprint;

config.lossless = mgard_x::lossless_type::Huffman_Zstd;
Expand Down
39 changes: 36 additions & 3 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#include "EVPathRemote.h"
#include "Remote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/core/Operator.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"
#include "adios2/helper/adiosSystem.h"
#include "adios2/operator/OperatorFactory.h"

#ifdef _MSC_VER
#define strdup(x) _strdup(x)
#endif
Expand Down Expand Up @@ -60,7 +63,32 @@ void ReadResponseHandler(CManager cm, CMConnection conn, void *vevent, void *cli
{
EVPathRemoteCommon::ReadResponseMsg read_response_msg =
static_cast<EVPathRemoteCommon::ReadResponseMsg>(vevent);
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);

switch (read_response_msg->OperatorType)
{
case adios2::core::Operator::OperatorType::COMPRESS_MGARD: {
auto op = adios2::core::MakeOperator("mgard", {});
op->InverseOperate(read_response_msg->ReadData, read_response_msg->Size,
(char *)read_response_msg->Dest);
break;
}

case adios2::core::Operator::OperatorType::COMPRESS_ZFP: {
auto op = adios2::core::MakeOperator("zfp", {});
op->InverseOperate(read_response_msg->ReadData, read_response_msg->Size,
(char *)read_response_msg->Dest);
break;
}

case adios2::core::Operator::OperatorType::COMPRESS_NULL:
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);
break;
default:
helper::Throw<std::invalid_argument>("Remote", "EVPathRemote", "ReadResponseHandler",
"Invalid operator type " +
std::to_string(read_response_msg->OperatorType) +
" received in response");
}
CMCondition_signal(cm, read_response_msg->ReadResponseCondition);
return;
};
Expand Down Expand Up @@ -157,7 +185,7 @@ void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port
}

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
Dims &Start, Accuracy &accuracy, void *dest)
{
EVPathRemoteCommon::_GetRequestMsg GetMsg;
memset(&GetMsg, 0, sizeof(GetMsg));
Expand All @@ -169,6 +197,9 @@ EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t Blo
GetMsg.DimCount = (int)Count.size();
GetMsg.Count = Count.data();
GetMsg.Start = Start.data();
GetMsg.Error = accuracy.error;
GetMsg.Norm = accuracy.norm;
GetMsg.Relative = accuracy.relative;
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
return (Remote::GetHandle)(intptr_t)GetMsg.GetResponseCondition;
Expand All @@ -190,7 +221,9 @@ EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest

/**
 * Wait on the CM condition encoded in a handle.
 *
 * The handle is treated as an integer condition id stored in a pointer-sized
 * value; it is converted back to int and passed to CMCondition_wait on this
 * object's connection manager.
 *
 * @param handle handle carrying the condition id to wait on
 * @return true when CMCondition_wait reports a non-zero result, false otherwise
 */
bool EVPathRemote::WaitForGet(GetHandle handle)
{
    // Single wait call; the int result is converted to bool explicitly.
    const int result =
        CMCondition_wait(ev_state.cm, static_cast<int>(reinterpret_cast<intptr_t>(handle)));
    return result != 0;
}
#else

Expand Down
3 changes: 2 additions & 1 deletion source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class EVPathRemote : public Remote

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);
GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
Accuracy &accuracy, void *dest);

bool WaitForGet(GetHandle handle);

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void Remote::OpenSimpleFile(const std::string hostname, const int32_t port,
};

Remote::GetHandle Remote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest)
Accuracy &accuracy, void *dest)
{
ThrowUp("RemoteGet");
return (Remote::GetHandle)(intptr_t)0;
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Remote
typedef void *GetHandle;

virtual GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start,
void *dest);
Accuracy &accuracy, void *dest);

virtual bool WaitForGet(GetHandle handle);

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/toolkit/remote/XrootdRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ bool XrootdRemote::WaitForGet(GetHandle handle)
}

Remote::GetHandle XrootdRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
Dims &Start, Accuracy &accuracy, void *dest)
{
#ifdef ADIOS2_HAVE_XROOTD
char rName[512] = "/etc";
Expand Down
Loading
Loading