Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Bp5 directio #3052

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,48 @@ if(ADIOS2_HAVE_MPI)
add_definitions(-DOMPI_SKIP_MPICXX -DMPICH_SKIP_MPICXX)
endif()


#------------------------------------------------------------------------------#
# POSIX O_DIRECT is currently only supported on Unix platforms in ADIOS2
#------------------------------------------------------------------------------#
# Decide whether the O_DIRECT open() flag can be used and record the answer in
# ADIOS2_HAVE_O_DIRECT (1 = available, 0 = not available).
# Cygwin, MSVC and Apple builds are forced to 0 without probing; every other
# platform gets a compile-time probe.
if(CYGWIN)
# O_DIRECT misbehaves on Cygwin, so it is disabled there
set(ADIOS2_HAVE_O_DIRECT 0)
elseif(MSVC)
# Windows offers other facilities, but we are not using them
set(ADIOS2_HAVE_O_DIRECT 0)
elseif(APPLE)
# macOS offers other facilities, but we are not using them
set(ADIOS2_HAVE_O_DIRECT 0)
else()

# Probe: try to compile a minimal program that references O_DIRECT after
# including <unistd.h> and <fcntl.h>. The outcome is stored in O_DIRECT_WORKS.
message(STATUS "Checking for O_DIRECT")
include(CheckCXXSourceCompiles)
check_cxx_source_compiles("
#include <unistd.h>
#include <fcntl.h>
int main(int argc, char * argv[]) { argc = O_DIRECT; }
" O_DIRECT_WORKS)

# Translate the probe result into the 0/1 value of ADIOS2_HAVE_O_DIRECT
if (O_DIRECT_WORKS)
set(ADIOS2_HAVE_O_DIRECT 1)
else()
set(ADIOS2_HAVE_O_DIRECT 0)
endif()

endif()

#if(NOT ADIOS2_HAVE_O_DIRECT)
# message(WARNING " ----- The open() flag O_DIRECT is not available! ---- ")
#else()
# message(STATUS " ----- The open() flag O_DIRECT is available! ---- ")
#endif()


set(ADIOS2_CONFIG_OPTS
BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME SysVShMem ZeroMQ Profiling Endian_Reverse
BP5 DataMan DataSpaces HDF5 HDF5_VOL MHS SST CUDA Fortran MPI Python Blosc BZip2 LIBPRESSIO MGARD PNG SZ ZFP DAOS IME SysVShMem ZeroMQ Profiling Endian_Reverse O_DIRECT
)

GenerateADIOSHeaderConfig(${ADIOS2_CONFIG_OPTS})
configure_file(
${PROJECT_SOURCE_DIR}/CTestCustom.cmake.in
Expand Down Expand Up @@ -229,6 +268,7 @@ if(BUILD_SHARED_LIBS AND ADIOS2_RUN_INSTALL_TEST)
endif()
endif()


#------------------------------------------------------------------------------#
# Third party libraries
#------------------------------------------------------------------------------#
Expand Down
2 changes: 2 additions & 0 deletions examples/basics/globalArray/globalArray_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ int main(int argc, char *argv[])
io.SetParameter("NumAggregators", "1");
io.SetParameter("NumSubFiles", "1");
io.SetParameter("AsyncWrite", "Guided");
io.SetParameter("AsyncOpen", "true");
io.SetParameter("O_DIRECT", "true");

/*
* Define global array: type, name, global dimensions
Expand Down
5 changes: 4 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ class BP5Engine
MACRO(CollectiveMetadata, Bool, bool, true) \
MACRO(NumAggregators, UInt, unsigned int, 0) \
MACRO(NumSubFiles, UInt, unsigned int, 999999) \
MACRO(FileSystemPageSize, UInt, unsigned int, 4096) \
MACRO(StripeSize, UInt, unsigned int, 4096) \
MACRO(DirectIO, Bool, bool, true) \
MACRO(DirectIOAlignOffset, UInt, unsigned int, 512) \
MACRO(DirectIOAlignBuffer, UInt, unsigned int, 0) \
MACRO(AggregationType, AggregationType, int, \
(int)AggregationType::TwoLevelShm) \
MACRO(AsyncOpen, Bool, bool, true) \
Expand Down
82 changes: 63 additions & 19 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)

if (m_Parameters.BufferVType == (int)BufferVType::MallocVType)
{
m_BP5Serializer.InitStep(new MallocV("BP5Writer", false,
m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
m_BP5Serializer.InitStep(new MallocV(
"BP5Writer", false, m_BP5Serializer.m_BufferAlign,
m_BP5Serializer.m_BufferBlockSize, m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
}
else
{
m_BP5Serializer.InitStep(new ChunkV("BP5Writer",
false /* always copy */,
m_Parameters.BufferChunkSize));
m_BP5Serializer.InitStep(new ChunkV(
"BP5Writer", false, m_BP5Serializer.m_BufferAlign,
m_BP5Serializer.m_BufferBlockSize, m_Parameters.BufferChunkSize));
}
m_ThisTimestepDataSize = 0;

Expand All @@ -118,7 +119,8 @@ void BP5Writer::PerformPuts()
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::PerformPuts");
m_Profiler.Start("PP");
m_BP5Serializer.PerformPuts();
m_BP5Serializer.PerformPuts(m_Parameters.AsyncWrite ||
m_Parameters.DirectIO);
m_Profiler.Stop("PP");
return;
}
Expand Down Expand Up @@ -269,8 +271,8 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data,
}

// align to StripeSize
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.FileSystemPageSize);
m_DataPos +=
helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize);
m_StartDataPos = m_DataPos;

if (!SerializedWriters && a->m_Comm.Rank() < a->m_Comm.Size() - 1)
Expand Down Expand Up @@ -467,8 +469,8 @@ void BP5Writer::EndStep()
MarshalAttributes();

// true: advances step
auto TSInfo =
m_BP5Serializer.CloseTimestep(m_WriterStep, m_Parameters.AsyncWrite);
auto TSInfo = m_BP5Serializer.CloseTimestep(
m_WriterStep, m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */
Expand Down Expand Up @@ -589,14 +591,38 @@ void BP5Writer::InitParameters()
m_Parameters.NumSubFiles = m_Parameters.NumAggregators;
}

if (m_Parameters.FileSystemPageSize == 0)
if (m_Parameters.StripeSize == 0)
{
m_Parameters.FileSystemPageSize = 4096;
m_Parameters.StripeSize = 4096;
}
if (m_Parameters.FileSystemPageSize > 67108864)

if (m_Parameters.StripeSize > 67108864)
{
// Limiting to max 64MB stripe size
m_Parameters.FileSystemPageSize = 67108864;
m_Parameters.StripeSize = 67108864;
}

if (m_Parameters.DirectIO)
{
if (m_Parameters.DirectIOAlignBuffer == 0)
{
m_Parameters.DirectIOAlignBuffer = m_Parameters.DirectIOAlignOffset;
}
m_BP5Serializer.m_BufferBlockSize = m_Parameters.DirectIOAlignOffset;
m_BP5Serializer.m_BufferAlign = m_Parameters.DirectIOAlignBuffer;
if (m_Parameters.StripeSize % m_Parameters.DirectIOAlignOffset)
{
size_t k =
m_Parameters.StripeSize / m_Parameters.DirectIOAlignOffset + 1;
m_Parameters.StripeSize = k * m_Parameters.DirectIOAlignOffset;
}
if (m_Parameters.BufferChunkSize % m_Parameters.DirectIOAlignOffset)
{
size_t k = m_Parameters.BufferChunkSize /
m_Parameters.DirectIOAlignOffset +
1;
m_Parameters.BufferChunkSize = k * m_Parameters.DirectIOAlignOffset;
}
}
}

Expand Down Expand Up @@ -931,6 +957,14 @@ void BP5Writer::InitTransports()
}
}

if (m_Parameters.DirectIO)
{
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["DirectIO"] = "true";
}
}

bool useProfiler = true;

if (m_IAmWritingData)
Expand All @@ -953,6 +987,11 @@ void BP5Writer::InitTransports()

if (m_Comm.Rank() == 0)
{
// force direct I/O off for the metadata files
for (size_t i = 0; i < m_IO.m_TransportsParameters.size(); ++i)
{
m_IO.m_TransportsParameters[i]["DirectIO"] = "false";
}
m_FileMetaMetadataManager.OpenFiles(m_MetaMetadataFileNames, m_OpenMode,
m_IO.m_TransportsParameters,
useProfiler);
Expand Down Expand Up @@ -1303,14 +1342,19 @@ void BP5Writer::FlushData(const bool isFinal)
if (m_Parameters.BufferVType == (int)BufferVType::MallocVType)
{
DataBuf = m_BP5Serializer.ReinitStepData(
new MallocV("BP5Writer", false, m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
new MallocV("BP5Writer", false, m_BP5Serializer.m_BufferAlign,
m_BP5Serializer.m_BufferBlockSize,
m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor),
m_Parameters.AsyncWrite || m_Parameters.DirectIO);
}
else
{
DataBuf = m_BP5Serializer.ReinitStepData(
new ChunkV("BP5Writer", false /* always copy */,
m_Parameters.BufferChunkSize));
new ChunkV("BP5Writer", false, m_BP5Serializer.m_BufferAlign,
m_BP5Serializer.m_BufferBlockSize,
m_Parameters.BufferChunkSize),
m_Parameters.AsyncWrite || m_Parameters.DirectIO);
}

auto databufsize = DataBuf->Size();
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/engine/bp5/BP5Writer_EveryoneWrites_Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ void BP5Writer::WriteData_EveryoneWrites_Async(format::BufferV *Data,
}

// align to StripeSize
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.FileSystemPageSize);
m_DataPos +=
helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize);
m_StartDataPos = m_DataPos;

if (a->m_Comm.Rank() < a->m_Comm.Size() - 1)
Expand Down
16 changes: 11 additions & 5 deletions source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)
// to arrive from the rank below

// align to StripeSize (only valid on master aggregator at this point)
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.FileSystemPageSize);
m_DataPos +=
helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize);

// Each aggregator needs to know the total size they write
// This calculation is valid on aggregators only
Expand All @@ -59,7 +59,13 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)

if (a->m_Comm.Size() > 1)
{
a->CreateShm(static_cast<size_t>(maxSize), m_Parameters.MaxShmSize);
size_t alignment_size = sizeof(max_align_t);
if (m_Parameters.DirectIO)
{
alignment_size = m_Parameters.DirectIOAlignOffset;
}
a->CreateShm(static_cast<size_t>(maxSize), m_Parameters.MaxShmSize,
alignment_size);
}

shm::TokenChain<uint64_t> tokenChain(&a->m_Comm);
Expand All @@ -74,8 +80,8 @@ void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data)
&m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0,
"AggregatorChain token in BP5Writer::WriteData_TwoLevelShm");
// align to StripeSize
m_DataPos += helper::PaddingToAlignOffset(
m_DataPos, m_Parameters.FileSystemPageSize);
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.StripeSize);
}
m_StartDataPos = m_DataPos; // metadata needs this info
if (a->m_AggregatorChainComm.Rank() <
Expand Down
16 changes: 11 additions & 5 deletions source/adios2/engine/bp5/BP5Writer_TwoLevelShm_Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data)
// to arrive from the rank below

// align to StripeSize (only valid on master aggregator at this point)
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.FileSystemPageSize);
m_DataPos +=
helper::PaddingToAlignOffset(m_DataPos, m_Parameters.StripeSize);

// Each aggregator needs to know the total size they write
// This calculation is valid on aggregators only
Expand All @@ -196,7 +196,13 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data)

if (a->m_Comm.Size() > 1)
{
a->CreateShm(static_cast<size_t>(maxSize), m_Parameters.MaxShmSize);
size_t alignment_size = sizeof(max_align_t);
if (m_Parameters.DirectIO)
{
alignment_size = m_Parameters.DirectIOAlignOffset;
}
a->CreateShm(static_cast<size_t>(maxSize), m_Parameters.MaxShmSize,
alignment_size);
}

if (a->m_IsAggregator)
Expand All @@ -209,8 +215,8 @@ void BP5Writer::WriteData_TwoLevelShm_Async(format::BufferV *Data)
&m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0,
"AggregatorChain token in BP5Writer::WriteData_TwoLevelShm");
// align to StripeSize
m_DataPos += helper::PaddingToAlignOffset(
m_DataPos, m_Parameters.FileSystemPageSize);
m_DataPos += helper::PaddingToAlignOffset(m_DataPos,
m_Parameters.StripeSize);
}
m_StartDataPos = m_DataPos; // metadata needs this info
if (a->m_AggregatorChainComm.Rank() <
Expand Down
16 changes: 8 additions & 8 deletions source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ void MPIShmChain::HandshakeLinks_Complete(HandshakeStruct &hs)
"aggregator, at Open");
}

void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize)
void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize,
const size_t alignment_size)
{
if (!m_Comm.IsMPI())
{
Expand All @@ -224,21 +225,20 @@ void MPIShmChain::CreateShm(size_t blocksize, const size_t maxsegmentsize)
}
char *ptr;
size_t structsize = sizeof(ShmSegment);
structsize += helper::PaddingToAlignOffset(structsize, sizeof(max_align_t));
structsize += helper::PaddingToAlignOffset(structsize, alignment_size);
if (!m_Rank)
{
blocksize +=
helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t));
blocksize += helper::PaddingToAlignOffset(blocksize, alignment_size);
size_t totalsize = structsize + 2 * blocksize;
if (totalsize > maxsegmentsize)
{
// roll back and calculate sizes from maxsegmentsize
totalsize = maxsegmentsize - sizeof(max_align_t) + 1;
totalsize = maxsegmentsize - alignment_size + 1;
totalsize +=
helper::PaddingToAlignOffset(totalsize, sizeof(max_align_t));
blocksize = (totalsize - structsize) / 2 - sizeof(max_align_t) + 1;
helper::PaddingToAlignOffset(totalsize, alignment_size);
blocksize = (totalsize - structsize) / 2 - alignment_size + 1;
blocksize +=
helper::PaddingToAlignOffset(blocksize, sizeof(max_align_t));
helper::PaddingToAlignOffset(blocksize, alignment_size);
totalsize = structsize + 2 * blocksize;
}
m_Win = m_Comm.Win_allocate_shared(totalsize, 1, &ptr);
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/toolkit/aggregator/mpi/MPIShmChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ class MPIShmChain : public MPIAggregator
void ResetBuffers() noexcept;

// 2*blocksize+some is allocated but only up to maxsegmentsize
void CreateShm(size_t blocksize, const size_t maxsegmentsize);
void CreateShm(size_t blocksize, const size_t maxsegmentsize,
const size_t alignment_size);
void DestroyShm();

private:
Expand Down
Loading