Skip to content

Commit

Permalink
Rewrite BP4Serializer::DeserializeIndicesPerRankThreads hoping XL com…
Browse files Browse the repository at this point in the history
…piler will work with it.
  • Loading branch information
pnorbert committed Jul 8, 2019
1 parent 3a9726f commit 09b910e
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 23 deletions.
171 changes: 148 additions & 23 deletions source/adios2/toolkit/format/bp4/BP4Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1081,13 +1081,14 @@ std::vector<char> BP4Serializer::SerializeIndices(
}

std::unordered_map<std::string, std::vector<BP4Base::SerialElementIndex>>
BP4Serializer::DeserializeIndicesPerRankThreads(
BP4Serializer::DeserializeIndicesPerRankSingleThread(
const std::vector<char> &serialized, MPI_Comm comm,
const bool isRankConstant) const noexcept
{
std::unordered_map<std::string, std::vector<SerialElementIndex>>
deserialized;

/*
auto lf_Deserialize_no_mutex = [&](const int rankSource,
const size_t serializedPosition,
const bool isRankConstant) {
Expand Down Expand Up @@ -1139,7 +1140,152 @@ BP4Serializer::DeserializeIndicesPerRankThreads(
SerialElementIndex &index = deserializedIndexes->at(rankSource);
helper::InsertToBuffer(index.Buffer, &serialized[serializedPosition],
bufferSize);
};
};*/

// BODY OF FUNCTION starts here
const size_t serializedSize = serialized.size();
int rank;
SMPI_Comm_rank(comm, &rank);

if (rank != 0 || serializedSize < 8)
{
return deserialized;
}

size_t serializedPosition = 0;

while (serializedPosition < serializedSize - 4)
{
const int rankSource = static_cast<int>(
helper::ReadValue<uint32_t>(serialized, serializedPosition));

if (serializedPosition <= serializedSize)
{
// lf_Deserialize_no_mutex(rankSource, serializedPosition,
// isRankConstant);
size_t localPosition = serializedPosition;

ElementIndexHeader header =
ReadElementIndexHeader(serialized, localPosition);

if (!isRankConstant || deserialized.count(header.Name) != 1)
{

std::vector<BP4Base::SerialElementIndex> *deserializedIndexes;

// deserializedIndexes =
// &(deserialized.emplace(std::piecewise_construct,
// std::forward_as_tuple(header.Name),
// std::forward_as_tuple(
// m_SizeMPI,
// SerialElementIndex(header.MemberID,
// 0))).first->second);

auto search = deserialized.find(header.Name);
if (search == deserialized.end())
{
// variable does not exist, we need to add it
deserializedIndexes =
&(deserialized
.emplace(std::piecewise_construct,
std::forward_as_tuple(header.Name),
std::forward_as_tuple(
m_SizeMPI, SerialElementIndex(
header.MemberID, 0)))
.first->second);
// std::cout << "rank " << rankSource << ": did not find "
// << header.Name << ", added it" << std::endl;
}
else
{
deserializedIndexes = &(search->second);
// std::cout << "rank " << rankSource << ": found " <<
// header.Name
// << std::endl;
}

const size_t bufferSize =
static_cast<size_t>(header.Length) + 4;
SerialElementIndex &index = deserializedIndexes->at(rankSource);
helper::InsertToBuffer(
index.Buffer, &serialized[serializedPosition], bufferSize);
}
}

const size_t bufferSize = static_cast<size_t>(
helper::ReadValue<uint32_t>(serialized, serializedPosition));
serializedPosition += bufferSize;
}

return deserialized;
}

std::unordered_map<std::string, std::vector<BP4Base::SerialElementIndex>>
BP4Serializer::DeserializeIndicesPerRankThreads(
const std::vector<char> &serialized, MPI_Comm comm,
const bool isRankConstant) const noexcept
{
if (m_Threads == 1)
{
return BP4Serializer::DeserializeIndicesPerRankSingleThread(
serialized, comm, isRankConstant);
}

std::unordered_map<std::string, std::vector<SerialElementIndex>>
deserialized;
/*
auto lf_Deserialize_no_mutex = [&](const int rankSource,
const size_t serializedPosition,
const bool isRankConstant) {
size_t localPosition = serializedPosition;
ElementIndexHeader header =
ReadElementIndexHeader(serialized, localPosition);
if (isRankConstant)
{
if (deserialized.count(header.Name) == 1)
{
return;
}
}
std::vector<BP4Base::SerialElementIndex> *deserializedIndexes;
// deserializedIndexes =
// &(deserialized.emplace(std::piecewise_construct,
// std::forward_as_tuple(header.Name),
// std::forward_as_tuple(
// m_SizeMPI, SerialElementIndex(header.MemberID,
// 0))).first->second);
auto search = deserialized.find(header.Name);
if (search == deserialized.end())
{
// variable does not exist, we need to add it
deserializedIndexes =
&(deserialized
.emplace(std::piecewise_construct,
std::forward_as_tuple(header.Name),
std::forward_as_tuple(
m_SizeMPI,
SerialElementIndex(header.MemberID, 0)))
.first->second);
// std::cout << "rank " << rankSource << ": did not find " <<
// header.Name << ", added it" << std::endl;
}
else
{
deserializedIndexes = &(search->second);
// std::cout << "rank " << rankSource << ": found " << header.Name
// << std::endl;
}
const size_t bufferSize = static_cast<size_t>(header.Length) + 4;
SerialElementIndex &index = deserializedIndexes->at(rankSource);
helper::InsertToBuffer(index.Buffer, &serialized[serializedPosition],
bufferSize);
};*/

auto lf_Deserialize = [&](const int rankSource,
const size_t serializedPosition,
Expand Down Expand Up @@ -1188,27 +1334,6 @@ BP4Serializer::DeserializeIndicesPerRankThreads(

size_t serializedPosition = 0;

if (m_Threads == 1)
{
while (serializedPosition < serializedSize)
{
const int rankSource = static_cast<int>(
helper::ReadValue<uint32_t>(serialized, serializedPosition));

if (serializedPosition <= serializedSize)
{
lf_Deserialize_no_mutex(rankSource, serializedPosition,
isRankConstant);
}

const size_t bufferSize = static_cast<size_t>(
helper::ReadValue<uint32_t>(serialized, serializedPosition));
serializedPosition += bufferSize;
}

return deserialized;
}

std::vector<std::future<void>> asyncs(m_Threads);
std::vector<size_t> asyncPositions(m_Threads);
std::vector<int> asyncRankSources(m_Threads);
Expand Down
8 changes: 8 additions & 0 deletions source/adios2/toolkit/format/bp4/BP4Serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,14 @@ class BP4Serializer : public BP4Base
MPI_Comm comm,
const bool isRankConstant) const noexcept;

/** private function called by DeserializeIndicesPerRankThreads
* in case of a single thread
*/
std::unordered_map<std::string, std::vector<SerialElementIndex>>
DeserializeIndicesPerRankSingleThread(const std::vector<char> &serialized,
MPI_Comm comm,
const bool isRankConstant) const
noexcept;
/**
* Merge indices by time step (default) and write to m_HeapBuffer.m_Metadata
* @param nameRankIndices
Expand Down

0 comments on commit 09b910e

Please sign in to comment.