From b6398a26f636faaa58fcd0b480c96b485941dde1 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Fri, 2 Aug 2019 18:48:25 -0400 Subject: [PATCH 1/4] WIP: Shared memory data plane based on GT NVStream --- cmake/DetectOptions.cmake | 5 + cmake/FindNVSTREAM.cmake | 63 + source/adios2/toolkit/sst/CMakeLists.txt | 7 + source/adios2/toolkit/sst/dp/dp.c | 8 + source/adios2/toolkit/sst/dp/nvstream_dp.c | 1143 +++++++++++++++++++ source/adios2/toolkit/sst/dp/nvswrapper.cpp | 72 ++ source/adios2/toolkit/sst/dp/nvswrapper.h | 34 + 7 files changed, 1332 insertions(+) create mode 100644 cmake/FindNVSTREAM.cmake create mode 100644 source/adios2/toolkit/sst/dp/nvstream_dp.c create mode 100644 source/adios2/toolkit/sst/dp/nvswrapper.cpp create mode 100644 source/adios2/toolkit/sst/dp/nvswrapper.h diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index 8ee6b38abb..a4ac895764 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -203,6 +203,11 @@ if(ADIOS2_USE_SST AND NOT MSVC) set(ADIOS2_SST_HAVE_CRAY_DRC TRUE) endif() endif() + find_package(NVSTREAM) + if(NVSTREAM_FOUND) + find_package(Boost OPTIONAL_COMPONENTS thread log filesystem system) + set(ADIOS2_SST_HAVE_NVSTREAM TRUE) + endif() endif() #SysV IPC diff --git a/cmake/FindNVSTREAM.cmake b/cmake/FindNVSTREAM.cmake new file mode 100644 index 0000000000..23be40cc60 --- /dev/null +++ b/cmake/FindNVSTREAM.cmake @@ -0,0 +1,63 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# +# +# FindNVSTREAM +# ----------- +# +# Try to find the NVSTREAM library +# +# This module defines the following variables: +# +# NVSTREAM_FOUND - System has NVSTREAM +# NVSTREAM_INCLUDE_DIRS - The NVSTREAM include directory +# NVSTREAM_LIBRARIES - Link these to use NVSTREAM +# NVSTREAM_VERSION - Version of the NVSTREAM library to support +# +# and the following imported targets: +# NVStream::NVStream - The core NVSTREAM library +# +# You can also set the following variable to help guide the search: +# NVSTREAM_ROOT - The install prefix for NVSTREAM containing the +# include and lib folders +# Note: this can be set as a CMake variable or an +# environment variable. If specified as a CMake +# variable, it will override any setting specified +# as an environment variable. + +if(NOT NVSTREAM_FOUND) + if((NOT NVSTREAM_ROOT) AND (NOT (ENV{NVSTREAM_ROOT} STREQUAL ""))) + set(NVSTREAM_ROOT "$ENV{NVSTREAM_ROOT}") + endif() + if(NVSTREAM_ROOT) + set(NVSTREAM_INCLUDE_OPTS HINTS ${NVSTREAM_ROOT}/include NO_DEFAULT_PATHS) + set(NVSTREAM_LIBRARY_OPTS + HINTS ${NVSTREAM_ROOT}/lib ${NVSTREAM_ROOT}/lib64 + NO_DEFAULT_PATHS + ) + endif() + + find_path(NVSTREAM_INCLUDE_DIR nvs/store.h ${NVSTREAM_INCLUDE_OPTS}) + find_library(NVSTREAM_LIBRARY libyuma.a ${NVSTREAM_LIBRARY_OPTS}) + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args(NVStream + FOUND_VAR NVSTREAM_FOUND + VERSION_VAR NVSTREAM_VERSION + REQUIRED_VARS NVSTREAM_LIBRARY NVSTREAM_INCLUDE_DIR + ) + message(STATUS "NVSTREAM_FOUND is ${NVSTREAM_FOUND}, LIB is ${NVSTREAM_LIBRARY}") + if(NVSTREAM_FOUND) + set(NVSTREAM_INCLUDE_DIRS ${NVSTREAM_INCLUDE_DIR}) + set(NVSTREAM_LIBRARIES ${NVSTREAM_LIBRARY}) + if(NVSTREAM_FOUND AND NOT TARGET NVStream::NVStream) + message(STATUS "ADDING LIBRARY NVSTREAM_FOUND is ${NVSTREAM_FOUND}, LIB is ${NVSTREAM_LIBRARY}") + add_library(NVStream::NVStream UNKNOWN IMPORTED) + set_target_properties(NVStream::NVStream PROPERTIES + IMPORTED_LOCATION "${NVSTREAM_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${NVSTREAM_INCLUDE_DIR}" + ) + endif() + endif() +endif() diff --git a/source/adios2/toolkit/sst/CMakeLists.txt b/source/adios2/toolkit/sst/CMakeLists.txt index fc8b2f6465..6a3205e3d4 100644 --- a/source/adios2/toolkit/sst/CMakeLists.txt +++ b/source/adios2/toolkit/sst/CMakeLists.txt @@ -25,6 +25,12 @@ if(ADIOS2_SST_HAVE_LIBFABRIC) endif() endif() +if(ADIOS2_SST_HAVE_NVSTREAM) + target_sources(sst PRIVATE dp/nvstream_dp.c dp/nvswrapper.cpp) + target_link_libraries(sst PRIVATE NVStream::NVStream ${Boost_LIBRARIES}) + set(CMAKE_REQUIRED_INCLUDES ${NVSTREAM_INCLUDE_DIRS}) +endif() + if(ADIOS2_HAVE_ZFP) target_sources(sst PRIVATE cp/ffs_zfp.c) target_link_libraries(sst PRIVATE zfp::zfp) @@ -44,6 +50,7 @@ set(SST_CONFIG_OPTS LIBFABRIC FI_GNI CRAY_DRC + NVSTREAM ) include(SSTFunctions) GenerateSSTHeaderConfig(${SST_CONFIG_OPTS}) diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c index 11230dfa55..7f91a7522d 100644 --- a/source/adios2/toolkit/sst/dp/dp.c +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -13,6 +13,9 @@ #ifdef SST_HAVE_LIBFABRIC extern CP_DP_Interface LoadRdmaDP(); #endif /* SST_HAVE_LIBFABRIC */ +#ifdef SST_HAVE_NVSTREAM +extern CP_DP_Interface LoadNvstreamDP(); +#endif /* SST_HAVE_LIBFABRIC */ extern CP_DP_Interface LoadEVpathDP(); typedef struct _DPElement @@ -60,6 +63,11 @@ CP_DP_Interface SelectDP(CP_Services Svcs, void *CP_Stream, AddDPPossibility(Svcs, CP_Stream, List, LoadRdmaDP(), "rdma", Params); #endif /* SST_HAVE_LIBFABRIC */ +#ifdef SST_HAVE_NVSTREAM + List = AddDPPossibility(Svcs, CP_Stream, List, LoadNvstreamDP(), "nvstream", + Params); +#endif /* SST_HAVE_LIBFABRIC */ + int SelectedDP = -1; int BestPriority = -1; int BestPrioDP = -1; diff --git a/source/adios2/toolkit/sst/dp/nvstream_dp.c b/source/adios2/toolkit/sst/dp/nvstream_dp.c new file mode 100644 index 0000000000..5e3c48da48 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvstream_dp.c @@ -0,0 +1,1143 @@ +#include +#include +#include +#include +#include + +#include +#include + +#include "sst_data.h" + +#include "adios2/toolkit/profiling/taustubs/taustubs.h" +#include "dp_interface.h" +#include "nvswrapper.h" + +/* + * Some conventions: + * `RS` indicates a reader-side item. + * `WS` indicates a writer-side item. + * `WSR` indicates a writer-side per-reader item. + * + * We keep different "stream" structures for the reader side and for the + * writer side. On the writer side, there's actually a "stream" + * per-connected-reader (a WSR_Stream), with the idea that some (many?) + * RDMA transports will require connections/pairing, so we'd need to track + * resources per reader. + * + * Generally the 'contact information' we exchange at init time includes + * the address of the local 'stream' data structure. This address isn't + * particularly useful to the far side, but it can be returned with + * requests to indicate what resource is targeted. For example, when a + * remote memory read request arrives at the writer from the reader, it + * includes the WSR_Stream value that is the address of the writer-side + * per-reader data structure. Upon message arrival, we just cast that + * value back into a pointer. + * + * By design, neither the data plane nor the control plane reference the + * other's symbols directly. The interface between the control plane and + * the data plane is represented by the types and structures defined in + * dp_interface.h and is a set of function pointers and FFS-style + * descriptions of the data structures to be communicated at init time. + * This allows for the future possibility of loading planes at run-time, etc. + * + * This "nvstream" data plane uses control plane functionality to implement + * the ReadRemoteMemory functionality. That is, it both the request to + * read memory and the response which carries the data are actually + * accomplished using the connections and message delivery facilities of + * the control plane, made available here via CP_Services. A real data + * plane would replace one or both of these with RDMA functionality. + */ + +typedef struct _Nvstream_RS_Stream +{ + CManager cm; + void *CP_Stream; + CMFormat ReadRequestFormat; + pthread_mutex_t DataLock; + int Rank; + + /* writer info */ + int WriterCohortSize; + CP_PeerCohort PeerCohort; + void **writer_nvs; + struct _NvstreamWriterContactInfo *WriterContactInfo; + struct _NvstreamCompletionHandle *PendingReadRequests; + + /* queued timestep info */ + struct _RSTimestepEntry *QueuedTimesteps; +} * Nvstream_RS_Stream; + +typedef struct _Nvstream_WSR_Stream +{ + struct _Nvstream_WS_Stream *WS_Stream; + CP_PeerCohort PeerCohort; + int ReaderCohortSize; + int ReadPatternLockTimestep; + char *ReaderRequests; + struct _NvstreamReaderContactInfo *ReaderContactInfo; + struct _NvstreamWriterContactInfo + *WriterContactInfo; /* included so we can free on destroy */ +} * Nvstream_WSR_Stream; + +typedef struct _TimestepEntry +{ + long Timestep; + struct _SstData Data; + struct _NvstreamPerTimestepInfo *DP_TimestepInfo; + struct _ReaderRequestTrackRec *ReaderRequests; + struct _TimestepEntry *Next; +} * TimestepList; + +typedef struct _RSTimestepEntry +{ + long Timestep; + int WriterRank; + char *Data; + long DataSize; + long DataStart; + struct _RSTimestepEntry *Next; +} * RSTimestepList; + +typedef struct _ReaderRequestTrackRec +{ + Nvstream_WSR_Stream Reader; + char *RequestList; + struct _ReaderRequestTrackRec *Next; +} * ReaderRequestTrackPtr; + +typedef struct _Nvstream_WS_Stream +{ + CManager cm; + void *CP_Stream; + int Rank; + void *nvs; + + TimestepList Timesteps; + CMFormat ReadReplyFormat; + CMFormat PreloadFormat; + + int ReaderCount; + Nvstream_WSR_Stream *Readers; +} * Nvstream_WS_Stream; + +typedef struct _NvstreamReaderContactInfo +{ + char *ContactString; + CMConnection Conn; + void *RS_Stream; +} * NvstreamReaderContactInfo; + +typedef struct _NvstreamWriterContactInfo +{ + char *ContactString; + void *WS_Stream; +} * NvstreamWriterContactInfo; + +typedef struct _NvstreamReadRequestMsg +{ + long Timestep; + size_t Offset; + size_t Length; + void *WS_Stream; + void *RS_Stream; + int RequestingRank; + int NotifyCondition; +} * NvstreamReadRequestMsg; + +static FMField NvstreamReadRequestList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(NvstreamReadRequestMsg, Timestep)}, + {"Offset", "integer", sizeof(size_t), + FMOffset(NvstreamReadRequestMsg, Offset)}, + {"Length", "integer", sizeof(size_t), + FMOffset(NvstreamReadRequestMsg, Length)}, + {"WS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadRequestMsg, WS_Stream)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadRequestMsg, RS_Stream)}, + {"RequestingRank", "integer", sizeof(int), + FMOffset(NvstreamReadRequestMsg, RequestingRank)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(NvstreamReadRequestMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReadRequestStructs[] = { + {"NvstreamReadRequest", NvstreamReadRequestList, + sizeof(struct _NvstreamReadRequestMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +typedef struct _NvstreamReadReplyMsg +{ + long Timestep; + size_t DataLength; + void *RS_Stream; + char *Data; + int NotifyCondition; +} * NvstreamReadReplyMsg; + +static FMField NvstreamReadReplyList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(NvstreamReadReplyMsg, Timestep)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamReadReplyMsg, RS_Stream)}, + {"DataLength", "integer", sizeof(size_t), + FMOffset(NvstreamReadReplyMsg, DataLength)}, + {"Data", "char[DataLength]", sizeof(char), + FMOffset(NvstreamReadReplyMsg, Data)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(NvstreamReadReplyMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReadReplyStructs[] = { + {"NvstreamReadReply", NvstreamReadReplyList, + sizeof(struct _NvstreamReadReplyMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs); + +typedef struct _NvstreamPreloadMsg +{ + long Timestep; + size_t DataLength; + int WriterRank; + void *RS_Stream; + char *Data; +} * NvstreamPreloadMsg; + +static FMField NvstreamPreloadList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(NvstreamPreloadMsg, Timestep)}, + {"DataLength", "integer", sizeof(size_t), + FMOffset(NvstreamPreloadMsg, DataLength)}, + {"WriterRank", "integer", sizeof(size_t), + FMOffset(NvstreamPreloadMsg, WriterRank)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(NvstreamPreloadMsg, RS_Stream)}, + {"Data", "char[DataLength]", sizeof(char), + FMOffset(NvstreamPreloadMsg, Data)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamPreloadStructs[] = { + {"NvstreamPreload", NvstreamPreloadList, sizeof(struct _NvstreamPreloadMsg), + NULL}, + {NULL, NULL, 0, NULL}}; + +static void NvstreamPreloadHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs); +static void DiscardPriorPreloaded(CP_Services Svcs, + Nvstream_RS_Stream RS_Stream, long Timestep); + +static DP_RS_Stream NvstreamInitReader(CP_Services Svcs, void *CP_Stream, + void **ReaderContactInfoPtr, + struct _SstParams *Params) +{ + Nvstream_RS_Stream Stream = malloc(sizeof(struct _Nvstream_RS_Stream)); + NvstreamReaderContactInfo Contact = + malloc(sizeof(struct _NvstreamReaderContactInfo)); + CManager cm = Svcs->getCManager(CP_Stream); + char *NvstreamContactString; + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + CManager CM = Svcs->getCManager(CP_Stream); + attr_list ListenAttrs = create_attr_list(); + + memset(Stream, 0, sizeof(*Stream)); + memset(Contact, 0, sizeof(*Contact)); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + pthread_mutex_init(&Stream->DataLock, NULL); + + SMPI_Comm_rank(comm, &Stream->Rank); + + set_string_attr(ListenAttrs, attr_atom_from_string("CM_TRANSPORT"), + "sockets"); + + if (Params->DataInterface) + { + set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), + strdup(Params->DataInterface)); + } + else if (Params->NetworkInterface) + { + set_string_attr(ListenAttrs, attr_atom_from_string("IP_INTERFACE"), + strdup(Params->NetworkInterface)); + } + + CMlisten_specific(CM, ListenAttrs); + attr_list ContactList = CMget_specific_contact_list(CM, ListenAttrs); + + NvstreamContactString = attr_list_to_string(ContactList); + + Contact->ContactString = NvstreamContactString; + Contact->RS_Stream = Stream; + + *ReaderContactInfoPtr = Contact; + + return Stream; +} + +static void NvstreamDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) +{ + Nvstream_RS_Stream RS_Stream = (Nvstream_RS_Stream)RS_Stream_v; + DiscardPriorPreloaded(Svcs, RS_Stream, -1); + free(RS_Stream); +} + +static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader, + int RequestingRank) +{ + ReaderRequestTrackPtr ReqList = TS->ReaderRequests; + while (ReqList != NULL) + { + if (ReqList->Reader == Reader) + { + ReqList->RequestList[RequestingRank] = 1; + } + ReqList = ReqList->Next; + } +} + +static void NvstreamReadRequestHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs) +{ + TAU_START_FUNC(); + NvstreamReadRequestMsg ReadRequestMsg = (NvstreamReadRequestMsg)msg_v; + Nvstream_WSR_Stream WSR_Stream = ReadRequestMsg->WS_Stream; + + Nvstream_WS_Stream WS_Stream = WSR_Stream->WS_Stream; + TimestepList tmp = WS_Stream->Timesteps; + CP_Services Svcs = (CP_Services)client_Data; + int RequestingRank = ReadRequestMsg->RequestingRank; + + Svcs->verbose(WS_Stream->CP_Stream, + "Got a request to read remote memory " + "from reader rank %d: timestep %d, " + "offset %d, length %d\n", + RequestingRank, ReadRequestMsg->Timestep, + ReadRequestMsg->Offset, ReadRequestMsg->Length); + while (tmp != NULL) + { + if (tmp->Timestep == ReadRequestMsg->Timestep) + { + struct _NvstreamReadReplyMsg ReadReplyMsg; + /* memset avoids uninit byte warnings from valgrind */ + MarkReadRequest(tmp, WSR_Stream, RequestingRank); + memset(&ReadReplyMsg, 0, sizeof(ReadReplyMsg)); + ReadReplyMsg.Timestep = ReadRequestMsg->Timestep; + ReadReplyMsg.DataLength = ReadRequestMsg->Length; + ReadReplyMsg.Data = tmp->Data.block + ReadRequestMsg->Offset; + ReadReplyMsg.RS_Stream = ReadRequestMsg->RS_Stream; + ReadReplyMsg.NotifyCondition = ReadRequestMsg->NotifyCondition; + Svcs->verbose( + WS_Stream->CP_Stream, + "Sending a reply to reader rank %d for remote memory read\n", + RequestingRank); + if (!WSR_Stream->ReaderContactInfo[RequestingRank].Conn) + { + attr_list List = attr_list_from_string( + WSR_Stream->ReaderContactInfo[RequestingRank] + .ContactString); + CMConnection Conn = CMget_conn(cm, List); + free_attr_list(List); + WSR_Stream->ReaderContactInfo[RequestingRank].Conn = Conn; + } + CMwrite(WSR_Stream->ReaderContactInfo[RequestingRank].Conn, + WS_Stream->ReadReplyFormat, &ReadReplyMsg); + + TAU_STOP_FUNC(); + return; + } + tmp = tmp->Next; + } + /* + * Shouldn't ever get here because we should never get a request for a + * timestep that we don't have. + */ + fprintf(stderr, "Writer rank %d - Failed to read Timestep %ld, not found\n", + WSR_Stream->WS_Stream->Rank, ReadRequestMsg->Timestep); + /* + * in the interest of not failing a writer on a reader failure, don't + * assert(0) here. Probably this sort of error should close the link to + * a reader though. + */ + TAU_STOP_FUNC(); +} + +typedef struct _NvstreamCompletionHandle +{ + int CMcondition; + CManager cm; + void *CPStream; + void *DPStream; + void *Buffer; + int Failed; + int Rank; + struct _NvstreamCompletionHandle *Next; +} * NvstreamCompletionHandle; + +static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_Data, + attr_list attrs) +{ + TAU_START_FUNC(); + NvstreamReadReplyMsg ReadReplyMsg = (NvstreamReadReplyMsg)msg_v; + Nvstream_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + NvstreamCompletionHandle Handle = NULL; + + if (CMCondition_has_signaled(cm, ReadReplyMsg->NotifyCondition)) + { + Svcs->verbose(RS_Stream->CP_Stream, "Got a reply to remote memory " + "read, but the condition is " + "already signalled, returning\n"); + TAU_STOP_FUNC(); + return; + } + Handle = CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + if (!Handle) + { + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read, but condition not found\n"); + TAU_STOP_FUNC(); + return; + } + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read from rank %d, condition is %d\n", + Handle->Rank, ReadReplyMsg->NotifyCondition); + + /* + * `Handle` contains the full request info and is `client_data` + * associated with the CMCondition. Once we get it, copy the incoming + * data to the buffer area given by the request + */ + memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); + + /* + * Signal the condition to wake the reader if they are waiting. + */ + CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); + TAU_STOP_FUNC(); +} + +static int HandleRequestWithPreloaded(CP_Services Svcs, + Nvstream_RS_Stream RS_Stream, int Rank, + long Timestep, size_t Offset, + size_t Length, void *Buffer) +{ + RSTimestepList Entry = NULL; + pthread_mutex_lock(&RS_Stream->DataLock); + Entry = RS_Stream->QueuedTimesteps; + while (Entry && + ((Entry->WriterRank != Rank) || (Entry->Timestep != Timestep))) + { + Entry = Entry->Next; + } + pthread_mutex_unlock(&RS_Stream->DataLock); + if (!Entry) + { + return 0; + } + Svcs->verbose(RS_Stream->CP_Stream, + "Satisfying remote memory read with preload from writer rank " + "%d for timestep %ld\n", + Rank, Timestep); + memcpy(Buffer, Entry->Data + Offset, Length); + return 1; +} + +static void DiscardPriorPreloaded(CP_Services Svcs, + Nvstream_RS_Stream RS_Stream, long Timestep) +{ + RSTimestepList Entry, Last = NULL; + pthread_mutex_lock(&RS_Stream->DataLock); + Entry = RS_Stream->QueuedTimesteps; + + while (Entry) + { + RSTimestepList Next = Entry->Next; + if ((Entry->Timestep < Timestep) || (Timestep == -1)) + { + RSTimestepList ItemToFree = Entry; + CManager cm = Svcs->getCManager(RS_Stream->CP_Stream); + if (Last) + { + Last->Next = Entry->Next; + } + else + { + RS_Stream->QueuedTimesteps = Entry->Next; + } + /* free item */ + CMreturn_buffer(cm, ItemToFree->Data); + + free(ItemToFree); + } + else + { + Last = Entry; + } + Entry = Next; + } + pthread_mutex_unlock(&RS_Stream->DataLock); +} + +static void NvstreamPreloadHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + NvstreamPreloadMsg PreloadMsg = (NvstreamPreloadMsg)msg_v; + Nvstream_RS_Stream RS_Stream = PreloadMsg->RS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + NvstreamCompletionHandle Handle = NULL; + RSTimestepList Entry = calloc(1, sizeof(*Entry)); + + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a preload message from writer rank %d for timestep %ld\n", + PreloadMsg->WriterRank, PreloadMsg->Timestep); + + /* arrange for this message data to stay around */ + CMtake_buffer(cm, msg_v); + + Entry->Timestep = PreloadMsg->Timestep; + Entry->WriterRank = PreloadMsg->WriterRank; + Entry->Data = PreloadMsg->Data; + Entry->DataSize = PreloadMsg->DataLength; + Entry->DataStart = 0; + + pthread_mutex_lock(&RS_Stream->DataLock); + Entry->Next = RS_Stream->QueuedTimesteps; + RS_Stream->QueuedTimesteps = Entry; + pthread_mutex_unlock(&RS_Stream->DataLock); + return; +} + +static DP_WS_Stream NvstreamInitWriter(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params) +{ + Nvstream_WS_Stream Stream = malloc(sizeof(struct _Nvstream_WS_Stream)); + CManager cm = Svcs->getCManager(CP_Stream); + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + memset(Stream, 0, sizeof(struct _Nvstream_WS_Stream)); + + SMPI_Comm_rank(comm, &Stream->Rank); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + Stream->nvs = nvs_create_store(); + + Svcs->verbose(Stream->CP_Stream, + "Initializing NVSTREAM writer, create store returned %p\n", + Stream->nvs); + + return (void *)Stream; +} + +static void NvstreamDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) +{ + Nvstream_WS_Stream WS_Stream = (Nvstream_WS_Stream)WS_Stream_v; + printf("Calling finalize %p\n", WS_Stream->nvs); + nvs_finalize_(WS_Stream->nvs); + for (int i = 0; i < WS_Stream->ReaderCount; i++) + { + if (WS_Stream->Readers[i]) + { + free(WS_Stream->Readers[i]->WriterContactInfo->ContactString); + free(WS_Stream->Readers[i]->WriterContactInfo); + free(WS_Stream->Readers[i]->ReaderContactInfo->ContactString); + if (WS_Stream->Readers[i]->ReaderContactInfo->Conn) + CMConnection_close( + WS_Stream->Readers[i]->ReaderContactInfo->Conn); + free(WS_Stream->Readers[i]->ReaderContactInfo); + free(WS_Stream->Readers[i]); + } + } + free(WS_Stream->Readers); + free(WS_Stream); +} + +static DP_WSR_Stream NvstreamInitWriterPerReader(CP_Services Svcs, + DP_WS_Stream WS_Stream_v, + int readerCohortSize, + CP_PeerCohort PeerCohort, + void **providedReaderInfo_v, + void **WriterContactInfoPtr) +{ + Nvstream_WS_Stream WS_Stream = (Nvstream_WS_Stream)WS_Stream_v; + Nvstream_WSR_Stream WSR_Stream = malloc(sizeof(*WSR_Stream)); + NvstreamWriterContactInfo ContactInfo; + MPI_Comm comm = Svcs->getMPIComm(WS_Stream->CP_Stream); + int Rank; + NvstreamReaderContactInfo *providedReaderInfo = + (NvstreamReaderContactInfo *)providedReaderInfo_v; + + SMPI_Comm_rank(comm, &Rank); + + WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */ + WSR_Stream->PeerCohort = PeerCohort; + WSR_Stream->ReadPatternLockTimestep = -1; + WSR_Stream->ReaderRequests = NULL; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + WSR_Stream->ReaderCohortSize = readerCohortSize; + WSR_Stream->ReaderContactInfo = + malloc(sizeof(struct _NvstreamReaderContactInfo) * readerCohortSize); + for (int i = 0; i < readerCohortSize; i++) + { + WSR_Stream->ReaderContactInfo[i].ContactString = NULL; + WSR_Stream->ReaderContactInfo[i].Conn = NULL; + WSR_Stream->ReaderContactInfo[i].RS_Stream = + providedReaderInfo[i]->RS_Stream; + Svcs->verbose( + WS_Stream->CP_Stream, + "Received contact info \"%s\", RD_Stream %p for Reader Rank %d\n", + WSR_Stream->ReaderContactInfo[i].ContactString, + WSR_Stream->ReaderContactInfo[i].RS_Stream, i); + } + + /* + * add this writer-side reader-specific stream to the parent writer stream + * structure + */ + WS_Stream->Readers = realloc( + WS_Stream->Readers, sizeof(*WSR_Stream) * (WS_Stream->ReaderCount + 1)); + WS_Stream->Readers[WS_Stream->ReaderCount] = WSR_Stream; + WS_Stream->ReaderCount++; + + ContactInfo = malloc(sizeof(struct _NvstreamWriterContactInfo)); + memset(ContactInfo, 0, sizeof(struct _NvstreamWriterContactInfo)); + ContactInfo->ContactString = strdup(nvs_get_store_name(WS_Stream->nvs)); + ContactInfo->WS_Stream = WSR_Stream; + *WriterContactInfoPtr = ContactInfo; + WSR_Stream->WriterContactInfo = ContactInfo; + + return WSR_Stream; +} + +static void NvstreamDestroyWriterPerReader(CP_Services Svcs, + DP_WSR_Stream WSR_Stream_v) +{ + Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSR_Stream_v; + free(WSR_Stream); +} + +static void NvstreamProvideWriterDataToReader(CP_Services Svcs, + DP_RS_Stream RS_Stream_v, + int writerCohortSize, + CP_PeerCohort PeerCohort, + void **providedWriterInfo_v) +{ + Nvstream_RS_Stream RS_Stream = (Nvstream_RS_Stream)RS_Stream_v; + NvstreamWriterContactInfo *providedWriterInfo = + (NvstreamWriterContactInfo *)providedWriterInfo_v; + + RS_Stream->PeerCohort = PeerCohort; + RS_Stream->WriterCohortSize = writerCohortSize; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + RS_Stream->WriterContactInfo = + malloc(sizeof(struct _NvstreamWriterContactInfo) * writerCohortSize); + for (int i = 0; i < writerCohortSize; i++) + { + RS_Stream->WriterContactInfo[i].ContactString = + strdup(providedWriterInfo[i]->ContactString); + RS_Stream->WriterContactInfo[i].WS_Stream = + providedWriterInfo[i]->WS_Stream; + Svcs->verbose( + RS_Stream->CP_Stream, + "Received contact info \"%s\", WS_stream %p for WSR Rank %d\n", + RS_Stream->WriterContactInfo[i].ContactString, + RS_Stream->WriterContactInfo[i].WS_Stream, i); + } + RS_Stream->writer_nvs = malloc(sizeof(void *) * writerCohortSize); + for (int i = 0; i < writerCohortSize; i++) + { + RS_Stream->writer_nvs[i] = + nvs_open_store(RS_Stream->WriterContactInfo[i].ContactString); + printf("reader got pointer to writer NVS[%d] = %p\n", i, + RS_Stream->writer_nvs[i]); + } +} + +static void AddRequestToList(CP_Services Svcs, Nvstream_RS_Stream Stream, + NvstreamCompletionHandle Handle) +{ + Handle->Next = Stream->PendingReadRequests; + Stream->PendingReadRequests = Handle; +} + +static void RemoveRequestFromList(CP_Services Svcs, Nvstream_RS_Stream Stream, + NvstreamCompletionHandle Handle) +{ + NvstreamCompletionHandle Tmp = Stream->PendingReadRequests; + + if (Stream->PendingReadRequests == Handle) + { + Stream->PendingReadRequests = Handle->Next; + return; + } + + while (Tmp != NULL && Tmp->Next != Handle) + { + Tmp = Tmp->Next; + } + + if (Tmp == NULL) + return; + + // Tmp->Next must be the handle to remove + Tmp->Next = Tmp->Next->Next; +} + +static void FailRequestsToRank(CP_Services Svcs, CManager cm, + Nvstream_RS_Stream Stream, int FailedRank) +{ + NvstreamCompletionHandle Tmp = Stream->PendingReadRequests; + Svcs->verbose(Stream->CP_Stream, + "Fail pending requests to writer rank %d\n", FailedRank); + while (Tmp != NULL) + { + if (Tmp->Rank == FailedRank) + { + Tmp->Failed = 1; + Svcs->verbose(Tmp->CPStream, + "Found a pending remote memory read " + "to failed writer rank %d, marking as " + "failed and signalling condition %d\n", + Tmp->Rank, Tmp->CMcondition); + CMCondition_signal(cm, Tmp->CMcondition); + Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n", + Tmp->Rank, Tmp->CMcondition); + } + Tmp = Tmp->Next; + } + Svcs->verbose(Stream->CP_Stream, + "Done Failing requests to writer rank %d\n", FailedRank); +} + +typedef struct _NvstreamPerTimestepInfo +{ + char *CheckString; + int CheckInt; +} * NvstreamPerTimestepInfo; + +static void *NvstreamReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, + int Rank, long Timestep, size_t Offset, + size_t Length, void *Buffer, + void *DP_TimestepInfo) +{ + Nvstream_RS_Stream Stream = (Nvstream_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + NvstreamCompletionHandle ret = + malloc(sizeof(struct _NvstreamCompletionHandle)); + NvstreamPerTimestepInfo TimestepInfo = + (NvstreamPerTimestepInfo)DP_TimestepInfo; + struct _NvstreamReadRequestMsg ReadRequestMsg; + + int HadPreload; + static long LastRequestedTimestep = -1; + + if ((LastRequestedTimestep != -1) && (LastRequestedTimestep != Timestep)) + { + DiscardPriorPreloaded(Svcs, Stream, Timestep); + } + LastRequestedTimestep = Timestep; + HadPreload = HandleRequestWithPreloaded(Svcs, Stream, Rank, Timestep, + Offset, Length, Buffer); + ret->CPStream = Stream->CP_Stream; + ret->DPStream = Stream; + ret->Failed = 0; + ret->cm = cm; + ret->Buffer = Buffer; + ret->Rank = Rank; + ret->CMcondition = -1; + + if (HadPreload) + { + /* cool, we already had the data. Setup a dummy return handle */ + return ret; + } + + Svcs->verbose(Stream->CP_Stream, + "Adios requesting to read remote memory for Timestep %d " + "from Rank %d, WSR_Stream = %p, nvs = %p\n", + Timestep, Rank, Stream->WriterContactInfo[Rank].WS_Stream, + Stream->writer_nvs[Rank]); + + char *StringName = malloc(20); + void *NVBlock; + char *BaseAddr; + sprintf(StringName, "Timestep_%ld", Timestep); + BaseAddr = nvs_get_with_malloc(Stream->writer_nvs[Rank], StringName, 1); + + if (BaseAddr) + { + memcpy(Buffer, BaseAddr + Offset, Length); + } + else + { + fprintf(stderr, "remote memory read failed\n"); + } + return ret; +} + +static int NvstreamWaitForCompletion(CP_Services Svcs, void *Handle_v) +{ + NvstreamCompletionHandle Handle = (NvstreamCompletionHandle)Handle_v; + int Ret = 1; + Svcs->verbose( + Handle->CPStream, + "Waiting for completion of memory read to rank %d, condition %d\n", + Handle->Rank, Handle->CMcondition); + /* + * Wait for the CM condition to be signalled. If it has been already, + * this returns immediately. Copying the incoming data to the waiting + * buffer has been done by the reply handler. + */ + if (Handle->CMcondition != -1) + CMCondition_wait(Handle->cm, Handle->CMcondition); + if (Handle->Failed) + { + Svcs->verbose(Handle->CPStream, + "Remote memory read to rank %d with " + "condition %d has FAILED because of " + "writer failure\n", + Handle->Rank, Handle->CMcondition); + Ret = 0; + } + else + { + Svcs->verbose( + Handle->CPStream, + "Remote memory read to rank %d with condition %d has completed\n", + Handle->Rank, Handle->CMcondition); + } + RemoveRequestFromList(Svcs, Handle->DPStream, Handle); + free(Handle); + return Ret; +} + +static void NvstreamNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, + int FailedPeerRank) +{ + Nvstream_RS_Stream Stream = (Nvstream_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + Svcs->verbose(Stream->CP_Stream, + "received notification that writer peer " + "%d has failed, failing any pending " + "requests\n", + FailedPeerRank); + FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank); +} + +static void NvstreamReaderRegisterTimestep(CP_Services Svcs, + DP_WSR_Stream WSRStream_v, + long Timestep, + int WriterDefinitionsLocked) +{ + Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSRStream_v; + Nvstream_WS_Stream WS_Stream = + WSR_Stream->WS_Stream; /* pointer to writer struct */ + + if (WriterDefinitionsLocked) + { + /* go ahead and record read patterns */ + TimestepList tmp = WS_Stream->Timesteps; + + while (tmp != NULL) + { + if (tmp->Timestep == Timestep) + { + ReaderRequestTrackPtr ReqTrk = calloc(1, sizeof(*ReqTrk)); + ReqTrk->Reader = WSR_Stream; + ReqTrk->RequestList = calloc(1, WSR_Stream->ReaderCohortSize); + ReqTrk->Next = tmp->ReaderRequests; + tmp->ReaderRequests = ReqTrk; + return; + } + } + tmp = tmp->Next; + printf("TIMESTEP NOT FOUND in READER REGISTER!\n"); + } +} + +static void SendPreloadMsgs(CP_Services Svcs, Nvstream_WSR_Stream WSR_Stream, + TimestepList TS) +{ + Nvstream_WS_Stream WS_Stream = + WSR_Stream->WS_Stream; /* pointer to writer struct */ + struct _NvstreamPreloadMsg PreloadMsg; + memset(&PreloadMsg, 0, sizeof(PreloadMsg)); + PreloadMsg.Timestep = TS->Timestep; + PreloadMsg.DataLength = TS->Data.DataSize; + PreloadMsg.Data = TS->Data.block; + PreloadMsg.WriterRank = WS_Stream->Rank; + + for (int i = 0; i < WSR_Stream->ReaderCohortSize; i++) + { + if (WSR_Stream->ReaderRequests[i]) + { + PreloadMsg.RS_Stream = WSR_Stream->ReaderContactInfo[i].RS_Stream; + CMwrite(WSR_Stream->ReaderContactInfo[i].Conn, + WS_Stream->PreloadFormat, &PreloadMsg); + } + } +} + +static void NvstreamReaderReleaseTimestep(CP_Services Svcs, + DP_WSR_Stream Stream_v, long Timestep) +{ + Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)Stream_v; + Nvstream_WS_Stream WS_Stream = + WSR_Stream->WS_Stream; /* pointer to writer struct */ + TimestepList tmp = WS_Stream->Timesteps; + + if (Timestep == WSR_Stream->ReadPatternLockTimestep) + { + /* save the pattern */ + while (tmp != NULL) + { + if (tmp->Timestep == Timestep) + { + ReaderRequestTrackPtr ReqList = tmp->ReaderRequests; + while (ReqList != NULL) + { + if (ReqList->Reader == WSR_Stream) + { + WSR_Stream->ReaderRequests = ReqList->RequestList; + /* so it doesn't get free'd */ + ReqList->RequestList = NULL; + } + ReqList = ReqList->Next; + } + } + tmp = tmp->Next; + } + if (WSR_Stream->ReaderRequests) + { + /* send out any already queued timesteps */ + tmp = WS_Stream->Timesteps; + while (tmp != NULL) + { + + SendPreloadMsgs(Svcs, WSR_Stream, tmp); + tmp = tmp->Next; + } + } + } +} + +static void NvstreamReadPatternLocked(CP_Services Svcs, + DP_WSR_Stream WSRStream_v, + long EffectiveTimestep) +{ + Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSRStream_v; + Nvstream_WS_Stream WS_Stream = + WSR_Stream->WS_Stream; /* pointer to writer struct */ + + WSR_Stream->ReadPatternLockTimestep = EffectiveTimestep; +} + +static void NvstreamProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + struct _SstData *Data, + struct _SstData *LocalMetadata, + long Timestep, void **TimestepInfoPtr) +{ + Nvstream_WS_Stream Stream = (Nvstream_WS_Stream)Stream_v; + TimestepList Entry = malloc(sizeof(struct _TimestepEntry)); + struct _NvstreamPerTimestepInfo *Info = NULL; + // malloc(sizeof(struct _NvstreamPerTimestepInfo)); + + // This section exercised the CP's ability to distribute DP per timestep + // info. Commenting out as needed for Nvstream for now + // Info->CheckString = malloc(64); + // sprintf(Info->CheckString, "Nvstream info for timestep %ld from rank + // %d", + // Timestep, Stream->Rank); + // Info->CheckInt = Stream->Rank * 1000 + Timestep; + // *TimestepInfoPtr = Info; + memset(Entry, 0, sizeof(*Entry)); + Entry->DP_TimestepInfo = NULL; + Entry->Data = *Data; + Entry->Timestep = Timestep; + + Entry->Next = Stream->Timesteps; + Stream->Timesteps = Entry; + *TimestepInfoPtr = NULL; + + char *StringName = malloc(20); + void *NVBlock; + sprintf(StringName, "Timestep_%ld", Timestep); + NVBlock = nvs_alloc(Stream->nvs, &Data->DataSize, StringName); + memcpy(NVBlock, Data->block, Data->DataSize); + nvs_snapshot_(Stream->nvs, &Stream->Rank); + fprintf(stderr, "Did alloc, memcpy and snapshot with name %s\n", + StringName); +} + +static void NvstreamReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + long Timestep) +{ + Nvstream_WS_Stream Stream = (Nvstream_WS_Stream)Stream_v; + TimestepList List = Stream->Timesteps; + + Svcs->verbose(Stream->CP_Stream, "Releasing timestep %ld\n", Timestep); + if (Stream->Timesteps->Timestep == Timestep) + { + Stream->Timesteps = List->Next; + if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) + free(List->DP_TimestepInfo->CheckString); + if (List->DP_TimestepInfo) + free(List->DP_TimestepInfo); + if (List->ReaderRequests) + { + ReaderRequestTrackPtr tmp = List->ReaderRequests; + while (tmp) + { + ReaderRequestTrackPtr Next = tmp->Next; + if (tmp->RequestList) + free(tmp->RequestList); + free(tmp); + tmp = Next; + } + } + + free(List); + } + else + { + TimestepList last = List; + List = List->Next; + while (List != NULL) + { + if (List->Timestep == Timestep) + { + last->Next = List->Next; + if (List->DP_TimestepInfo && List->DP_TimestepInfo->CheckString) + free(List->DP_TimestepInfo->CheckString); + if (List->DP_TimestepInfo) + free(List->DP_TimestepInfo); + if (List->ReaderRequests) + { + ReaderRequestTrackPtr tmp = List->ReaderRequests; + while (tmp) + { + ReaderRequestTrackPtr Next = tmp->Next; + if (tmp->RequestList) + free(tmp->RequestList); + free(tmp); + tmp = Next; + } + } + + free(List); + return; + } + last = List; + List = List->Next; + } + /* + * Shouldn't ever get here because we should never release a + * timestep that we don't have. + */ + fprintf(stderr, "Failed to release Timestep %ld, not found\n", + Timestep); + assert(0); + } +} + +static FMField NvstreamReaderContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(NvstreamReaderContactInfo, ContactString)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(NvstreamReaderContactInfo, RS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamReaderContactStructs[] = { + {"NvstreamReaderContactInfo", NvstreamReaderContactList, + sizeof(struct _NvstreamReaderContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField NvstreamWriterContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(NvstreamWriterContactInfo, ContactString)}, + {"writer_ID", "integer", sizeof(void *), + FMOffset(NvstreamWriterContactInfo, WS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamWriterContactStructs[] = { + {"NvstreamWriterContactInfo", NvstreamWriterContactList, + sizeof(struct _NvstreamWriterContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField NvstreamTimestepInfoList[] = { + {"CheckString", "string", sizeof(char *), + FMOffset(NvstreamPerTimestepInfo, CheckString)}, + {"CheckInt", "integer", sizeof(void *), + FMOffset(NvstreamPerTimestepInfo, CheckInt)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec NvstreamTimestepInfoStructs[] = { + {"NvstreamTimestepInfo", NvstreamTimestepInfoList, + sizeof(struct _NvstreamPerTimestepInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static struct _CP_DP_Interface nvstreamDPInterface; + +static int NvstreamGetPriority(CP_Services Svcs, void *CP_Stream, + struct _SstParams *Params) +{ + /* The nvstream DP priority 10 */ + return 10; +} + +extern CP_DP_Interface LoadNvstreamDP() +{ + memset(&nvstreamDPInterface, 0, sizeof(nvstreamDPInterface)); + nvstreamDPInterface.ReaderContactFormats = NvstreamReaderContactStructs; + nvstreamDPInterface.WriterContactFormats = NvstreamWriterContactStructs; + nvstreamDPInterface.TimestepInfoFormats = + NULL; // NvstreamTimestepInfoStructs; + nvstreamDPInterface.initReader = NvstreamInitReader; + nvstreamDPInterface.initWriter = NvstreamInitWriter; + nvstreamDPInterface.initWriterPerReader = NvstreamInitWriterPerReader; + nvstreamDPInterface.provideWriterDataToReader = + NvstreamProvideWriterDataToReader; + nvstreamDPInterface.readRemoteMemory = NvstreamReadRemoteMemory; + nvstreamDPInterface.waitForCompletion = NvstreamWaitForCompletion; + nvstreamDPInterface.notifyConnFailure = NvstreamNotifyConnFailure; + nvstreamDPInterface.provideTimestep = NvstreamProvideTimestep; + nvstreamDPInterface.releaseTimestep = NvstreamReleaseTimestep; + nvstreamDPInterface.readerRegisterTimestep = NvstreamReaderRegisterTimestep; + nvstreamDPInterface.readerReleaseTimestep = NvstreamReaderReleaseTimestep; + nvstreamDPInterface.readPatternLocked = NvstreamReadPatternLocked; + nvstreamDPInterface.destroyReader = NvstreamDestroyReader; + nvstreamDPInterface.destroyWriter = NvstreamDestroyWriter; + nvstreamDPInterface.destroyWriterPerReader = NvstreamDestroyWriterPerReader; + nvstreamDPInterface.getPriority = NvstreamGetPriority; + nvstreamDPInterface.unGetPriority = NULL; + return &nvstreamDPInterface; +} diff --git a/source/adios2/toolkit/sst/dp/nvswrapper.cpp b/source/adios2/toolkit/sst/dp/nvswrapper.cpp new file mode 100644 index 0000000000..455ab72bf6 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvswrapper.cpp @@ -0,0 +1,72 @@ +#include "nvs/store.h" +#include "nvs/store_manager.h" +#include +#include +#include + +#include "nvswrapper.h" + +extern "C" { +void *nvs_create_store() +{ + std::cout << "calling GetInstance with a null entry" << std::endl; + nvs::Store *st = nvs::StoreManager::GetInstance(""); + std::string store_name = st->get_store_id(); + printf("Created Store_name %s, returning pointer %p\n", store_name.c_str(), + (void *)st); + return (void *)st; +} + +void *nvs_open_store(char *STORE_ID) +{ + std::string store_name = STORE_ID; + nvs::Store *st = nvs::StoreManager::GetInstance(store_name); + printf("Opened Store_name %s, returning pointer %p\n", store_name.c_str(), + (void *)st); + return (void *)st; +} + +char *nvs_get_store_name(void *vst) +{ + nvs::Store *st = (nvs::Store *)vst; + const char *tmp = st->get_store_id().c_str(); + return strdup(tmp); +} + +void *nvs_alloc(void *vst, unsigned long *size, char *s) +{ + void *ptr; + nvs::Store *st = (nvs::Store *)vst; + st->create_obj(std::string(s), *size, &ptr); + return ptr; +} + +void *nvs_get_with_malloc(void *vst, char *key, long version) +{ + void *ptr; + nvs::Store *st = (nvs::Store *)vst; + nvs::ErrorCode tmp = st->get_with_malloc(std::string(key), version, &ptr); + printf("Get from Store pointer %p\n", (void *)st); + if (tmp != nvs::ErrorCode::NO_ERROR) + { + std::cout << "get with malloc error" << std::endl; + } + return ptr; +} + +void nvs_free_(void *vst, void *ptr) {} + +void nvs_snapshot_(void *vst, int *proc_id) +{ + nvs::Store *st = (nvs::Store *)vst; + printf("Doing put_all on vst %p\n", vst); + st->put_all(); +} + +int nvs_finalize_(void *vst) +{ + nvs::Store *st = (nvs::Store *)vst; + printf("Doing delete on vst %p\n", vst); + delete st; +} +} diff --git a/source/adios2/toolkit/sst/dp/nvswrapper.h b/source/adios2/toolkit/sst/dp/nvswrapper.h new file mode 100644 index 0000000000..8b9a0091e8 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/nvswrapper.h @@ -0,0 +1,34 @@ +// +// Created by pradeep on 1/11/18. +// Adapted by eisen July 2019 +// + +#ifndef NVSWRAPPER_H +#define NVSWRAPPER_H + +/* C interface for fortran apps */ +#ifdef __cplusplus +extern "C" { +#endif + +void *nvs_open_store(char *store_name); + +void *nvs_create_store(); + +char *nvs_get_store_name(void *vst); + +void *nvs_alloc(void *vst, unsigned long *n, char *s); + +void *nvs_get_with_malloc(void *vst, char *key, long version); + +void nvs_free_(void *vst, void *ptr); + +void nvs_snapshot_(void *vst, int *proc_id); + +int nvs_finalize_(void *vst); + +#ifdef __cplusplus +} +#endif + +#endif // NVSWRAPPER_H From 3040fac1d5063fc98ebf76c7ca89cf65afcaa9e2 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Fri, 2 Aug 2019 18:57:40 -0400 Subject: [PATCH 2/4] Kill spurious output --- source/adios2/toolkit/sst/dp/nvstream_dp.c | 3 --- source/adios2/toolkit/sst/dp/nvswrapper.cpp | 8 -------- 2 files changed, 11 deletions(-) diff --git a/source/adios2/toolkit/sst/dp/nvstream_dp.c b/source/adios2/toolkit/sst/dp/nvstream_dp.c index 5e3c48da48..d5885bbf98 100644 --- a/source/adios2/toolkit/sst/dp/nvstream_dp.c +++ b/source/adios2/toolkit/sst/dp/nvstream_dp.c @@ -551,7 +551,6 @@ static DP_WS_Stream NvstreamInitWriter(CP_Services Svcs, void *CP_Stream, static void NvstreamDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) { Nvstream_WS_Stream WS_Stream = (Nvstream_WS_Stream)WS_Stream_v; - printf("Calling finalize %p\n", WS_Stream->nvs); nvs_finalize_(WS_Stream->nvs); for (int i = 0; i < WS_Stream->ReaderCount; i++) { @@ -675,8 +674,6 @@ static void NvstreamProvideWriterDataToReader(CP_Services Svcs, { RS_Stream->writer_nvs[i] = nvs_open_store(RS_Stream->WriterContactInfo[i].ContactString); - printf("reader got pointer to writer NVS[%d] = %p\n", i, - RS_Stream->writer_nvs[i]); } } diff --git a/source/adios2/toolkit/sst/dp/nvswrapper.cpp b/source/adios2/toolkit/sst/dp/nvswrapper.cpp index 455ab72bf6..052e3ac439 100644 --- a/source/adios2/toolkit/sst/dp/nvswrapper.cpp +++ b/source/adios2/toolkit/sst/dp/nvswrapper.cpp @@ -9,11 +9,8 @@ extern "C" { void *nvs_create_store() { - std::cout << "calling GetInstance with a null entry" << std::endl; nvs::Store *st = nvs::StoreManager::GetInstance(""); std::string store_name = st->get_store_id(); - printf("Created Store_name %s, returning pointer %p\n", store_name.c_str(), - (void *)st); return (void *)st; } @@ -21,8 +18,6 @@ void *nvs_open_store(char *STORE_ID) { std::string store_name = STORE_ID; nvs::Store *st = nvs::StoreManager::GetInstance(store_name); - printf("Opened Store_name %s, returning pointer %p\n", store_name.c_str(), - (void *)st); return (void *)st; } @@ -46,7 +41,6 @@ void *nvs_get_with_malloc(void *vst, char *key, long version) void *ptr; nvs::Store *st = (nvs::Store *)vst; nvs::ErrorCode tmp = st->get_with_malloc(std::string(key), version, &ptr); - printf("Get from Store pointer %p\n", (void *)st); if (tmp != nvs::ErrorCode::NO_ERROR) { std::cout << "get with malloc error" << std::endl; @@ -59,14 +53,12 @@ void nvs_free_(void *vst, void *ptr) {} void nvs_snapshot_(void *vst, int *proc_id) { nvs::Store *st = (nvs::Store *)vst; - printf("Doing put_all on vst %p\n", vst); st->put_all(); } int nvs_finalize_(void *vst) { nvs::Store *st = (nvs::Store *)vst; - printf("Doing delete on vst %p\n", vst); delete st; } } From 41eb03a7b5f2a88ece5d835f84d0d99a4affe073 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Sat, 3 Aug 2019 07:09:40 -0400 Subject: [PATCH 3/4] clean up fread failures --- source/adios2/toolkit/sst/cp/cp_reader.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index 5aab1e8a94..764d6d0f1a 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -79,7 +79,13 @@ static char *readContactInfoFile(const char *Name, SstStream Stream, else { char Tmp[strlen(SSTMAGICV0)]; - fread(Tmp, strlen(SSTMAGICV0), 1, WriterInfo); + if (fread(Tmp, strlen(SSTMAGICV0), 1, WriterInfo) != 1) + { + fprintf(stderr, + "Filesystem read failed in SST Open, failing operation\n"); + fclose(WriterInfo); + Badfile++; + } Size -= strlen(SSTMAGICV0); if (strncmp(Tmp, SSTMAGICV0, strlen(SSTMAGICV0)) != 0) { @@ -99,8 +105,11 @@ static char *readContactInfoFile(const char *Name, SstStream Stream, char *Buffer = calloc(1, Size + 1); if (fread(Buffer, Size, 1, WriterInfo) != 1) { - fprintf(stderr, "Filesystem read failed, exiting\n"); - exit(1); + fprintf(stderr, + "Filesystem read failed in SST Open, failing operation\n"); + free(Buffer); + fclose(WriterInfo); + return NULL; } fclose(WriterInfo); return Buffer; From 7e086ff68a7f0f9116ba966ce1876a9720a6b973 Mon Sep 17 00:00:00 2001 From: Greg Eisenhauer Date: Sat, 3 Aug 2019 07:10:12 -0400 Subject: [PATCH 4/4] Remove unused preload functionality from nvstream_dp --- source/adios2/toolkit/sst/dp/nvstream_dp.c | 306 +-------------------- 1 file changed, 4 insertions(+), 302 deletions(-) diff --git a/source/adios2/toolkit/sst/dp/nvstream_dp.c b/source/adios2/toolkit/sst/dp/nvstream_dp.c index d5885bbf98..c95e01e666 100644 --- a/source/adios2/toolkit/sst/dp/nvstream_dp.c +++ b/source/adios2/toolkit/sst/dp/nvstream_dp.c @@ -73,7 +73,6 @@ typedef struct _Nvstream_WSR_Stream struct _Nvstream_WS_Stream *WS_Stream; CP_PeerCohort PeerCohort; int ReaderCohortSize; - int ReadPatternLockTimestep; char *ReaderRequests; struct _NvstreamReaderContactInfo *ReaderContactInfo; struct _NvstreamWriterContactInfo @@ -85,7 +84,6 @@ typedef struct _TimestepEntry long Timestep; struct _SstData Data; struct _NvstreamPerTimestepInfo *DP_TimestepInfo; - struct _ReaderRequestTrackRec *ReaderRequests; struct _TimestepEntry *Next; } * TimestepList; @@ -99,13 +97,6 @@ typedef struct _RSTimestepEntry struct _RSTimestepEntry *Next; } * RSTimestepList; -typedef struct _ReaderRequestTrackRec -{ - Nvstream_WSR_Stream Reader; - char *RequestList; - struct _ReaderRequestTrackRec *Next; -} * ReaderRequestTrackPtr; - typedef struct _Nvstream_WS_Stream { CManager cm; @@ -115,7 +106,6 @@ typedef struct _Nvstream_WS_Stream TimestepList Timesteps; CMFormat ReadReplyFormat; - CMFormat PreloadFormat; int ReaderCount; Nvstream_WSR_Stream *Readers; @@ -198,38 +188,6 @@ static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); -typedef struct _NvstreamPreloadMsg -{ - long Timestep; - size_t DataLength; - int WriterRank; - void *RS_Stream; - char *Data; -} * NvstreamPreloadMsg; - -static FMField NvstreamPreloadList[] = { - {"Timestep", "integer", sizeof(long), - FMOffset(NvstreamPreloadMsg, Timestep)}, - {"DataLength", "integer", sizeof(size_t), - FMOffset(NvstreamPreloadMsg, DataLength)}, - {"WriterRank", "integer", sizeof(size_t), - FMOffset(NvstreamPreloadMsg, WriterRank)}, - {"RS_Stream", "integer", sizeof(void *), - FMOffset(NvstreamPreloadMsg, RS_Stream)}, - {"Data", "char[DataLength]", sizeof(char), - FMOffset(NvstreamPreloadMsg, Data)}, - {NULL, NULL, 0, 0}}; - -static FMStructDescRec NvstreamPreloadStructs[] = { - {"NvstreamPreload", NvstreamPreloadList, sizeof(struct _NvstreamPreloadMsg), - NULL}, - {NULL, NULL, 0, NULL}}; - -static void NvstreamPreloadHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs); -static void DiscardPriorPreloaded(CP_Services Svcs, - Nvstream_RS_Stream RS_Stream, long Timestep); - static DP_RS_Stream NvstreamInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params) @@ -286,24 +244,9 @@ static DP_RS_Stream NvstreamInitReader(CP_Services Svcs, void *CP_Stream, static void NvstreamDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) { Nvstream_RS_Stream RS_Stream = (Nvstream_RS_Stream)RS_Stream_v; - DiscardPriorPreloaded(Svcs, RS_Stream, -1); free(RS_Stream); } -static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader, - int RequestingRank) -{ - ReaderRequestTrackPtr ReqList = TS->ReaderRequests; - while (ReqList != NULL) - { - if (ReqList->Reader == Reader) - { - ReqList->RequestList[RequestingRank] = 1; - } - ReqList = ReqList->Next; - } -} - static void NvstreamReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs) @@ -329,7 +272,6 @@ static void NvstreamReadRequestHandler(CManager cm, CMConnection conn, { struct _NvstreamReadReplyMsg ReadReplyMsg; /* memset avoids uninit byte warnings from valgrind */ - MarkReadRequest(tmp, WSR_Stream, RequestingRank); memset(&ReadReplyMsg, 0, sizeof(ReadReplyMsg)); ReadReplyMsg.Timestep = ReadRequestMsg->Timestep; ReadReplyMsg.DataLength = ReadRequestMsg->Length; @@ -430,98 +372,6 @@ static void NvstreamReadReplyHandler(CManager cm, CMConnection conn, TAU_STOP_FUNC(); } -static int HandleRequestWithPreloaded(CP_Services Svcs, - Nvstream_RS_Stream RS_Stream, int Rank, - long Timestep, size_t Offset, - size_t Length, void *Buffer) -{ - RSTimestepList Entry = NULL; - pthread_mutex_lock(&RS_Stream->DataLock); - Entry = RS_Stream->QueuedTimesteps; - while (Entry && - ((Entry->WriterRank != Rank) || (Entry->Timestep != Timestep))) - { - Entry = Entry->Next; - } - pthread_mutex_unlock(&RS_Stream->DataLock); - if (!Entry) - { - return 0; - } - Svcs->verbose(RS_Stream->CP_Stream, - "Satisfying remote memory read with preload from writer rank " - "%d for timestep %ld\n", - Rank, Timestep); - memcpy(Buffer, Entry->Data + Offset, Length); - return 1; -} - -static void DiscardPriorPreloaded(CP_Services Svcs, - Nvstream_RS_Stream RS_Stream, long Timestep) -{ - RSTimestepList Entry, Last = NULL; - pthread_mutex_lock(&RS_Stream->DataLock); - Entry = RS_Stream->QueuedTimesteps; - - while (Entry) - { - RSTimestepList Next = Entry->Next; - if ((Entry->Timestep < Timestep) || (Timestep == -1)) - { - RSTimestepList ItemToFree = Entry; - CManager cm = Svcs->getCManager(RS_Stream->CP_Stream); - if (Last) - { - Last->Next = Entry->Next; - } - else - { - RS_Stream->QueuedTimesteps = Entry->Next; - } - /* free item */ - CMreturn_buffer(cm, ItemToFree->Data); - - free(ItemToFree); - } - else - { - Last = Entry; - } - Entry = Next; - } - pthread_mutex_unlock(&RS_Stream->DataLock); -} - -static void NvstreamPreloadHandler(CManager cm, CMConnection conn, void *msg_v, - void *client_Data, attr_list attrs) -{ - NvstreamPreloadMsg PreloadMsg = (NvstreamPreloadMsg)msg_v; - Nvstream_RS_Stream RS_Stream = PreloadMsg->RS_Stream; - CP_Services Svcs = (CP_Services)client_Data; - NvstreamCompletionHandle Handle = NULL; - RSTimestepList Entry = calloc(1, sizeof(*Entry)); - - Svcs->verbose( - RS_Stream->CP_Stream, - "Got a preload message from writer rank %d for timestep %ld\n", - PreloadMsg->WriterRank, PreloadMsg->Timestep); - - /* arrange for this message data to stay around */ - CMtake_buffer(cm, msg_v); - - Entry->Timestep = PreloadMsg->Timestep; - Entry->WriterRank = PreloadMsg->WriterRank; - Entry->Data = PreloadMsg->Data; - Entry->DataSize = PreloadMsg->DataLength; - Entry->DataStart = 0; - - pthread_mutex_lock(&RS_Stream->DataLock); - Entry->Next = RS_Stream->QueuedTimesteps; - RS_Stream->QueuedTimesteps = Entry; - pthread_mutex_unlock(&RS_Stream->DataLock); - return; -} - static DP_WS_Stream NvstreamInitWriter(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params) { @@ -589,8 +439,6 @@ static DP_WSR_Stream NvstreamInitWriterPerReader(CP_Services Svcs, WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */ WSR_Stream->PeerCohort = PeerCohort; - WSR_Stream->ReadPatternLockTimestep = -1; - WSR_Stream->ReaderRequests = NULL; /* * make a copy of writer contact information (original will not be @@ -753,16 +601,10 @@ static void *NvstreamReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, (NvstreamPerTimestepInfo)DP_TimestepInfo; struct _NvstreamReadRequestMsg ReadRequestMsg; - int HadPreload; static long LastRequestedTimestep = -1; - if ((LastRequestedTimestep != -1) && (LastRequestedTimestep != Timestep)) - { - DiscardPriorPreloaded(Svcs, Stream, Timestep); - } LastRequestedTimestep = Timestep; - HadPreload = HandleRequestWithPreloaded(Svcs, Stream, Rank, Timestep, - Offset, Length, Buffer); + ret->CPStream = Stream->CP_Stream; ret->DPStream = Stream; ret->Failed = 0; @@ -771,12 +613,6 @@ static void *NvstreamReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, ret->Rank = Rank; ret->CMcondition = -1; - if (HadPreload) - { - /* cool, we already had the data. Setup a dummy return handle */ - return ret; - } - Svcs->verbose(Stream->CP_Stream, "Adios requesting to read remote memory for Timestep %d " "from Rank %d, WSR_Stream = %p, nvs = %p\n", @@ -850,114 +686,6 @@ static void NvstreamNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, FailRequestsToRank(Svcs, cm, Stream, FailedPeerRank); } -static void NvstreamReaderRegisterTimestep(CP_Services Svcs, - DP_WSR_Stream WSRStream_v, - long Timestep, - int WriterDefinitionsLocked) -{ - Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSRStream_v; - Nvstream_WS_Stream WS_Stream = - WSR_Stream->WS_Stream; /* pointer to writer struct */ - - if (WriterDefinitionsLocked) - { - /* go ahead and record read patterns */ - TimestepList tmp = WS_Stream->Timesteps; - - while (tmp != NULL) - { - if (tmp->Timestep == Timestep) - { - ReaderRequestTrackPtr ReqTrk = calloc(1, sizeof(*ReqTrk)); - ReqTrk->Reader = WSR_Stream; - ReqTrk->RequestList = calloc(1, WSR_Stream->ReaderCohortSize); - ReqTrk->Next = tmp->ReaderRequests; - tmp->ReaderRequests = ReqTrk; - return; - } - } - tmp = tmp->Next; - printf("TIMESTEP NOT FOUND in READER REGISTER!\n"); - } -} - -static void SendPreloadMsgs(CP_Services Svcs, Nvstream_WSR_Stream WSR_Stream, - TimestepList TS) -{ - Nvstream_WS_Stream WS_Stream = - WSR_Stream->WS_Stream; /* pointer to writer struct */ - struct _NvstreamPreloadMsg PreloadMsg; - memset(&PreloadMsg, 0, sizeof(PreloadMsg)); - PreloadMsg.Timestep = TS->Timestep; - PreloadMsg.DataLength = TS->Data.DataSize; - PreloadMsg.Data = TS->Data.block; - PreloadMsg.WriterRank = WS_Stream->Rank; - - for (int i = 0; i < WSR_Stream->ReaderCohortSize; i++) - { - if (WSR_Stream->ReaderRequests[i]) - { - PreloadMsg.RS_Stream = WSR_Stream->ReaderContactInfo[i].RS_Stream; - CMwrite(WSR_Stream->ReaderContactInfo[i].Conn, - WS_Stream->PreloadFormat, &PreloadMsg); - } - } -} - -static void NvstreamReaderReleaseTimestep(CP_Services Svcs, - DP_WSR_Stream Stream_v, long Timestep) -{ - Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)Stream_v; - Nvstream_WS_Stream WS_Stream = - WSR_Stream->WS_Stream; /* pointer to writer struct */ - TimestepList tmp = WS_Stream->Timesteps; - - if (Timestep == WSR_Stream->ReadPatternLockTimestep) - { - /* save the pattern */ - while (tmp != NULL) - { - if (tmp->Timestep == Timestep) - { - ReaderRequestTrackPtr ReqList = tmp->ReaderRequests; - while (ReqList != NULL) - { - if (ReqList->Reader == WSR_Stream) - { - WSR_Stream->ReaderRequests = ReqList->RequestList; - /* so it doesn't get free'd */ - ReqList->RequestList = NULL; - } - ReqList = ReqList->Next; - } - } - tmp = tmp->Next; - } - if (WSR_Stream->ReaderRequests) - { - /* send out any already queued timesteps */ - tmp = WS_Stream->Timesteps; - while (tmp != NULL) - { - - SendPreloadMsgs(Svcs, WSR_Stream, tmp); - tmp = tmp->Next; - } - } - } -} - -static void NvstreamReadPatternLocked(CP_Services Svcs, - DP_WSR_Stream WSRStream_v, - long EffectiveTimestep) -{ - Nvstream_WSR_Stream WSR_Stream = (Nvstream_WSR_Stream)WSRStream_v; - Nvstream_WS_Stream WS_Stream = - WSR_Stream->WS_Stream; /* pointer to writer struct */ - - WSR_Stream->ReadPatternLockTimestep = EffectiveTimestep; -} - static void NvstreamProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, struct _SstData *Data, struct _SstData *LocalMetadata, @@ -1009,19 +737,6 @@ static void NvstreamReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, free(List->DP_TimestepInfo->CheckString); if (List->DP_TimestepInfo) free(List->DP_TimestepInfo); - if (List->ReaderRequests) - { - ReaderRequestTrackPtr tmp = List->ReaderRequests; - while (tmp) - { - ReaderRequestTrackPtr Next = tmp->Next; - if (tmp->RequestList) - free(tmp->RequestList); - free(tmp); - tmp = Next; - } - } - free(List); } else @@ -1037,19 +752,6 @@ static void NvstreamReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, free(List->DP_TimestepInfo->CheckString); if (List->DP_TimestepInfo) free(List->DP_TimestepInfo); - if (List->ReaderRequests) - { - ReaderRequestTrackPtr tmp = List->ReaderRequests; - while (tmp) - { - ReaderRequestTrackPtr Next = tmp->Next; - if (tmp->RequestList) - free(tmp->RequestList); - free(tmp); - tmp = Next; - } - } - free(List); return; } @@ -1128,9 +830,9 @@ extern CP_DP_Interface LoadNvstreamDP() nvstreamDPInterface.notifyConnFailure = NvstreamNotifyConnFailure; nvstreamDPInterface.provideTimestep = NvstreamProvideTimestep; nvstreamDPInterface.releaseTimestep = NvstreamReleaseTimestep; - nvstreamDPInterface.readerRegisterTimestep = NvstreamReaderRegisterTimestep; - nvstreamDPInterface.readerReleaseTimestep = NvstreamReaderReleaseTimestep; - nvstreamDPInterface.readPatternLocked = NvstreamReadPatternLocked; + nvstreamDPInterface.readerRegisterTimestep = NULL; + nvstreamDPInterface.readerReleaseTimestep = NULL; + nvstreamDPInterface.readPatternLocked = NULL; nvstreamDPInterface.destroyReader = NvstreamDestroyReader; nvstreamDPInterface.destroyWriter = NvstreamDestroyWriter; nvstreamDPInterface.destroyWriterPerReader = NvstreamDestroyWriterPerReader;