Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Crash processing inbound message for destroyed subscription #638

Merged
merged 1 commit into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 35 additions & 43 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,6 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
bool sc = false;
bool sm = false;
nats_MsgList *list = NULL;
natsMutex *mu = NULL;
natsCondition *cond = NULL;
// For JetStream cases
jsSub *jsi = NULL;
Expand All @@ -2626,75 +2625,72 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
int jct = 0;
natsMsgFilter mf = NULL;
void *mfc = NULL;
bool unlock = false;

// Do this outside of locks, even if we end-up having to destroy
// it because we have reached the maxPendingMsgs count or other
// conditions. This reduces lock contention.
s = _createMsg(&msg, nc, buf, bufLen, nc->ps->ma.hdr);
if (s != NATS_OK)
return s;
// bufLen is the total length of headers + data. Since headers become
// more and more prevalent, it makes sense to count them both toward
// the subscription's pending limit. So use bufLen for accounting.

natsMutex_Lock(nc->subsMu);

nc->stats.inMsgs += 1;
nc->stats.inBytes += (uint64_t) bufLen;

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);
if (sub != NULL)
if ((mf = nc->filter) != NULL)
{
mf = nc->filter;
mfc = nc->filterClosure;
natsMutex_Unlock(nc->subsMu);

(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;

natsMutex_Lock(nc->subsMu);
}
natsMutex_Unlock(nc->subsMu);

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);
if (sub == NULL)
{
natsMutex_Unlock(nc->subsMu);
natsMsg_Destroy(msg);
return NATS_OK;
}
// We need to retain the subscription since as soon as we release the
// nc->subsMu lock, the subscription could be destroyed and we would
// reference freed memory.
natsSubAndLdw_LockAndRetain(sub);

// Do this outside of sub's lock, even if we end-up having to destroy
// it because we have reached the maxPendingMsgs count. This reduces
// lock contention.
s = _createMsg(&msg, nc, buf, bufLen, nc->ps->ma.hdr);
if (s != NATS_OK)
return s;
// bufLen is the total length of headers + data. Since headers become
// more and more prevalent, it makes sense to count them both toward
// the subscription's pending limit. So use bufLen for accounting.
natsMutex_Unlock(nc->subsMu);

if (mf != NULL)
if (sub->closed || sub->drainSkip)
{
(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;
natsSubAndLdw_UnlockAndRelease(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}

// Pick mutex, condition variable and list based on if the sub is
// Pick condition variable and list based on if the sub is
// part of a global delivery thread pool or not.
// Note about `list`: this is used only to link messages, but
// sub->msgList needs to be used to update/check number of pending
// messages, since in case of delivery thread pool, `list` will have
// messages from many different subscriptions.
if ((ldw = sub->libDlvWorker) != NULL)
{
mu = ldw->lock;
cond = ldw->cond;
list = &(ldw->msgList);
if (sub->jsi != NULL)
{
natsSub_Lock(sub);
unlock = true;
}
}
else
{
mu = sub->mu;
cond = sub->cond;
list = &(sub->msgList);
}

natsMutex_Lock(mu);
if (sub->closed || sub->drainSkip)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}

jsi = sub->jsi;
// For JS subscriptions (but not pull ones), handle hearbeat and flow control here.
if (jsi && !jsi->pull)
Expand All @@ -2714,9 +2710,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
s = jsSub_checkOrderedMsg(sub, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsSubAndLdw_UnlockAndRelease(sub);
natsMsg_Destroy(msg);
return s;
}
Expand Down Expand Up @@ -2798,9 +2792,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
}
}

natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsSubAndLdw_UnlockAndRelease(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand Down
22 changes: 22 additions & 0 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,35 @@ natsSubAndLdw_Lock(natsSubscription *sub)
SUB_DLV_WORKER_LOCK(sub);
}

void
natsSubAndLdw_LockAndRetain(natsSubscription *sub)
{
natsMutex_Lock(sub->mu);
sub->refs++;
SUB_DLV_WORKER_LOCK(sub);
}

void
natsSubAndLdw_Unlock(natsSubscription *sub)
{
SUB_DLV_WORKER_UNLOCK(sub);
natsMutex_Unlock(sub->mu);
}

void
natsSubAndLdw_UnlockAndRelease(natsSubscription *sub)
{
int refs = 0;

SUB_DLV_WORKER_UNLOCK(sub);

refs = --(sub->refs);
natsMutex_Unlock(sub->mu);

if (refs == 0)
_freeSubscription(sub);
}

// Runs under the subscription lock but will release it for a JS subscription
// if the JS consumer needs to be deleted.
static void
Expand Down
6 changes: 6 additions & 0 deletions src/sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ natsSubAndLdw_Lock(natsSubscription *sub);
void
natsSubAndLdw_Unlock(natsSubscription *sub);

void
natsSubAndLdw_LockAndRetain(natsSubscription *sub);

void
natsSubAndLdw_UnlockAndRelease(natsSubscription *sub);

void
natsSub_close(natsSubscription *sub, bool connectionClosed);

Expand Down