diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index b90e197d6c..8314515cf9 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -15,7 +15,7 @@ #include "adios2/core/Engine.h" #include "adios2/engine/dataman/DataManMonitor.h" -#include "adios2/toolkit/format/dataman/DataManSerializer.h" +#include "adios2/toolkit/format/dataman/DataManSerializer.tcc" #include "adios2/toolkit/zmq/zmqpubsub/ZmqPubSub.h" #include "adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h" diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.cpp b/source/adios2/toolkit/format/dataman/DataManSerializer.cpp index fd9abfa11d..7a205ea791 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.cpp +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.cpp @@ -8,7 +8,6 @@ * Author: Jason Wang */ -#include "DataManSerializer.h" #include "DataManSerializer.tcc" #include @@ -22,8 +21,7 @@ namespace format DataManSerializer::DataManSerializer(helper::Comm const &comm, const bool isRowMajor) : m_Comm(comm), m_IsRowMajor(isRowMajor), - m_IsLittleEndian(helper::IsLittleEndian()), - m_DeferredRequestsToSend(std::make_shared()) + m_IsLittleEndian(helper::IsLittleEndian()) { m_MpiRank = m_Comm.Rank(); m_MpiSize = m_Comm.Size(); @@ -64,230 +62,6 @@ VecPtr DataManSerializer::GetLocalPack() return m_LocalBuffer; } -void DataManSerializer::AggregateMetadata() -{ - TAU_SCOPED_TIMER_FUNC(); - - m_ProtectedStepsMutex.lock(); - for (const auto &idMap : m_ProtectedStepsToAggregate) - { - m_MetadataJson["P"][std::to_string(idMap.first)] = idMap.second; - } - m_ProtectedStepsMutex.unlock(); - - auto localJsonPack = SerializeJson(m_MetadataJson); - unsigned int size = localJsonPack->size(); - unsigned int maxSize; - m_Comm.Allreduce(&size, &maxSize, 1, helper::Comm::Op::Max); - maxSize += sizeof(uint64_t); - localJsonPack->resize(maxSize, '\0'); - *(reinterpret_cast(localJsonPack->data() + - localJsonPack->size()) - - 1) = size; - - std::vector globalJsonStr(m_MpiSize * maxSize); - m_Comm.Allgather(localJsonPack->data(), maxSize, globalJsonStr.data(), - maxSize); - - nlohmann::json aggMetadata; - - for (int i = 0; i < m_MpiSize; ++i) - { - size_t deserializeSize = - *(reinterpret_cast(globalJsonStr.data() + - (i + 1) * maxSize) - - 1); - nlohmann::json metaj = DeserializeJson( - globalJsonStr.data() + i * maxSize, deserializeSize); - for (auto stepMapIt = metaj.begin(); stepMapIt != metaj.end(); - ++stepMapIt) - { - if (stepMapIt.key() == "P") - { - std::lock_guard l(m_ProtectedStepsMutex); - for (auto appidMapIt = stepMapIt.value().begin(); - appidMapIt != stepMapIt.value().end(); ++appidMapIt) - { - auto stepVecToAdd = appidMapIt->get>(); - auto &stepVecExisted = - m_ProtectedStepsAggregated[stoull(appidMapIt.key())]; - for (const auto &protectedStep : stepVecToAdd) - { - auto it = - std::find(stepVecExisted.begin(), - stepVecExisted.end(), protectedStep); - if (it == stepVecExisted.end()) - { - stepVecExisted.push_back(protectedStep); - } - } - std::sort(stepVecExisted.begin(), stepVecExisted.end()); - } - if (m_Verbosity >= 5) - { - std::cout << "Rank "; - std::cout << m_MpiRank; - std::cout << " All protected steps aggregated before " - "reducing are: "; - for (const auto &stepVecPair : m_ProtectedStepsAggregated) - { - for (const auto &step : stepVecPair.second) - { - std::cout << step << ", "; - } - } - std::cout << std::endl; - } - for (auto &stepVecPair : m_ProtectedStepsAggregated) - { - while (stepVecPair.second.size() > 3) - { - stepVecPair.second.erase(stepVecPair.second.begin()); - } - } - if (m_Verbosity >= 5) - { - std::cout << "Rank "; - std::cout << m_MpiRank; - std::cout << " All protected steps aggregated after " - "reducing are: "; - for (const auto &stepVecPair : m_ProtectedStepsAggregated) - { - for (const auto &step : stepVecPair.second) - { - std::cout << step << ", "; - } - } - std::cout << std::endl; - } - } - else - { - for (auto rankMapIt = stepMapIt.value().begin(); - rankMapIt != stepMapIt.value().end(); ++rankMapIt) - { - aggMetadata[stepMapIt.key()][rankMapIt.key()] = - rankMapIt.value(); - } - } - } - } - - m_AggregatedMetadataJsonMutex.lock(); - for (auto stepMapIt = aggMetadata.begin(); stepMapIt != aggMetadata.end(); - ++stepMapIt) - { - m_AggregatedMetadataJson[stepMapIt.key()] = stepMapIt.value(); - } - m_AggregatedMetadataJsonMutex.unlock(); -} - -VecPtr DataManSerializer::GetAggregatedMetadataPack(const int64_t stepRequested, - int64_t &stepProvided, - const int64_t appID) -{ - - TAU_SCOPED_TIMER_FUNC(); - - std::lock_guard l(m_AggregatedMetadataJsonMutex); - - VecPtr ret = nullptr; - - stepProvided = -1; - - if (stepRequested == -1) // getting the earliest step - { - int64_t min = std::numeric_limits::max(); - for (auto stepMapIt = m_AggregatedMetadataJson.begin(); - stepMapIt != m_AggregatedMetadataJson.end(); ++stepMapIt) - { - int64_t step = stoll(stepMapIt.key()); - if (min > step) - { - min = step; - } - } - if (min < std::numeric_limits::max()) - { - nlohmann::json retJ; - retJ[std::to_string(min)] = - m_AggregatedMetadataJson[std::to_string(min)]; - ret = SerializeJson(retJ); - stepProvided = min; - } - } - else if (stepRequested == -2) // getting the latest step - { - int64_t max = std::numeric_limits::min(); - for (auto stepMapIt = m_AggregatedMetadataJson.begin(); - stepMapIt != m_AggregatedMetadataJson.end(); ++stepMapIt) - { - int64_t step = stoll(stepMapIt.key()); - if (max < step) - { - max = step; - } - } - if (max >= 0) - { - nlohmann::json retJ; - retJ[std::to_string(max)] = - m_AggregatedMetadataJson[std::to_string(max)]; - ret = SerializeJson(retJ); - stepProvided = max; - } - } - else if (stepRequested == -3) // getting static variables - { - ret = SerializeJson(m_StaticDataJson); - } - else if (stepRequested == -4) // getting all steps - { - ret = SerializeJson(m_AggregatedMetadataJson); - } - else - { - auto it = m_AggregatedMetadataJson.find(std::to_string(stepRequested)); - if (it != m_AggregatedMetadataJson.end()) - { - nlohmann::json retJ; - retJ[std::to_string(stepRequested)] = *it; - ret = SerializeJson(retJ); - stepProvided = stepRequested; - } - } - - return ret; -} - -void DataManSerializer::PutAggregatedMetadata(VecPtr input, - helper::Comm const &comm) -{ - TAU_SCOPED_TIMER_FUNC(); - if (input == nullptr) - { - Log(1, - "DataManSerializer::PutAggregatedMetadata received nullptr input", - true, true); - return; - } - - comm.BroadcastVector(*input); - - if (input->size() > 0) - { - nlohmann::json metaJ = DeserializeJson(input->data(), input->size()); - JsonToVarMap(metaJ, nullptr); - - if (m_Verbosity >= 100) - { - std::cout << "DataManSerializer::PutAggregatedMetadata: " - << std::endl; - std::cout << metaJ.dump(4) << std::endl; - } - } -} - bool DataManSerializer::IsCompressionAvailable(const std::string &method, DataType type, const Dims &count) { @@ -655,297 +429,6 @@ DmvVecPtrMap DataManSerializer::GetFullMetadataMap() return m_DataManVarMap; } -DmvVecPtr DataManSerializer::GetStepMetadata(const size_t step) -{ - TAU_SCOPED_TIMER_FUNC(); - std::lock_guard l(m_DataManVarMapMutex); - auto it = m_DataManVarMap.find(step); - if (it != m_DataManVarMap.end()) - { - return it->second; - } - return nullptr; -} - -int DataManSerializer::PutDeferredRequest(const std::string &variable, - const size_t step, const Dims &start, - const Dims &count, void *data) -{ - - TAU_SCOPED_TIMER_FUNC(); - DmvVecPtr varVec; - - m_DataManVarMapMutex.lock(); - auto stepVecIt = m_DataManVarMap.find(step); - if (stepVecIt == m_DataManVarMap.end()) - { - // aggregated metadata does not have this step - std::cout << "aggregated metadata does not have Step " << step - << std::endl; - return -1; - } - else - { - varVec = stepVecIt->second; - } - m_DataManVarMapMutex.unlock(); - - std::unordered_map jmap; - - for (const auto &var : *varVec) - { - if (var.name == variable) - { - if (var.start.size() != start.size() || - var.count.size() != count.size() || - start.size() != count.size()) - { - throw("DataManSerializer::PutDeferredRequest() requested " - "start, count and shape do not match"); - } - bool toContinue = false; - for (size_t i = 0; i < start.size(); ++i) - { - if (start[i] >= var.start[i] + var.count[i] || - start[i] + count[i] <= var.start[i]) - { - toContinue = true; - } - } - if (toContinue) - { - continue; - } - - jmap[var.address].emplace_back(); - nlohmann::json &j = jmap[var.address].back(); - j["N"] = variable; - j["O"] = var.start; - j["C"] = var.count; - j["T"] = step; - } - } - - for (const auto &i : jmap) - { - auto charVec = (*m_DeferredRequestsToSend)[i.first]; - if (charVec == nullptr) - { - charVec = std::make_shared>(); - } - nlohmann::json jsonSer; - if (charVec->size() > 0) - { - jsonSer = DeserializeJson(charVec->data(), charVec->size()); - } - for (auto j = i.second.begin(); j != i.second.end(); ++j) - { - jsonSer.push_back(*j); - } - (*m_DeferredRequestsToSend)[i.first] = SerializeJson(jsonSer); - } - - return 0; -} - -DeferredRequestMapPtr DataManSerializer::GetDeferredRequest() -{ - TAU_SCOPED_TIMER_FUNC(); - auto t = m_DeferredRequestsToSend; - m_DeferredRequestsToSend = std::make_shared(); - return t; -} - -VecPtr DataManSerializer::GenerateReply( - const std::vector &request, size_t &step, - const std::unordered_map &compressionParams) -{ - TAU_SCOPED_TIMER_FUNC(); - auto replyMetaJ = std::make_shared(); - auto replyLocalBuffer = - std::make_shared>(sizeof(uint64_t) * 2); - - nlohmann::json metaj; - try - { - metaj = DeserializeJson(request.data(), request.size()); - } - catch (std::exception &e) - { - Log(1, - "DataManSerializer::GenerateReply() received staging request " - "but failed to deserialize due to " + - std::string(e.what()), - true, true); - step = std::numeric_limits::max(); - return replyLocalBuffer; - } - - for (const auto &req : metaj) - { - std::string variable = req["N"].get(); - Dims start = req["O"].get(); - Dims count = req["C"].get(); - step = req["T"].get(); - - DmvVecPtr varVec; - - { - std::lock_guard l(m_DataManVarMapMutex); - auto itVarVec = m_DataManVarMap.find(step); - if (itVarVec == m_DataManVarMap.end()) - { - Log(1, - "DataManSerializer::GenerateReply() received staging " - "request but DataManVarMap does not have Step " + - std::to_string(step), - true, true); - if (m_Verbosity >= 1) - { - std::string msg = - "DataManSerializer::GenerateReply() current steps are "; - for (auto s : m_DataManVarMap) - { - msg += std::to_string(s.first) + ", "; - } - Log(1, msg, true, true); - } - return replyLocalBuffer; - } - else - { - varVec = itVarVec->second; - if (varVec == nullptr) - { - Log(1, - "DataManSerializer::GenerateReply() received " - "staging request but DataManVarMap contains a " - "nullptr for Step " + - std::to_string(step), - true, true); - return replyLocalBuffer; - } - } - } - for (const auto &var : *varVec) - { - if (var.name == variable) - { - Params compressionParamsVar; - auto compressionParamsIter = compressionParams.find(var.name); - if (compressionParamsIter != compressionParams.end()) - { - compressionParamsVar = compressionParamsIter->second; - } - Dims ovlpStart, ovlpCount; - bool ovlp = CalculateOverlap(var.start, var.count, start, count, - ovlpStart, ovlpCount); - if (ovlp) - { - std::vector tmpBuffer; - if (var.type == DataType::Compound) - { - throw("Compound type is not supported yet."); - } -#define declare_type(T) \ - else if (var.type == helper::GetDataType()) \ - { \ - tmpBuffer.reserve(std::accumulate(ovlpCount.begin(), ovlpCount.end(), \ - sizeof(T), \ - std::multiplies())); \ - GetData(reinterpret_cast(tmpBuffer.data()), variable, ovlpStart, \ - ovlpCount, step); \ - PutData(reinterpret_cast(tmpBuffer.data()), variable, var.shape, \ - ovlpStart, ovlpCount, ovlpStart, ovlpCount, var.doid, step, \ - var.rank, var.address, compressionParamsVar, replyLocalBuffer, \ - replyMetaJ); \ - } - ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) -#undef declare_type - - auto metapack = SerializeJson(*replyMetaJ); - size_t metasize = metapack->size(); - (reinterpret_cast( - replyLocalBuffer->data()))[0] = - replyLocalBuffer->size(); - (reinterpret_cast( - replyLocalBuffer->data()))[1] = metasize; - replyLocalBuffer->resize(replyLocalBuffer->size() + - metasize); - std::memcpy(replyLocalBuffer->data() + - replyLocalBuffer->size() - metasize, - metapack->data(), metasize); - } - } - } - } - if (m_Verbosity >= 1) - { - if (replyLocalBuffer->size() <= 16) - { - std::cout << "DataManSerializer::GenerateReply returns a buffer " - "with size " - << replyLocalBuffer->size() - << ", which means no data is contained in the buffer. " - "This will cause the deserializer to unpack incorrect " - "data for Step " - << step << "." << std::endl; - } - } - return replyLocalBuffer; -} - -bool DataManSerializer::CalculateOverlap(const Dims &inStart, - const Dims &inCount, - const Dims &outStart, - const Dims &outCount, Dims &ovlpStart, - Dims &ovlpCount) -{ - TAU_SCOPED_TIMER_FUNC(); - - if (inStart.size() != inCount.size() || - outStart.size() != outCount.size() || inStart.size() != outStart.size()) - { - return false; - } - if (ovlpStart.size() != inStart.size()) - { - ovlpStart.resize(inStart.size()); - } - if (ovlpCount.size() != inStart.size()) - { - ovlpCount.resize(inStart.size()); - } - for (size_t i = 0; i < inStart.size(); ++i) - { - if (inStart[i] + inCount[i] <= outStart[i]) - { - return false; - } - if (outStart[i] + outCount[i] <= inStart[i]) - { - return false; - } - if (inStart[i] < outStart[i]) - { - ovlpStart[i] = outStart[i]; - } - else - { - ovlpStart[i] = inStart[i]; - } - if (inStart[i] + inCount[i] < outStart[i] + outCount[i]) - { - ovlpCount[i] = inStart[i] + inCount[i] - ovlpStart[i]; - } - else - { - ovlpCount[i] = outStart[i] + outCount[i] - ovlpStart[i]; - } - } - return true; -} - size_t DataManSerializer::LocalBufferSize() { return m_LocalBuffer->size(); } VecPtr DataManSerializer::SerializeJson(const nlohmann::json &message) diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.h b/source/adios2/toolkit/format/dataman/DataManSerializer.h index ed01065d68..a0fa110dc0 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.h +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.h @@ -116,23 +116,9 @@ class DataManSerializer // attach attributes to local pack void AttachAttributesToLocalPack(); - // aggregate metadata across all writer ranks and put it into map - void AggregateMetadata(); - - // get aggregated metadata pack for sending from staging writer to staging - // reader - VecPtr GetAggregatedMetadataPack(const int64_t stepRequested, - int64_t &stepProvided, - const int64_t appID); - // put local metadata and data buffer together and return the merged buffer VecPtr GetLocalPack(); - // generate reply on staging writer based on the request from reader - VecPtr GenerateReply( - const std::vector &request, size_t &step, - const std::unordered_map &compressionParams); - // ************ deserializer functions // put binary pack for deserialization @@ -154,18 +140,6 @@ class DataManSerializer // deserializer DmvVecPtrMap GetFullMetadataMap(); - DmvVecPtr GetStepMetadata(const size_t step); - - void PutAggregatedMetadata(VecPtr input, helper::Comm const &comm); - - int PutDeferredRequest(const std::string &variable, const size_t step, - const Dims &start, const Dims &count, void *data); - DeferredRequestMapPtr GetDeferredRequest(); - - bool CalculateOverlap(const Dims &inStart, const Dims &inCount, - const Dims &outStart, const Dims &outCount, - Dims &ovlpStart, Dims &ovlpCount); - void SetDestination(const std::string &dest); std::string GetDestination(); @@ -228,10 +202,6 @@ class DataManSerializer DmvVecPtrMap m_DataManVarMap; std::mutex m_DataManVarMapMutex; - std::unordered_map> m_ProtectedStepsToAggregate; - std::unordered_map> m_ProtectedStepsAggregated; - std::mutex m_ProtectedStepsMutex; - // used to count buffers that have been put into deserializer, asynchronous // engines such as dataman use this to tell if a certain step has received // all blocks from all writers @@ -248,10 +218,6 @@ class DataManSerializer std::mutex m_StaticDataJsonMutex; bool m_StaticDataFinished = false; - // for generating deferred requests, only accessed from reader main thread, - // does not need mutex - DeferredRequestMapPtr m_DeferredRequestsToSend; - // string, msgpack, cbor, ubjson std::string m_UseJsonSerialization = "string"; diff --git a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc index 8414eb3124..81fe3a9e28 100644 --- a/source/adios2/toolkit/format/dataman/DataManSerializer.tcc +++ b/source/adios2/toolkit/format/dataman/DataManSerializer.tcc @@ -238,155 +238,6 @@ void DataManSerializer::PutData(const T *inputData, const std::string &varName, true, true); } -template -bool DataManSerializer::PutZfp(nlohmann::json &metaj, size_t &datasize, - const T *inputData, const Dims &varCount, - const Params ¶ms) -{ - TAU_SCOPED_TIMER_FUNC(); -#ifdef ADIOS2_HAVE_ZFP - Params p; - for (const auto &i : params) - { - std::string prefix = i.first.substr(0, 4); - if (prefix == "zfp:" || prefix == "Zfp:" || prefix == "ZFP:") - { - std::string key = i.first.substr(4); - metaj[i.first] = i.second; - p[key] = i.second; - } - } - core::compress::CompressZFP compressor(p); - m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), - sizeof(T), - std::multiplies())); - try - { - Params info; - datasize = compressor.Compress(inputData, varCount, sizeof(T), - helper::GetDataType(), - m_CompressBuffer.data(), p, info); - return true; - } - catch (std::exception &e) - { - std::cout << "Got exception " << e.what() - << " from ZFP. Turned off compression." << std::endl; - } -#else - throw(std::invalid_argument( - "ZFP compression used but ZFP library is not linked to ADIOS2")); -#endif - return false; -} - -template -bool DataManSerializer::PutSz(nlohmann::json &metaj, size_t &datasize, - const T *inputData, const Dims &varCount, - const Params ¶ms) -{ - TAU_SCOPED_TIMER_FUNC(); -#ifdef ADIOS2_HAVE_SZ - Params p; - for (const auto &i : params) - { - std::string prefix = i.first.substr(0, 3); - if (prefix == "sz:" || prefix == "Sz:" || prefix == "SZ:") - { - std::string key = i.first.substr(3); - metaj[i.first] = i.second; - p[key] = i.second; - } - } - m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), - sizeof(T), - std::multiplies())); - core::compress::CompressSZ compressor(p); - try - { - Params info; - datasize = compressor.Compress(inputData, varCount, sizeof(T), - helper::GetDataType(), - m_CompressBuffer.data(), p, info); - return true; - } - catch (std::exception &e) - { - std::cout << "Got exception " << e.what() - << " from SZ. Turned off compression." << std::endl; - } -#else - throw(std::invalid_argument( - "SZ compression used but SZ library is not linked to ADIOS2")); -#endif - return false; -} - -template -bool DataManSerializer::PutBZip2(nlohmann::json &metaj, size_t &datasize, - const T *inputData, const Dims &varCount, - const Params ¶ms) -{ - TAU_SCOPED_TIMER_FUNC(); -#ifdef ADIOS2_HAVE_BZIP2 - Params p; - for (const auto &i : params) - { - std::string prefix = i.first.substr(0, 6); - if (prefix == "bzip2:" || prefix == "Bzip2:" || prefix == "BZip2:" || - prefix == "BZIP2:") - { - std::string key = i.first.substr(6); - metaj[i.first] = i.second; - p[key] = i.second; - } - } - m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), - sizeof(T), - std::multiplies())); - core::compress::CompressBZIP2 compressor(p); - try - { - Params info; - datasize = compressor.Compress(inputData, varCount, sizeof(T), - helper::GetDataType(), - m_CompressBuffer.data(), p, info); - return true; - } - catch (std::exception &e) - { - std::cout << "Got exception " << e.what() - << " from BZip2. Turned off compression." << std::endl; - } -#else - throw(std::invalid_argument( - "BZip2 compression used but BZip2 library is not linked to ADIOS2")); -#endif - return false; -} - -template -void DataManSerializer::PutAttribute(const core::Attribute &attribute) -{ - TAU_SCOPED_TIMER_FUNC(); - nlohmann::json staticVar; - staticVar["N"] = attribute.m_Name; - staticVar["Y"] = ToString(attribute.m_Type); - staticVar["V"] = attribute.m_IsSingleValue; - if (attribute.m_IsSingleValue) - { - staticVar["G"] = attribute.m_DataSingleValue; - } - else - { - staticVar["G"] = attribute.m_DataArray; - } - - m_StaticDataJsonMutex.lock(); - m_StaticDataJson["S"].emplace_back(std::move(staticVar)); - m_StaticDataJsonMutex.unlock(); -} - template int DataManSerializer::GetData(T *outputData, const std::string &varName, const Dims &varStart, const Dims &varCount, @@ -566,6 +417,155 @@ int DataManSerializer::GetData(T *outputData, const std::string &varName, return 0; } +template +bool DataManSerializer::PutZfp(nlohmann::json &metaj, size_t &datasize, + const T *inputData, const Dims &varCount, + const Params ¶ms) +{ + TAU_SCOPED_TIMER_FUNC(); +#ifdef ADIOS2_HAVE_ZFP + Params p; + for (const auto &i : params) + { + std::string prefix = i.first.substr(0, 4); + if (prefix == "zfp:" || prefix == "Zfp:" || prefix == "ZFP:") + { + std::string key = i.first.substr(4); + metaj[i.first] = i.second; + p[key] = i.second; + } + } + core::compress::CompressZFP compressor(p); + m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), + sizeof(T), + std::multiplies())); + try + { + Params info; + datasize = compressor.Compress(inputData, varCount, sizeof(T), + helper::GetDataType(), + m_CompressBuffer.data(), p, info); + return true; + } + catch (std::exception &e) + { + std::cout << "Got exception " << e.what() + << " from ZFP. Turned off compression." << std::endl; + } +#else + throw(std::invalid_argument( + "ZFP compression used but ZFP library is not linked to ADIOS2")); +#endif + return false; +} + +template +bool DataManSerializer::PutSz(nlohmann::json &metaj, size_t &datasize, + const T *inputData, const Dims &varCount, + const Params ¶ms) +{ + TAU_SCOPED_TIMER_FUNC(); +#ifdef ADIOS2_HAVE_SZ + Params p; + for (const auto &i : params) + { + std::string prefix = i.first.substr(0, 3); + if (prefix == "sz:" || prefix == "Sz:" || prefix == "SZ:") + { + std::string key = i.first.substr(3); + metaj[i.first] = i.second; + p[key] = i.second; + } + } + m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), + sizeof(T), + std::multiplies())); + core::compress::CompressSZ compressor(p); + try + { + Params info; + datasize = compressor.Compress(inputData, varCount, sizeof(T), + helper::GetDataType(), + m_CompressBuffer.data(), p, info); + return true; + } + catch (std::exception &e) + { + std::cout << "Got exception " << e.what() + << " from SZ. Turned off compression." << std::endl; + } +#else + throw(std::invalid_argument( + "SZ compression used but SZ library is not linked to ADIOS2")); +#endif + return false; +} + +template +bool DataManSerializer::PutBZip2(nlohmann::json &metaj, size_t &datasize, + const T *inputData, const Dims &varCount, + const Params ¶ms) +{ + TAU_SCOPED_TIMER_FUNC(); +#ifdef ADIOS2_HAVE_BZIP2 + Params p; + for (const auto &i : params) + { + std::string prefix = i.first.substr(0, 6); + if (prefix == "bzip2:" || prefix == "Bzip2:" || prefix == "BZip2:" || + prefix == "BZIP2:") + { + std::string key = i.first.substr(6); + metaj[i.first] = i.second; + p[key] = i.second; + } + } + m_CompressBuffer.reserve(std::accumulate(varCount.begin(), varCount.end(), + sizeof(T), + std::multiplies())); + core::compress::CompressBZIP2 compressor(p); + try + { + Params info; + datasize = compressor.Compress(inputData, varCount, sizeof(T), + helper::GetDataType(), + m_CompressBuffer.data(), p, info); + return true; + } + catch (std::exception &e) + { + std::cout << "Got exception " << e.what() + << " from BZip2. Turned off compression." << std::endl; + } +#else + throw(std::invalid_argument( + "BZip2 compression used but BZip2 library is not linked to ADIOS2")); +#endif + return false; +} + +template +void DataManSerializer::PutAttribute(const core::Attribute &attribute) +{ + TAU_SCOPED_TIMER_FUNC(); + nlohmann::json staticVar; + staticVar["N"] = attribute.m_Name; + staticVar["Y"] = ToString(attribute.m_Type); + staticVar["V"] = attribute.m_IsSingleValue; + if (attribute.m_IsSingleValue) + { + staticVar["G"] = attribute.m_DataSingleValue; + } + else + { + staticVar["G"] = attribute.m_DataArray; + } + + m_StaticDataJsonMutex.lock(); + m_StaticDataJson["S"].emplace_back(std::move(staticVar)); + m_StaticDataJsonMutex.unlock(); +} + } // namespace format } // namespace adios2