diff --git a/src/dpiConn.c b/src/dpiConn.c index 82aea31e..67b6a8ef 100644 --- a/src/dpiConn.c +++ b/src/dpiConn.c @@ -2147,6 +2147,7 @@ int dpiConn_subscribe(dpiConn *conn, dpiSubscrCreateParams *params, int dpiConn_unsubscribe(dpiConn *conn, dpiSubscr *subscr) { dpiError error; + int status; if (dpiConn__check(conn, __func__, &error) < 0) return dpiGen__endPublicFn(conn, DPI_FAILURE, &error); @@ -2154,9 +2155,13 @@ int dpiConn_unsubscribe(dpiConn *conn, dpiSubscr *subscr) &error) < 0) return dpiGen__endPublicFn(conn, DPI_FAILURE, &error); if (subscr->registered) { - if (dpiOci__subscriptionUnRegister(conn, subscr, &error) < 0) + dpiMutex__acquire(subscr->mutex); + status = dpiOci__subscriptionUnRegister(conn, subscr, &error); + if (status == DPI_SUCCESS) + subscr->registered = 0; + dpiMutex__release(subscr->mutex); + if (status < 0) return dpiGen__endPublicFn(subscr, DPI_FAILURE, &error); - subscr->registered = 0; } dpiGen__setRefCount(subscr, &error, -1); diff --git a/src/dpiImpl.h b/src/dpiImpl.h index d0db476b..e008e09a 100644 --- a/src/dpiImpl.h +++ b/src/dpiImpl.h @@ -1047,6 +1047,7 @@ struct dpiSubscr { dpiType_HEAD dpiConn *conn; // connection which created this void *handle; // OCI subscription handle + dpiMutexType mutex; // enables thread safety dpiSubscrNamespace subscrNamespace; // OCI namespace dpiSubscrQOS qos; // quality of service flags dpiSubscrCallback callback; // callback when event is propagated diff --git a/src/dpiSubscr.c b/src/dpiSubscr.c index d0750c59..c4b00a55 100644 --- a/src/dpiSubscr.c +++ b/src/dpiSubscr.c @@ -41,8 +41,18 @@ static void dpiSubscr__callback(dpiSubscr *subscr, UNUSED void *handle, // ensure that the subscription handle is still valid if (dpiGen__startPublicFn(subscr, DPI_HTYPE_SUBSCR, __func__, 1, - &error) < 0) + &error) < 0) { dpiGen__endPublicFn(subscr, DPI_FAILURE, &error); + return; + } + + // if the subscription is no longer registered, nothing further to do + dpiMutex__acquire(subscr->mutex); + if (!subscr->registered) { + dpiMutex__release(subscr->mutex); + dpiGen__endPublicFn(subscr, DPI_SUCCESS, &error); + return; + } // populate message memset(&message, 0, sizeof(message)); @@ -52,11 +62,13 @@ static void dpiSubscr__callback(dpiSubscr *subscr, UNUSED void *handle, } message.registered = subscr->registered; - // invoke user callback + // invoke user callback; temporarily increase reference count to ensure + // that the subscription is not freed during the callback + dpiGen__setRefCount(subscr, &error, 1); (*subscr->callback)(subscr->callbackContext, &message); - - // clean up message dpiSubscr__freeMessage(&message); + dpiMutex__release(subscr->mutex); + dpiGen__setRefCount(subscr, &error, -1); dpiGen__endPublicFn(subscr, DPI_SUCCESS, &error); } @@ -95,6 +107,7 @@ int dpiSubscr__create(dpiSubscr *subscr, dpiConn *conn, subscr->callbackContext = params->callbackContext; subscr->subscrNamespace = params->subscrNamespace; subscr->qos = params->qos; + dpiMutex__initialize(subscr->mutex); // create the subscription handle if (dpiOci__handleAlloc(conn->env->handle, &subscr->handle, @@ -237,6 +250,7 @@ int dpiSubscr__create(dpiSubscr *subscr, dpiConn *conn, //----------------------------------------------------------------------------- void dpiSubscr__free(dpiSubscr *subscr, dpiError *error) { + dpiMutex__acquire(subscr->mutex); if (subscr->handle) { if (subscr->registered) dpiOci__subscriptionUnRegister(subscr->conn, subscr, error); @@ -247,6 +261,8 @@ void dpiSubscr__free(dpiSubscr *subscr, dpiError *error) dpiGen__setRefCount(subscr->conn, error, -1); subscr->conn = NULL; } + dpiMutex__release(subscr->mutex); + dpiMutex__destroy(subscr->mutex); dpiUtils__freeMemory(subscr); }