Skip to content

Commit

Permalink
Merge pull request #2824 from JasonRuonanWang/mhs
Browse files Browse the repository at this point in the history
make multiblock work in MHS Engine and fix the multi-tier test
  • Loading branch information
JasonRuonanWang authored Aug 18, 2021
2 parents 7bd9a24 + 80451df commit 43dbaf9
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 157 deletions.
18 changes: 15 additions & 3 deletions source/adios2/core/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,23 @@ size_t Operator::Decompress(const void *bufferIn, const size_t sizeIn,

size_t Operator::Decompress(const void * /*bufferIn*/, const size_t /*sizeIn*/,
void * /*dataOut*/, const Dims & /*dimensions*/,
DataType /*type*/, const Params & /*parameters*/)
const DataType /*type*/,
const Params & /*parameters*/)
{
throw std::invalid_argument("ERROR: signature (const void*, const "
"size_t, void*, const Dims&, const "
"std::string ) not supported "
"size_t, void*, const Dims&, const DataType, "
"const Params) not supported "
"by derived class implemented with " +
m_Type + ", in call to Decompress\n");
}

size_t Operator::Decompress(const void * /*bufferIn*/, const size_t /*sizeIn*/,
void * /*dataOut*/, const Dims & /*start*/,
const Dims & /*count*/, const DataType /*type*/)
{
throw std::invalid_argument("ERROR: signature (const void*, const "
"size_t, void*, const Dims&, const Dims&, "
"const DataType) not supported "
"by derived class implemented with " +
m_Type + ", in call to Decompress\n");
}
Expand Down
16 changes: 15 additions & 1 deletion source/adios2/core/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,21 @@ class Operator
*/
virtual size_t Decompress(const void *bufferIn, const size_t sizeIn,
void *dataOut, const Dims &dimensions,
DataType type, const Params &parameters);
const DataType type, const Params &parameters);

/**
* Sirius signature
* @param bufferIn
* @param sizeIn
* @param dataOut
* @param start
* @param count
* @param type
* @return
*/
virtual size_t Decompress(const void *bufferIn, const size_t sizeIn,
void *dataOut, const Dims &start,
const Dims &count, const DataType type);

virtual bool IsDataTypeValid(const DataType type) const = 0;

Expand Down
8 changes: 6 additions & 2 deletions source/adios2/engine/mhs/MhsReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ inline void MhsReader::GetDeferredCommon(Variable<std::string> &variable,
template <class T>
void MhsReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
m_SubEngines[0]->Get(variable, data, Mode::Sync);
for (int i = 1; i < m_Tiers; ++i)
for (int i = 0; i < m_Tiers; ++i)
{
auto var = m_SubIOs[i]->InquireVariable<T>(variable.m_Name);
if (!var)
{
break;
}
var->SetSelection({variable.m_Start, variable.m_Count});
m_SubEngines[i]->Get(*var, data, Mode::Sync);
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/mhs/MhsWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ MhsWriter::MhsWriter(IO &io, const std::string &name, const Mode mode,
}
else if (itCompressor->second == "sirius")
{
params.emplace("tiers", std::to_string(m_Tiers));
params.emplace("Tiers", std::to_string(m_Tiers));
m_OperatorMap.emplace(
itVar->second,
std::make_shared<compress::CompressSirius>(params));
Expand Down
45 changes: 31 additions & 14 deletions source/adios2/operator/compress/CompressSirius.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ namespace core
namespace compress
{

int CompressSirius::m_CurrentTier = 0;
int CompressSirius::m_Tiers = 0;
std::unordered_map<std::string, int> CompressSirius::m_CurrentTierMap;
std::vector<std::unordered_map<std::string, std::vector<char>>>
CompressSirius::m_TierBuffersMap;
int CompressSirius::m_CurrentTier;
std::vector<std::vector<char>> CompressSirius::m_TierBuffers;
int CompressSirius::m_Tiers = 0;

CompressSirius::CompressSirius(const Params &parameters)
: Operator("sirius", parameters)
{
helper::GetParameter(parameters, "tiers", m_Tiers);
helper::GetParameter(parameters, "Tiers", m_Tiers);
m_TierBuffersMap.resize(m_Tiers);
m_TierBuffers.resize(m_Tiers);
}

Expand Down Expand Up @@ -64,38 +68,51 @@ size_t CompressSirius::Compress(const void *dataIn, const Dims &dimensions,
}

size_t CompressSirius::Decompress(const void *bufferIn, const size_t sizeIn,
void *dataOut, const Dims &dimensions,
DataType type, const Params &parameters)
void *dataOut, const Dims &start,
const Dims &count, DataType type)
{
size_t outputBytes = std::accumulate(dimensions.begin(), dimensions.end(),
size_t outputBytes = std::accumulate(count.begin(), count.end(),
helper::GetDataTypeSize(type),
std::multiplies<size_t>());

std::string blockId =
helper::DimsToString(start) + helper::DimsToString(count);

// decompress data and copy back to m_TierBuffers
size_t bytesPerTier = outputBytes / m_Tiers;
m_TierBuffers[m_CurrentTier].resize(bytesPerTier);
std::memcpy(m_TierBuffers[m_CurrentTier].data(), bufferIn, bytesPerTier);
auto &currentBuffer = m_TierBuffersMap[m_CurrentTierMap[blockId]][blockId];
auto &currentTier = m_CurrentTierMap[blockId];
currentBuffer.resize(bytesPerTier);
std::memcpy(currentBuffer.data(), bufferIn, bytesPerTier);

// if called from the final tier, then merge all tier buffers and copy back
// to dataOut
size_t accumulatedBytes = 0;
if (m_CurrentTier == m_Tiers - 1)
if (currentTier == m_Tiers - 1)
{
for (const auto &b : m_TierBuffers)
for (auto &bmap : m_TierBuffersMap)
{
auto &b = bmap[blockId];
std::memcpy(reinterpret_cast<char *>(dataOut) + accumulatedBytes,
b.data(), b.size());
accumulatedBytes += b.size();
}
}

m_CurrentTier++;
if (m_CurrentTier % m_Tiers == 0)
currentTier++;
if (currentTier % m_Tiers == 0)
{
m_CurrentTier = 0;
currentTier = 0;
}

return outputBytes;
if (currentTier == 0)
{
return outputBytes;
}
else
{
return 0;
}
}

bool CompressSirius::IsDataTypeValid(const DataType type) const
Expand Down
14 changes: 11 additions & 3 deletions source/adios2/operator/compress/CompressSirius.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define ADIOS2_OPERATOR_COMPRESS_COMPRESSSIRIUS_H_

#include "adios2/core/Operator.h"
#include <unordered_map>

namespace adios2
{
Expand All @@ -35,15 +36,22 @@ class CompressSirius : public Operator
using Operator::Decompress;

size_t Decompress(const void *bufferIn, const size_t sizeIn, void *dataOut,
const Dims &dimensions, DataType type,
const Params &parameters) final;
const Dims &start, const Dims &count,
DataType type) final;

bool IsDataTypeValid(const DataType type) const final;

private:
static int m_Tiers;

// for compress
static std::vector<std::vector<char>> m_TierBuffers;
static int m_CurrentTier;
static int m_Tiers;

// for decompress
static std::vector<std::unordered_map<std::string, std::vector<char>>>
m_TierBuffersMap;
static std::unordered_map<std::string, int> m_CurrentTierMap;
};

} // end namespace compress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,9 @@ void BPSirius::GetData(const char *input,
{
core::compress::CompressSirius op((Params()));
op.Decompress(input, blockOperationInfo.PayloadSize, dataOutput,
blockOperationInfo.PreCount,
blockOperationInfo.PreStart, blockOperationInfo.PreCount,
helper::GetDataTypeFromString(
blockOperationInfo.Info.at("PreDataType")),
blockOperationInfo.Info);
blockOperationInfo.Info.at("PreDataType")));
}

} // end namespace format
Expand Down
163 changes: 163 additions & 0 deletions testing/adios2/engine/mhs/TestMhsCommon.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include <adios2.h>
#include <gtest/gtest.h>
#include <iostream>
#include <numeric>

using namespace adios2;

int printed_lines = 0;

template <class T>
void PrintData(const T *data, const size_t step, const Dims &start,
const Dims &count, const std::string &var,
const int to_print_lines)
{
size_t size =
std::accumulate(count.begin(), count.end(), static_cast<size_t>(1),
std::multiplies<size_t>());
std::cout << " Step: " << step << " Size:" << size << "\n";
size_t printsize = 1024;

if (size < printsize)
{
printsize = size;
}
size_t s = 0;
for (size_t i = 0; i < printsize; ++i)
{
++s;
std::cout << data[i] << " ";
if (s == count[1])
{
std::cout << " <--- Variable " << var << ", Step " << step
<< std::endl;
s = 0;
}
}

std::cout << "]" << std::endl;
}

template <class T>
void GenDataRecursive(std::vector<size_t> start, std::vector<size_t> count,
std::vector<size_t> shape, size_t n0, size_t y,
std::vector<std::complex<T>> &vec, const size_t step)
{
for (size_t i = 0; i < count[0]; i++)
{
size_t i0 = n0 * count[0] + i;
size_t z = y * shape[0] + (i + start[0]);

auto start_next = start;
auto count_next = count;
auto shape_next = shape;
start_next.erase(start_next.begin());
count_next.erase(count_next.begin());
shape_next.erase(shape_next.begin());

if (start_next.size() == 1)
{
for (size_t j = 0; j < count_next[0]; j++)
{
vec[i0 * count_next[0] + j] = {
static_cast<T>(z * shape_next[0] + (j + start_next[0]) +
step),
1};
}
}
else
{
GenDataRecursive(start_next, count_next, shape_next, i0, z, vec,
step);
}
}
}

template <class T>
void GenDataRecursive(std::vector<size_t> start, std::vector<size_t> count,
std::vector<size_t> shape, size_t n0, size_t y,
std::vector<T> &vec, const size_t step)
{
for (size_t i = 0; i < count[0]; i++)
{
size_t i0 = n0 * count[0] + i;
size_t z = y * shape[0] + (i + start[0]);

auto start_next = start;
auto count_next = count;
auto shape_next = shape;
start_next.erase(start_next.begin());
count_next.erase(count_next.begin());
shape_next.erase(shape_next.begin());

if (start_next.size() == 1)
{
for (size_t j = 0; j < count_next[0]; j++)
{
vec[i0 * count_next[0] + j] = static_cast<T>(
z * shape_next[0] + (j + start_next[0]) + step);
}
}
else
{
GenDataRecursive(start_next, count_next, shape_next, i0, z, vec,
step);
}
}
}

template <class T>
void GenData(std::vector<T> &vec, const size_t step,
const std::vector<size_t> &start, const std::vector<size_t> &count,
const std::vector<size_t> &shape)
{
size_t total_size =
std::accumulate(count.begin(), count.end(), static_cast<size_t>(1),
std::multiplies<size_t>());
vec.resize(total_size);
GenDataRecursive(start, count, shape, 0, 0, vec, step);
}

template <class T>
void VerifyData(const std::complex<T> *data, size_t step, const Dims &start,
const Dims &count, const Dims &shape, const std::string &var,
const int to_print_lines = 0, const int rank = 0)
{
size_t size =
std::accumulate(count.begin(), count.end(), static_cast<size_t>(1),
std::multiplies<size_t>());
std::vector<std::complex<T>> tmpdata(size);
GenData(tmpdata, step, start, count, shape);
for (size_t i = 0; i < size; ++i)
{
ASSERT_EQ(data[i], tmpdata[i]) << "Step " << step << " Variable " << var
<< " at " << i << std::endl;
}
if (rank == 0 && printed_lines < to_print_lines)
{
PrintData(data, step, start, count, var, to_print_lines);
++printed_lines;
}
}

template <class T>
void VerifyData(const T *data, size_t step, const Dims &start,
const Dims &count, const Dims &shape, const std::string &var,
const int to_print_lines = 0, const int rank = 0)
{
size_t size =
std::accumulate(count.begin(), count.end(), static_cast<size_t>(1),
std::multiplies<size_t>());
std::vector<T> tmpdata(size);
if (rank == 0 && printed_lines < to_print_lines)
{
PrintData(data, step, start, count, var, to_print_lines);
++printed_lines;
}
GenData(tmpdata, step, start, count, shape);
for (size_t i = 0; i < size; ++i)
{
ASSERT_EQ(data[i], tmpdata[i]) << "Step " << step << " Variable " << var
<< " at " << i << std::endl;
}
}
Loading

0 comments on commit 43dbaf9

Please sign in to comment.