diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index aa203a7582..5168f47fc3 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -56,7 +56,7 @@ add_library(adios2_core toolkit/format/buffer/Buffer.cpp toolkit/format/buffer/BufferV.cpp toolkit/format/buffer/malloc/MallocV.cpp -# toolkit/format/buffer/chunk/ChunkV.cpp + toolkit/format/buffer/chunk/ChunkV.cpp toolkit/format/buffer/heap/BufferSTL.cpp toolkit/format/bp/BPBase.cpp toolkit/format/bp/BPBase.tcc diff --git a/source/adios2/common/ADIOSTypes.h b/source/adios2/common/ADIOSTypes.h index 6317daff36..25f4a28473 100644 --- a/source/adios2/common/ADIOSTypes.h +++ b/source/adios2/common/ADIOSTypes.h @@ -185,8 +185,12 @@ constexpr uint64_t DefaultMaxBufferSize = MaxSizeT - 1; constexpr float DefaultBufferGrowthFactor = 1.05f; /** default Buffer Chunk Size - * 2Gb - 100Kb (tolerance)*/ -constexpr uint64_t DefaultBufferChunkSize = 2147381248; + * 16Mb */ +constexpr uint64_t DefaultBufferChunkSize = 16 * 1024 * 1024; + +/** default minimum size not copying deferred writes + * 4Mb */ +constexpr size_t DefaultMinDeferredSize = 4 * 1024 * 1024; /** default size for writing/reading files using POSIX/fstream/stdio write * 2Gb - 100Kb (tolerance)*/ diff --git a/source/adios2/engine/bp5/BP5Engine.cpp b/source/adios2/engine/bp5/BP5Engine.cpp index 0c8c4c635f..c2ac897c36 100644 --- a/source/adios2/engine/bp5/BP5Engine.cpp +++ b/source/adios2/engine/bp5/BP5Engine.cpp @@ -237,6 +237,32 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) return false; }; + auto lf_SetBufferVTypeParameter = [&](const std::string key, int ¶meter, + int def) { + auto itKey = io.m_Parameters.find(key); + parameter = def; + if (itKey != io.m_Parameters.end()) + { + std::string value = itKey->second; + std::transform(value.begin(), value.end(), value.begin(), + ::tolower); + if (value == "malloc") + { + parameter = (int)BufferVType::MallocVType; + } + else if (value == "chunk") + { + parameter = (int)BufferVType::ChunkVType; + } + else + { + throw std::invalid_argument( + "ERROR: Unknown BP5 BufferVType parameter \"" + value + + "\" (must be \"malloc\" or \"chunk\""); + } + } + }; + #define get_params(Param, Type, Typedecl, Default) \ lf_Set##Type##Parameter(#Param, Params.Param, Default); BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params); diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index fb00fd1121..609197f5c4 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -84,6 +84,15 @@ class BP5Engine std::string GetBPVersionFileName(const std::string &name) const noexcept; + enum class BufferVType + { + MallocVType, + ChunkVType, + Auto + }; + + BufferVType UseBufferV = BufferVType::ChunkVType; + #define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \ MACRO(OpenTimeoutSecs, Int, int, 3600) \ MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \ @@ -97,7 +106,9 @@ class BP5Engine MACRO(AsyncTasks, Bool, bool, true) \ MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \ MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \ + MACRO(MinDeferredSize, SizeBytes, size_t, DefaultMinDeferredSize) \ MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \ + MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \ MACRO(ReaderShortCircuitReads, Bool, bool, false) struct BP5Params diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 594d4a47a0..aab245698c 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -12,6 +12,7 @@ #include "adios2/common/ADIOSMacros.h" #include "adios2/core/IO.h" #include "adios2/helper/adiosFunctions.h" //CheckIndexRange +#include "adios2/toolkit/format/buffer/chunk/ChunkV.h" #include "adios2/toolkit/format/buffer/malloc/MallocV.h" #include "adios2/toolkit/transport/file/FileFStream.h" #include @@ -44,9 +45,17 @@ BP5Writer::BP5Writer(IO &io, const std::string &name, const Mode mode, StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds) { m_WriterStep++; - m_BP5Serializer.InitStep(new MallocV("BP5Writer", false, - m_Parameters.InitialBufferSize, - m_Parameters.GrowthFactor)); + if (m_Parameters.BufferVType == (int)BufferVType::MallocVType) + { + m_BP5Serializer.InitStep(new MallocV("BP5Writer", false, + m_Parameters.InitialBufferSize, + m_Parameters.GrowthFactor)); + } + else + { + m_BP5Serializer.InitStep(new ChunkV("BP5Writer", true /* always copy */, + m_Parameters.BufferChunkSize)); + } return StepStatus::OK; } diff --git a/source/adios2/engine/bp5/BP5Writer.tcc b/source/adios2/engine/bp5/BP5Writer.tcc index cd0169ffaf..5adf31ac66 100644 --- a/source/adios2/engine/bp5/BP5Writer.tcc +++ b/source/adios2/engine/bp5/BP5Writer.tcc @@ -48,7 +48,7 @@ void BP5Writer::PutCommon(Variable &variable, const T *values, bool sync) /* If arrays is small, force copying to internal buffer to aggregate * small writes */ size_t n = helper::GetTotalSize(variable.m_Count) * sizeof(T); - if (n < 4194304 /* 4MB */) + if (n < m_Parameters.MinDeferredSize) { sync = true; } diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp new file mode 100644 index 0000000000..30f3615460 --- /dev/null +++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.cpp @@ -0,0 +1,108 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * BufferV.cpp + * + */ + +#include "ChunkV.h" +#include "adios2/toolkit/format/buffer/BufferV.h" +#include +#include + +namespace adios2 +{ +namespace format +{ + +ChunkV::ChunkV(const std::string type, const bool AlwaysCopy, + const size_t ChunkSize) +: BufferV(type, AlwaysCopy), m_ChunkSize(ChunkSize) +{ +} + +ChunkV::~ChunkV() +{ + for (const auto &Chunk : m_Chunks) + { + free((void *)Chunk); + } +} + +size_t ChunkV::AddToVec(const size_t size, const void *buf, int align, + bool CopyReqd) +{ + int badAlign = CurOffset % align; + if (badAlign) + { + int addAlign = align - badAlign; + char zero[16] = {0}; + AddToVec(addAlign, zero, 1, true); + } + size_t retOffset = CurOffset; + + if (size == 0) + return CurOffset; + + if (!CopyReqd && !m_AlwaysCopy) + { + // just add buf to internal version of output vector + VecEntry entry = {true, buf, 0, size}; + DataV.push_back(entry); + } + else + { + // we can possibly append this entry to the last if the last was + // internal + bool AppendPossible = DataV.size() && !DataV.back().External; + + if (AppendPossible && (m_TailChunkPos + size > m_ChunkSize)) + { + // No room in current chunk, close it out + // realloc down to used size (helpful?) and set size in array + m_Chunks.back() = (char *)realloc(m_Chunks.back(), m_TailChunkPos); + + m_TailChunkPos = 0; + m_TailChunk = NULL; + AppendPossible = false; + } + if (AppendPossible) + { + // We can use current chunk, just append the data; + memcpy(m_TailChunk + m_TailChunkPos, buf, size); + DataV.back().Size += size; + m_TailChunkPos += size; + } + else + { + // We need a new chunk, get the larger of size or m_ChunkSize + size_t NewSize = m_ChunkSize; + if (size > m_ChunkSize) + NewSize = size; + m_TailChunk = (char *)malloc(NewSize); + memcpy(m_TailChunk, buf, size); + m_TailChunkPos = size; + VecEntry entry = {false, m_TailChunk, 0, size}; + DataV.push_back(entry); + } + } + CurOffset = retOffset + size; + return retOffset; +} + +ChunkV::BufferV_iovec ChunkV::DataVec() noexcept +{ + BufferV_iovec ret = new iovec[DataV.size() + 1]; + for (std::size_t i = 0; i < DataV.size(); ++i) + { + // For ChunkV, all entries in DataV are actual iov entries. + ret[i].iov_base = DataV[i].Base; + ret[i].iov_len = DataV[i].Size; + } + ret[DataV.size()] = {NULL, 0}; + return ret; +} + +} // end namespace format +} // end namespace adios2 diff --git a/source/adios2/toolkit/format/buffer/chunk/ChunkV.h b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h new file mode 100644 index 0000000000..c0eb02a8bf --- /dev/null +++ b/source/adios2/toolkit/format/buffer/chunk/ChunkV.h @@ -0,0 +1,45 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + */ + +#ifndef ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_CHUNKV_H_ +#define ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_CHUNKV_H_ + +#include "adios2/common/ADIOSConfig.h" +#include "adios2/common/ADIOSTypes.h" + +#include "adios2/toolkit/format/buffer/BufferV.h" + +namespace adios2 +{ +namespace format +{ + +class ChunkV : public BufferV +{ +public: + uint64_t Size() noexcept; + + const size_t m_ChunkSize; + + ChunkV(const std::string type, const bool AlwaysCopy = false, + const size_t ChunkSize = DefaultBufferChunkSize); + virtual ~ChunkV(); + + virtual BufferV_iovec DataVec() noexcept; + + virtual size_t AddToVec(const size_t size, const void *buf, int align, + bool CopyReqd); + +private: + std::vector m_Chunks; + size_t m_TailChunkPos = 0; + char *m_TailChunk = NULL; +}; + +} // end namespace format +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_FORMAT_BUFFER_MALLOC_MALLOCV_H_ */