diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index db442adc80..35639a886f 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -214,6 +214,11 @@ if(ADIOS2_USE_SST AND NOT MSVC) set(ADIOS2_SST_HAVE_CRAY_DRC TRUE) endif() endif() + find_package(NVSTREAM) + if(NVSTREAM_FOUND) + find_package(Boost OPTIONAL_COMPONENTS thread log filesystem system) + set(ADIOS2_SST_HAVE_NVSTREAM TRUE) + endif() endif() #SysV IPC diff --git a/cmake/FindNVSTREAM.cmake b/cmake/FindNVSTREAM.cmake new file mode 100644 index 0000000000..23be40cc60 --- /dev/null +++ b/cmake/FindNVSTREAM.cmake @@ -0,0 +1,63 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# +# +# FindNVSTREAM +# ----------- +# +# Try to find the NVSTREAM library +# +# This module defines the following variables: +# +# NVSTREAM_FOUND - System has NVSTREAM +# NVSTREAM_INCLUDE_DIRS - The NVSTREAM include directory +# NVSTREAM_LIBRARIES - Link these to use NVSTREAM +# NVSTREAM_VERSION - Version of the NVSTREAM library to support +# +# and the following imported targets: +# NVStream::NVStream - The core NVSTREAM library +# +# You can also set the following variable to help guide the search: +# NVSTREAM_ROOT - The install prefix for NVSTREAM containing the +# include and lib folders +# Note: this can be set as a CMake variable or an +# environment variable. If specified as a CMake +# variable, it will override any setting specified +# as an environment variable. + +if(NOT NVSTREAM_FOUND) + if((NOT NVSTREAM_ROOT) AND (NOT (ENV{NVSTREAM_ROOT} STREQUAL ""))) + set(NVSTREAM_ROOT "$ENV{NVSTREAM_ROOT}") + endif() + if(NVSTREAM_ROOT) + set(NVSTREAM_INCLUDE_OPTS HINTS ${NVSTREAM_ROOT}/include NO_DEFAULT_PATHS) + set(NVSTREAM_LIBRARY_OPTS + HINTS ${NVSTREAM_ROOT}/lib ${NVSTREAM_ROOT}/lib64 + NO_DEFAULT_PATHS + ) + endif() + + find_path(NVSTREAM_INCLUDE_DIR nvs/store.h ${NVSTREAM_INCLUDE_OPTS}) + find_library(NVSTREAM_LIBRARY libyuma.a ${NVSTREAM_LIBRARY_OPTS}) + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(NVStream + FOUND_VAR NVSTREAM_FOUND + VERSION_VAR NVSTREAM_VERSION + REQUIRED_VARS NVSTREAM_LIBRARY NVSTREAM_INCLUDE_DIR + ) + message(STATUS "NVSTREAM_FOUND is ${NVSTREAM_FOUND}, LIB is ${NVSTREAM_LIBRARY}") + if(NVSTREAM_FOUND) + set(NVSTREAM_INCLUDE_DIRS ${NVSTREAM_INCLUDE_DIR}) + set(NVSTREAM_LIBRARIES ${NVSTREAM_LIBRARY}) + if(NVSTREAM_FOUND AND NOT TARGET NVStream::NVStream) + message(STATUS "ADDING LIBRARY NVSTREAM_FOUND is ${NVSTREAM_FOUND}, LIB is ${NVSTREAM_LIBRARY}") + add_library(NVStream::NVStream UNKNOWN IMPORTED) + set_target_properties(NVStream::NVStream PROPERTIES + IMPORTED_LOCATION "${NVSTREAM_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${NVSTREAM_INCLUDE_DIR}" + ) + endif() + endif() +endif() diff --git a/source/adios2/toolkit/sst/CMakeLists.txt b/source/adios2/toolkit/sst/CMakeLists.txt index fc8b2f6465..6a3205e3d4 100644 --- a/source/adios2/toolkit/sst/CMakeLists.txt +++ b/source/adios2/toolkit/sst/CMakeLists.txt @@ -25,6 +25,12 @@ if(ADIOS2_SST_HAVE_LIBFABRIC) endif() endif() +if(ADIOS2_SST_HAVE_NVSTREAM) + target_sources(sst PRIVATE dp/nvstream_dp.c dp/nvswrapper.cpp) + target_link_libraries(sst PRIVATE NVStream::NVStream ${Boost_LIBRARIES}) + set(CMAKE_REQUIRED_INCLUDES ${NVSTREAM_INCLUDE_DIRS}) +endif() + if(ADIOS2_HAVE_ZFP) target_sources(sst PRIVATE cp/ffs_zfp.c) target_link_libraries(sst PRIVATE zfp::zfp) @@ -44,6 +50,7 @@ set(SST_CONFIG_OPTS LIBFABRIC FI_GNI CRAY_DRC + NVSTREAM ) include(SSTFunctions) GenerateSSTHeaderConfig(${SST_CONFIG_OPTS}) diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 38750d404d..c724840e80 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -79,7 +79,13 @@ static char *readContactInfoFile(const char *Name, SstStream Stream, else { char Tmp[strlen(SSTMAGICV0)]; - fread(Tmp, strlen(SSTMAGICV0), 1, WriterInfo); + if (fread(Tmp, strlen(SSTMAGICV0), 1, WriterInfo) != 1) + { + fprintf(stderr, + "Filesystem read failed in SST Open, failing operation\n"); + fclose(WriterInfo); + Badfile++; + } Size -= strlen(SSTMAGICV0); if (strncmp(Tmp, SSTMAGICV0, strlen(SSTMAGICV0)) != 0) { @@ -99,8 +105,11 @@ static char *readContactInfoFile(const char *Name, SstStream Stream, char *Buffer = calloc(1, Size + 1); if (fread(Buffer, Size, 1, WriterInfo) != 1) { - fprintf(stderr, "Filesystem read failed, exiting\n"); - exit(1); + fprintf(stderr, + "Filesystem read failed in SST Open, failing operation\n"); + free(Buffer); + fclose(WriterInfo); + return NULL; } fclose(WriterInfo); return Buffer; diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c index 11230dfa55..7f91a7522d 100644 --- a/source/adios2/toolkit/sst/dp/dp.c +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -13,6 +13,9 @@ #ifdef SST_HAVE_LIBFABRIC extern CP_DP_Interface LoadRdmaDP(); #endif /* SST_HAVE_LIBFABRIC */ +#ifdef SST_HAVE_NVSTREAM +extern CP_DP_Interface LoadNvstreamDP(); +#endif /* SST_HAVE_LIBFABRIC */ extern CP_DP_Interface LoadEVpathDP(); typedef struct _DPElement @@ -60,6 +63,11 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, AddDPPossibility(Svcs, CP_Stream, List, LoadRdmaDP(), "rdma", Params); #endif /* SST_HAVE_LIBFABRIC */ +#ifdef SST_HAVE_NVSTREAM + List = AddDPPossibility(Svcs, CP_Stream, List, LoadNvstreamDP(), "nvstream", + Params); +#endif /* SST_HAVE_LIBFABRIC */ + int SelectedDP = -1; int BestPriority = -1; int BestPrioDP = -1; diff --git a/source/adios2/toolkit/sst/dp/nvstream_dp.c b/source/adios2/toolkit/sst/dp/nvstream_dp.c new file mode 100644 index 0000000000..c95e01e666 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvstream_dp.c @@ -0,0 +1,842 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include "sst_data.h" + +#include "adios2/toolkit/profiling/taustubs/taustubs.h" +#include "dp_interface.h" +#include "nvswrapper.h" + +/* + * Some conventions: + * `RS` indicates a reader-side item. + * `WS` indicates a writer-side item. + * `WSR` indicates a writer-side per-reader item. + * + * We keep different "stream" structures for the reader side and for the + * writer side. On the writer side, there's actually a "stream" + * per-connected-reader (a WSR_Stream), with the idea that some (many?) + * RDMA transports will require connections/pairing, so we'd need to track + * resources per reader. + * + * Generally the 'contact information' we exchange at init time includes + * the address of the local 'stream' data structure. This address isn't + * particularly useful to the far side, but it can be returned with + * requests to indicate what resource is targeted. For example, when a + * remote memory read request arrives at the writer from the reader, it + * includes the WSR_Stream value that is the address of the writer-side + * per-reader data structure. Upon message arrival, we just cast that + * value back into a pointer. + * + * By design, neither the data plane nor the control plane reference the + * other's symbols directly. The interface between the control plane and + * the data plane is represented by the types and structures defined in + * dp_interface.h and is a set of function pointers and FFS-style + * descriptions of the data structures to be communicated at init time. + * This allows for the future possibility of loading planes at run-time, etc. + * + * This "nvstream" data plane uses control plane functionality to implement + * the ReadRemoteMemory functionality. That is, it both the request to + * read memory and the response which carries the data are actually + * accomplished using the connections and message delivery facilities of + * the control plane, made available here via CP_Services. A real data + * plane would replace one or both of these with RDMA functionality. + */ + +typedef struct _Nvstream_RS_Stream +{ + CManager cm; + void *CP_Stream; + CMFormat ReadRequestFormat; + pthread_mutex_t DataLock; + int Rank; + + /* writer info */ + int WriterCohortSize; + CP_PeerCohort PeerCohort; + void **writer_nvs; + struct _NvstreamWriterContactInfo *WriterContactInfo; + struct _NvstreamCompletionHandle *PendingReadRequests; + + /* queued timestep info */ + struct _RSTimestepEntry *QueuedTimesteps; +} * Nvstream_RS_Stream; + +typedef struct _Nvstream_WSR_Stream +{ + struct _Nvstream_WS_Stream *WS_Stream; + CP_PeerCohort PeerCohort; + int ReaderCohortSize; + char *ReaderRequests; + struct _NvstreamReaderContactInfo *ReaderContactInfo; + struct _NvstreamWriterContactInfo + *WriterContactInfo; /* included so we can free on destroy */ +} * Nvstream_WSR_Stream; + +typedef struct _TimestepEntry +{ + long Timestep; + struct _SstData Data; + struct _NvstreamPerTimestepInfo *DP_TimestepInfo; + struct _TimestepEntry *Next; +} * TimestepList; + +typedef struct _RSTimestepEntry +{ + long Timestep; + int WriterRank; + char *Data; + long DataSize; + long DataStart; + struct _RSTimestepEntry *Next; +} * RSTimestepList; + +typedef struct _Nvstream_WS_Stream +{ + CManager cm; + void *CP_Stream; + int Rank; + void *nvs; + + TimestepList Timesteps; + CMFormat ReadReplyFormat; + + int ReaderCount; + Nvstream_WSR_Stream *Readers; +} * Nvstream_WS_Stream; + +typedef struct _NvstreamReaderContactInfo +{ + char *ContactString; + CMConnection Conn; + void *RS_Stream; +} * NvstreamReaderContactInfo; + +typedef struct _NvstreamWriterContactInfo +{ + char *ContactString; + void *WS_Stream; +} * NvstreamWriterContactInfo; + +typedef struct _NvstreamReadRequestMsg +{ + long Timestep; + size_t Offset; + size_t Length; + void *WS_Stream; + void *RS_Stream; + int RequestingRank; + int NotifyCondition; +} * NvstreamReadRequestMsg; + +static FMField NvstreamReadRequestList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(NvstreamReadRequestMsg, Timestep)}, + {"Offset", "integer", sizeof(size_t), + FMOffset(NvstreamReadRequestMsg, Offset)}, + {"Length", "integer", sizeof(size_t), + FMOffset(NvstreamReadRequestMsg, Length)}, + {"WS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadRequestMsg, WS_Stream)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadRequestMsg, RS_Stream)}, + {"RequestingRank", "integer", sizeof(int), + FMOffset(NvstreamReadRequestMsg, RequestingRank)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(NvstreamReadRequestMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReadRequestStructs[] = { + {"NvstreamReadRequest", NvstreamReadRequestList, + sizeof(struct _NvstreamReadRequestMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +typedef struct _NvstreamReadReplyMsg +{ + long Timestep; + size_t DataLength; + void *RS_Stream; + char *Data; + int NotifyCondition; +} * NvstreamReadReplyMsg; + +static FMField NvstreamReadReplyList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(NvstreamReadReplyMsg, Timestep)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadReplyMsg, RS_Stream)}, + {"DataLength", "integer", sizeof(size_t), + FMOffset(NvstreamReadReplyMsg, DataLength)}, + {"Data", "char[DataLength]", sizeof(char), + FMOffset(NvstreamReadReplyMsg, Data)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(NvstreamReadReplyMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReadReplyStructs[] = { + {"NvstreamReadReply", NvstreamReadReplyList, + sizeof(struct _NvstreamReadReplyMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs); + +static DP_RS_Stream NvstreamInitReader(CP_Services Svcs, void *CP_Stream, + void **ReaderContactInfoPtr, + struct _SstParams *Params) +{ + Nvstream_RS_Stream Stream = malloc(sizeof(struct _Nvstream_RS_Stream)); + NvstreamReaderContactInfo Contact = + malloc(sizeof(struct _NvstreamReaderContactInfo)); + CManager cm = Svcs->getCManager(CP_Stream); + char *NvstreamContactString; + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + CManager CM = Svcs->getCManager(CP_Stream); + attr_list ListenAttrs = create_attr_list(); + + memset(Stream, 0, sizeof(*Stream)); + memset(Contact, 0, sizeof(*Contact)); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + pthread_mutex_init(&Stream->DataLock, NULL); + + SMPI_Comm_rank(comm, &Stream->Rank); + + set_string_attr(ListenAttrs, attr_atom_from_string("CM_TRANSPORT"), + "sockets"); + + if (Params->DataInterface) + { + set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), + strdup(Params->DataInterface)); + } + else if (Params->NetworkInterface) + { + set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), + strdup(Params->NetworkInterface)); + } + + CMlisten_specific(CM, ListenAttrs); + attr_list ContactList = CMget_specific_contact_list(CM, ListenAttrs); + + NvstreamContactString = attr_list_to_string(ContactList); + + Contact->ContactString = NvstreamContactString; + Contact->RS_Stream = Stream; + + *ReaderContactInfoPtr = Contact; + + return Stream; +} + +static void NvstreamDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) +{ + Nvstream_RS_Stream RS_Stream = (Nvstream_RS_Stream)RS_Stream_v; + free(RS_Stream); +} + +static void NvstreamReadRequestHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs) +{ + TAU_START_FUNC(); + NvstreamReadRequestMsg ReadRequestMsg = (NvstreamReadRequestMsg)msg_v; + Nvstream_WSR_Stream WSR_Stream = ReadRequestMsg->WS_Stream; + + Nvstream_WS_Stream WS_Stream = WSR_Stream->WS_Stream; + TimestepList tmp = WS_Stream->Timesteps; + CP_Services Svcs = (CP_Services)client_Data; + int RequestingRank = ReadRequestMsg->RequestingRank; + + Svcs->verbose(WS_Stream->CP_Stream, + "Got a request to read remote memory " + "from reader rank %d: timestep %d, " + "offset %d, length %d\n", + RequestingRank, ReadRequestMsg->Timestep, + ReadRequestMsg->Offset, ReadRequestMsg->Length); + while (tmp != NULL) + { + if (tmp->Timestep == ReadRequestMsg->Timestep) + { + struct _NvstreamReadReplyMsg ReadReplyMsg; + /* memset avoids uninit byte warnings from valgrind */ + memset(&ReadReplyMsg, 0, sizeof(ReadReplyMsg)); + ReadReplyMsg.Timestep = ReadRequestMsg->Timestep; + ReadReplyMsg.DataLength = ReadRequestMsg->Length; + ReadReplyMsg.Data = tmp->Data.block + ReadRequestMsg->Offset; + ReadReplyMsg.RS_Stream = ReadRequestMsg->RS_Stream; + ReadReplyMsg.NotifyCondition = ReadRequestMsg->NotifyCondition; + Svcs->verbose( + WS_Stream->CP_Stream, + "Sending a reply to reader rank %d for remote memory read\n", + RequestingRank); + if (!WSR_Stream->ReaderContactInfo[RequestingRank].Conn) + { + attr_list List = attr_list_from_string( + WSR_Stream->ReaderContactInfo[RequestingRank] + .ContactString); + CMConnection Conn = CMget_conn(cm, List); + free_attr_list(List); + WSR_Stream->ReaderContactInfo[RequestingRank].Conn = Conn; + } + CMwrite(WSR_Stream->ReaderContactInfo[RequestingRank].Conn, + WS_Stream->ReadReplyFormat, &ReadReplyMsg); + + TAU_STOP_FUNC(); + return; + } + tmp = tmp->Next; + } + /* + * Shouldn't ever get here because we should never get a request for a + * timestep that we don't have. + */ + fprintf(stderr, "Writer rank %d - Failed to read Timestep %ld, not found\n", + WSR_Stream->WS_Stream->Rank, ReadRequestMsg->Timestep); + /* + * in the interest of not failing a writer on a reader failure, don't + * assert(0) here. Probably this sort of error should close the link to + * a reader though. + */ + TAU_STOP_FUNC(); +} + +typedef struct _NvstreamCompletionHandle +{ + int CMcondition; + CManager cm; + void *CPStream; + void *DPStream; + void *Buffer; + int Failed; + int Rank; + struct _NvstreamCompletionHandle *Next; +} * NvstreamCompletionHandle; + +static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs) +{ + TAU_START_FUNC(); + NvstreamReadReplyMsg ReadReplyMsg = (NvstreamReadReplyMsg)msg_v; + Nvstream_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + NvstreamCompletionHandle Handle = NULL; + + if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition)) + { + Svcs->verbose(RS_Stream->CP_Stream, "Got a reply to remote memory " + "read, but the condition is " + "already signalled, returning\n"); + TAU_STOP_FUNC(); + return; + } + Handle = CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + if (!Handle) + { + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read, but condition not found\n"); + TAU_STOP_FUNC(); + return; + } + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read from rank %d, condition is %d\n", + Handle->Rank, ReadReplyMsg->NotifyCondition); + + /* + * `Handle` contains the full request info and is `client_data` + * associated with the CMCondition. Once we get it, copy the incoming + * data to the buffer area given by the request + */ + memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); + + /* + * Signal the condition to wake the reader if they are waiting. + */ + CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); + TAU_STOP_FUNC(); +} + +static DP_WS_Stream NvstreamInitWriter(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params) +{ + Nvstream_WS_Stream Stream = malloc(sizeof(struct _Nvstream_WS_Stream)); + CManager cm = Svcs->getCManager(CP_Stream); + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + memset(Stream, 0, sizeof(struct _Nvstream_WS_Stream)); + + SMPI_Comm_rank(comm, &Stream->Rank); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + Stream->nvs = nvs_create_store(); + + Svcs->verbose(Stream->CP_Stream, + "Initializing NVSTREAM writer, create store returned %p\n", + Stream->nvs); + + return (void *)Stream; +} + +static void NvstreamDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) +{ + Nvstream_WS_Stream WS_Stream = (Nvstream_WS_Stream)WS_Stream_v; + nvs_finalize_(WS_Stream->nvs); + for (int i = 0; i < WS_Stream->ReaderCount; i++) + { + if (WS_Stream->Readers[i]) + { + free(WS_Stream->Readers[i]->WriterContactInfo->ContactString); + free(WS_Stream->Readers[i]->WriterContactInfo); + free(WS_Stream->Readers[i]->ReaderContactInfo->ContactString); + if (WS_Stream->Readers[i]->ReaderContactInfo->Conn) + CMConnection_close( + WS_Stream->Readers[i]->ReaderContactInfo->Conn); + free(WS_Stream->Readers[i]->ReaderContactInfo); + free(WS_Stream->Readers[i]); + } + } + free(WS_Stream->Readers); + free(WS_Stream); +} + +static DP_WSR_Stream NvstreamInitWriterPerReader(CP_Services Svcs, + DP_WS_Stream WS_Stream_v, + int readerCohortSize, + CP_PeerCohort PeerCohort, + void **providedReaderInfo_v, + void **WriterContactInfoPtr) +{ + Nvstream_WS_Stream WS_Stream = (Nvstream_WS_Stream)WS_Stream_v; + Nvstream_WSR_Stream WSR_Stream = malloc(sizeof(*WSR_Stream)); + NvstreamWriterContactInfo ContactInfo; + MPI_Comm comm = Svcs->getMPIComm(WS_Stream->CP_Stream); + int Rank; + NvstreamReaderContactInfo *providedReaderInfo = + (NvstreamReaderContactInfo *)providedReaderInfo_v; + + SMPI_Comm_rank(comm, &Rank); + + WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */ + WSR_Stream->PeerCohort = PeerCohort; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + WSR_Stream->ReaderCohortSize = readerCohortSize; + WSR_Stream->ReaderContactInfo = + malloc(sizeof(struct _NvstreamReaderContactInfo) * readerCohortSize); + for (int i = 0; i < readerCohortSize; i++) + { + WSR_Stream->ReaderContactInfo[i].ContactString = NULL; + WSR_Stream->ReaderContactInfo[i].Conn = NULL; + WSR_Stream->ReaderContactInfo[i].RS_Stream = + providedReaderInfo[i]->RS_Stream; + Svcs->verbose( + WS_Stream->CP_Stream, + "Received contact info \"%s\", RD_Stream %p for Reader Rank %d\n", + WSR_Stream->ReaderContactInfo[i].ContactString, + WSR_Stream->ReaderContactInfo[i].RS_Stream, i); + } + + /* + * add this writer-side reader-specific stream to the parent writer stream + * structure + */ + WS_Stream->Readers = realloc( + WS_Stream->Readers, sizeof(*WSR_Stream) * (WS_Stream->ReaderCount + 1)); + WS_Stream->Readers[WS_Stream->ReaderCount] = WSR_Stream; + WS_Stream->ReaderCount++; + + ContactInfo = malloc(sizeof(struct _NvstreamWriterContactInfo)); + memset(ContactInfo, 0, sizeof(struct _NvstreamWriterContactInfo)); + ContactInfo->ContactString = strdup(nvs_get_store_name(WS_Stream->nvs)); + ContactInfo->WS_Stream = WSR_Stream; + *WriterContactInfoPtr = ContactInfo; + WSR_Stream->WriterContactInfo = ContactInfo; + + return WSR_Stream; +} + +static void NvstreamDestroyWriterPerReader(CP_Services Svcs, + DP_WSR_Stream WSR_Stream_v) +{ + Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSR_Stream_v; + free(WSR_Stream); +} + +static void NvstreamProvideWriterDataToReader(CP_Services Svcs, + DP_RS_Stream RS_Stream_v, + int writerCohortSize, + CP_PeerCohort PeerCohort, + void **providedWriterInfo_v) +{ + Nvstream_RS_Stream RS_Stream = (Nvstream_RS_Stream)RS_Stream_v; + NvstreamWriterContactInfo *providedWriterInfo = + (NvstreamWriterContactInfo *)providedWriterInfo_v; + + RS_Stream->PeerCohort = PeerCohort; + RS_Stream->WriterCohortSize = writerCohortSize; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + RS_Stream->WriterContactInfo = + malloc(sizeof(struct _NvstreamWriterContactInfo) * writerCohortSize); + for (int i = 0; i < writerCohortSize; i++) + { + RS_Stream->WriterContactInfo[i].ContactString = + strdup(providedWriterInfo[i]->ContactString); + RS_Stream->WriterContactInfo[i].WS_Stream = + providedWriterInfo[i]->WS_Stream; + Svcs->verbose( + RS_Stream->CP_Stream, + "Received contact info \"%s\", WS_stream %p for WSR Rank %d\n", + RS_Stream->WriterContactInfo[i].ContactString, + RS_Stream->WriterContactInfo[i].WS_Stream, i); + } + RS_Stream->writer_nvs = malloc(sizeof(void *) * writerCohortSize); + for (int i = 0; i < writerCohortSize; i++) + { + RS_Stream->writer_nvs[i] = + nvs_open_store(RS_Stream->WriterContactInfo[i].ContactString); + } +} + +static void AddRequestToList(CP_Services Svcs, Nvstream_RS_Stream Stream, + NvstreamCompletionHandle Handle) +{ + Handle->Next = Stream->PendingReadRequests; + Stream->PendingReadRequests = Handle; +} + +static void RemoveRequestFromList(CP_Services Svcs, Nvstream_RS_Stream Stream, + NvstreamCompletionHandle Handle) +{ + NvstreamCompletionHandle Tmp = Stream->PendingReadRequests; + + if (Stream->PendingReadRequests == Handle) + { + Stream->PendingReadRequests = Handle->Next; + return; + } + + while (Tmp != NULL && Tmp->Next != Handle) + { + Tmp = Tmp->Next; + } + + if (Tmp == NULL) + return; + + // Tmp->Next must be the handle to remove + Tmp->Next = Tmp->Next->Next; +} + +static void FailRequestsToRank(CP_Services Svcs, CManager cm, + Nvstream_RS_Stream Stream, int FailedRank) +{ + NvstreamCompletionHandle Tmp = Stream->PendingReadRequests; + Svcs->verbose(Stream->CP_Stream, + "Fail pending requests to writer rank %d\n", FailedRank); + while (Tmp != NULL) + { + if (Tmp->Rank == FailedRank) + { + Tmp->Failed = 1; + Svcs->verbose(Tmp->CPStream, + "Found a pending remote memory read " + "to failed writer rank %d, marking as " + "failed and signalling condition %d\n", + Tmp->Rank, Tmp->CMcondition); + CMCondition_signal(cm, Tmp->CMcondition); + Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n", + Tmp->Rank, Tmp->CMcondition); + } + Tmp = Tmp->Next; + } + Svcs->verbose(Stream->CP_Stream, + "Done Failing requests to writer rank %d\n", FailedRank); +} + +typedef struct _NvstreamPerTimestepInfo +{ + char *CheckString; + int CheckInt; +} * NvstreamPerTimestepInfo; + +static void *NvstreamReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, + int Rank, long Timestep, size_t Offset, + size_t Length, void *Buffer, + void *DP_TimestepInfo) +{ + Nvstream_RS_Stream Stream = (Nvstream_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + NvstreamCompletionHandle ret = + malloc(sizeof(struct _NvstreamCompletionHandle)); + NvstreamPerTimestepInfo TimestepInfo = + (NvstreamPerTimestepInfo)DP_TimestepInfo; + struct _NvstreamReadRequestMsg ReadRequestMsg; + + static long LastRequestedTimestep = -1; + + LastRequestedTimestep = Timestep; + + ret->CPStream = Stream->CP_Stream; + ret->DPStream = Stream; + ret->Failed = 0; + ret->cm = cm; + ret->Buffer = Buffer; + ret->Rank = Rank; + ret->CMcondition = -1; + + Svcs->verbose(Stream->CP_Stream, + "Adios requesting to read remote memory for Timestep %d " + "from Rank %d, WSR_Stream = %p, nvs = %p\n", + Timestep, Rank, Stream->WriterContactInfo[Rank].WS_Stream, + Stream->writer_nvs[Rank]); + + char *StringName = malloc(20); + void *NVBlock; + char *BaseAddr; + sprintf(StringName, "Timestep_%ld", Timestep); + BaseAddr = nvs_get_with_malloc(Stream->writer_nvs[Rank], StringName, 1); + + if (BaseAddr) + { + memcpy(Buffer, BaseAddr + Offset, Length); + } + else + { + fprintf(stderr, "remote memory read failed\n"); + } + return ret; +} + +static int NvstreamWaitForCompletion(CP_Services Svcs, void *Handle_v) +{ + NvstreamCompletionHandle Handle = (NvstreamCompletionHandle)Handle_v; + int Ret = 1; + Svcs->verbose( + Handle->CPStream, + "Waiting for completion of memory read to rank %d, condition %d\n", + Handle->Rank, Handle->CMcondition); + /* + * Wait for the CM condition to be signalled. If it has been already, + * this returns immediately. Copying the incoming data to the waiting + * buffer has been done by the reply handler. + */ + if (Handle->CMcondition != -1) + CMCondition_wait(Handle->cm, Handle->CMcondition); + if (Handle->Failed) + { + Svcs->verbose(Handle->CPStream, + "Remote memory read to rank %d with " + "condition %d has FAILED because of " + "writer failure\n", + Handle->Rank, Handle->CMcondition); + Ret = 0; + } + else + { + Svcs->verbose( + Handle->CPStream, + "Remote memory read to rank %d with condition %d has completed\n", + Handle->Rank, Handle->CMcondition); + } + RemoveRequestFromList(Svcs, Handle->DPStream, Handle); + free(Handle); + return Ret; +} + +static void NvstreamNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, + int FailedPeerRank) +{ + Nvstream_RS_Stream Stream = (Nvstream_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + Svcs->verbose(Stream->CP_Stream, + "received notification that writer peer " + "%d has failed, failing any pending " + "requests\n", + FailedPeerRank); + FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank); +} + +static void NvstreamProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + struct _SstData *Data, + struct _SstData *LocalMetadata, + long Timestep, void **TimestepInfoPtr) +{ + Nvstream_WS_Stream Stream = (Nvstream_WS_Stream)Stream_v; + TimestepList Entry = malloc(sizeof(struct _TimestepEntry)); + struct _NvstreamPerTimestepInfo *Info = NULL; + // malloc(sizeof(struct _NvstreamPerTimestepInfo)); + + // This section exercised the CP's ability to distribute DP per timestep + // info. Commenting out as needed for Nvstream for now + // Info->CheckString = malloc(64); + // sprintf(Info->CheckString, "Nvstream info for timestep %ld from rank + // %d", + // Timestep, Stream->Rank); + // Info->CheckInt = Stream->Rank * 1000 + Timestep; + // *TimestepInfoPtr = Info; + memset(Entry, 0, sizeof(*Entry)); + Entry->DP_TimestepInfo = NULL; + Entry->Data = *Data; + Entry->Timestep = Timestep; + + Entry->Next = Stream->Timesteps; + Stream->Timesteps = Entry; + *TimestepInfoPtr = NULL; + + char *StringName = malloc(20); + void *NVBlock; + sprintf(StringName, "Timestep_%ld", Timestep); + NVBlock = nvs_alloc(Stream->nvs, &Data->DataSize, StringName); + memcpy(NVBlock, Data->block, Data->DataSize); + nvs_snapshot_(Stream->nvs, &Stream->Rank); + fprintf(stderr, "Did alloc, memcpy and snapshot with name %s\n", + StringName); +} + +static void NvstreamReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + long Timestep) +{ + Nvstream_WS_Stream Stream = (Nvstream_WS_Stream)Stream_v; + TimestepList List = Stream->Timesteps; + + Svcs->verbose(Stream->CP_Stream, "Releasing timestep %ld\n", Timestep); + if (Stream->Timesteps->Timestep == Timestep) + { + Stream->Timesteps = List->Next; + if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) + free(List->DP_TimestepInfo->CheckString); + if (List->DP_TimestepInfo) + free(List->DP_TimestepInfo); + free(List); + } + else + { + TimestepList last = List; + List = List->Next; + while (List != NULL) + { + if (List->Timestep == Timestep) + { + last->Next = List->Next; + if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) + free(List->DP_TimestepInfo->CheckString); + if (List->DP_TimestepInfo) + free(List->DP_TimestepInfo); + free(List); + return; + } + last = List; + List = List->Next; + } + /* + * Shouldn't ever get here because we should never release a + * timestep that we don't have. + */ + fprintf(stderr, "Failed to release Timestep %ld, not found\n", + Timestep); + assert(0); + } +} + +static FMField NvstreamReaderContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(NvstreamReaderContactInfo, ContactString)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(NvstreamReaderContactInfo, RS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReaderContactStructs[] = { + {"NvstreamReaderContactInfo", NvstreamReaderContactList, + sizeof(struct _NvstreamReaderContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField NvstreamWriterContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(NvstreamWriterContactInfo, ContactString)}, + {"writer_ID", "integer", sizeof(void *), + FMOffset(NvstreamWriterContactInfo, WS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamWriterContactStructs[] = { + {"NvstreamWriterContactInfo", NvstreamWriterContactList, + sizeof(struct _NvstreamWriterContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField NvstreamTimestepInfoList[] = { + {"CheckString", "string", sizeof(char *), + FMOffset(NvstreamPerTimestepInfo, CheckString)}, + {"CheckInt", "integer", sizeof(void *), + FMOffset(NvstreamPerTimestepInfo, CheckInt)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamTimestepInfoStructs[] = { + {"NvstreamTimestepInfo", NvstreamTimestepInfoList, + sizeof(struct _NvstreamPerTimestepInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static struct _CP_DP_Interface nvstreamDPInterface; + +static int NvstreamGetPriority(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params) +{ + /* The nvstream DP priority 10 */ + return 10; +} + +extern CP_DP_Interface LoadNvstreamDP() +{ + memset(&nvstreamDPInterface, 0, sizeof(nvstreamDPInterface)); + nvstreamDPInterface.ReaderContactFormats = NvstreamReaderContactStructs; + nvstreamDPInterface.WriterContactFormats = NvstreamWriterContactStructs; + nvstreamDPInterface.TimestepInfoFormats = + NULL; // NvstreamTimestepInfoStructs; + nvstreamDPInterface.initReader = NvstreamInitReader; + nvstreamDPInterface.initWriter = NvstreamInitWriter; + nvstreamDPInterface.initWriterPerReader = NvstreamInitWriterPerReader; + nvstreamDPInterface.provideWriterDataToReader = + NvstreamProvideWriterDataToReader; + nvstreamDPInterface.readRemoteMemory = NvstreamReadRemoteMemory; + nvstreamDPInterface.waitForCompletion = NvstreamWaitForCompletion; + nvstreamDPInterface.notifyConnFailure = NvstreamNotifyConnFailure; + nvstreamDPInterface.provideTimestep = NvstreamProvideTimestep; + nvstreamDPInterface.releaseTimestep = NvstreamReleaseTimestep; + nvstreamDPInterface.readerRegisterTimestep = NULL; + nvstreamDPInterface.readerReleaseTimestep = NULL; + nvstreamDPInterface.readPatternLocked = NULL; + nvstreamDPInterface.destroyReader = NvstreamDestroyReader; + nvstreamDPInterface.destroyWriter = NvstreamDestroyWriter; + nvstreamDPInterface.destroyWriterPerReader = NvstreamDestroyWriterPerReader; + nvstreamDPInterface.getPriority = NvstreamGetPriority; + nvstreamDPInterface.unGetPriority = NULL; + return &nvstreamDPInterface; +} diff --git a/source/adios2/toolkit/sst/dp/nvswrapper.cpp b/source/adios2/toolkit/sst/dp/nvswrapper.cpp new file mode 100644 index 0000000000..052e3ac439 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvswrapper.cpp @@ -0,0 +1,64 @@ +#include "nvs/store.h" +#include "nvs/store_manager.h" +#include +#include +#include + +#include "nvswrapper.h" + +extern "C" { +void *nvs_create_store() +{ + nvs::Store *st = nvs::StoreManager::GetInstance(""); + std::string store_name = st->get_store_id(); + return (void *)st; +} + +void *nvs_open_store(char *STORE_ID) +{ + std::string store_name = STORE_ID; + nvs::Store *st = nvs::StoreManager::GetInstance(store_name); + return (void *)st; +} + +char *nvs_get_store_name(void *vst) +{ + nvs::Store *st = (nvs::Store *)vst; + const char *tmp = st->get_store_id().c_str(); + return strdup(tmp); +} + +void *nvs_alloc(void *vst, unsigned long *size, char *s) +{ + void *ptr; + nvs::Store *st = (nvs::Store *)vst; + st->create_obj(std::string(s), *size, &ptr); + return ptr; +} + +void *nvs_get_with_malloc(void *vst, char *key, long version) +{ + void *ptr; + nvs::Store *st = (nvs::Store *)vst; + nvs::ErrorCode tmp = st->get_with_malloc(std::string(key), version, &ptr); + if (tmp != nvs::ErrorCode::NO_ERROR) + { + std::cout << "get with malloc error" << std::endl; + } + return ptr; +} + +void nvs_free_(void *vst, void *ptr) {} + +void nvs_snapshot_(void *vst, int *proc_id) +{ + nvs::Store *st = (nvs::Store *)vst; + st->put_all(); +} + +int nvs_finalize_(void *vst) +{ + nvs::Store *st = (nvs::Store *)vst; + delete st; +} +} diff --git a/source/adios2/toolkit/sst/dp/nvswrapper.h b/source/adios2/toolkit/sst/dp/nvswrapper.h new file mode 100644 index 0000000000..8b9a0091e8 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvswrapper.h @@ -0,0 +1,34 @@ +// +// Created by pradeep on 1/11/18. +// Adapted by eisen July 2019 +// + +#ifndef NVSWRAPPER_H +#define NVSWRAPPER_H + +/* C interface for fortran apps */ +#ifdef __cplusplus +extern "C" { +#endif + +void *nvs_open_store(char *store_name); + +void *nvs_create_store(); + +char *nvs_get_store_name(void *vst); + +void *nvs_alloc(void *vst, unsigned long *n, char *s); + +void *nvs_get_with_malloc(void *vst, char *key, long version); + +void nvs_free_(void *vst, void *ptr); + +void nvs_snapshot_(void *vst, int *proc_id); + +int nvs_finalize_(void *vst); + +#ifdef __cplusplus +} +#endif + +#endif // NVSWRAPPER_H