Skip to content

Commit

Permalink
Fix performance issue for request-reply
Browse files Browse the repository at this point in the history
* Introduced a new API `natsSubscription_NoDelay()` to use when one wants immediate notification that a message is received. This is especially important for request-reply pattern.
* Added a test for this new API.
* Updated README.md
  • Loading branch information
kozlovic committed Oct 29, 2015
1 parent 539ecfe commit 57689d7
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 24 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,14 @@ asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure
```
This is the same for all other callbacks used in the C NATS library.
Usually, delivey of messages is somehow delayed in favor of a better throughput. However, there are cases where the introduction of this delay will be detrimental to performance, for instance with the request-reply pattern. To counter this, you can use the following API:
```c
natsSubscription_NoDelay(natsSubscription *sub);
```
This will instruct the library to notify the delivery thread (for async subscribers) or the `natSubscription_NextMsg()` call (for sync subscribers) immediately when a message is available.

Check `examples/replier.c` for a demonstration of the usage of this call.


## Clustered Usage

Expand Down
2 changes: 2 additions & 0 deletions examples/replier.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ int main(int argc, char **argv)
else
s = natsConnection_SubscribeSync(&sub, conn, subj);
}
if (s == NATS_OK)
s = natsSubscription_NoDeliveryDelay(sub);
if (s == NATS_OK)
s = natsSubscription_AutoUnsubscribe(sub, total);

Expand Down
12 changes: 9 additions & 3 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,13 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)

sub->msgList.count++;

if (sub->signalTimerInterval != 1)
if ((sub->noDelay)
|| (sub->msgList.count >= sub->signalLimit))
{
if (sub->inWait)
natsCondition_Signal(sub->cond);
}
else if (sub->signalTimerInterval != 1)
{
sub->signalTimerInterval = 1;
natsTimer_Reset(sub->signalTimer, 1);
Expand Down Expand Up @@ -1608,7 +1614,7 @@ natsConn_removeSubscription(natsConnection *nc, natsSubscription *removedSub, bo
natsStatus
natsConn_subscribe(natsSubscription **newSub,
natsConnection *nc, const char *subj, const char *queue,
natsMsgHandler cb, void *cbClosure)
natsMsgHandler cb, void *cbClosure, bool noDelay)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
Expand All @@ -1628,7 +1634,7 @@ natsConn_subscribe(natsSubscription **newSub,
return NATS_CONNECTION_CLOSED;
}

s = natsSub_create(&sub, nc, subj, queue, cb, cbClosure);
s = natsSub_create(&sub, nc, subj, queue, cb, cbClosure, noDelay);
if (s == NATS_OK)
{
sub->sid = ++(nc->ssid);
Expand Down
2 changes: 1 addition & 1 deletion src/conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ natsConn_processPong(natsConnection *nc);
natsStatus
natsConn_subscribe(natsSubscription **newSub,
natsConnection *nc, const char *subj, const char *queue,
natsMsgHandler cb, void *cbClosure);
natsMsgHandler cb, void *cbClosure, bool noDelay);

natsStatus
natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max);
Expand Down
10 changes: 10 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,16 @@ natsStatus
natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc,
const char *subject, const char *queueGroup);

/*
* By default, messages that arrive are not immediately delivered. This
* generally improves performance. However, in case of request-reply,
* this delay has a negative impact. In such case, call this function
* to have the subscriber be notified immediately each time a message
* arrives.
*/
natsStatus
natsSubscription_NoDeliveryDelay(natsSubscription *sub);

/*
* Return the next message available to a synchronous subscriber or block until
* one is available.
Expand Down
6 changes: 4 additions & 2 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//#define DEV_MODE (1)

static const char* CString = "C";
static const char* Version = "0.1.7-alpha";
static const char* Version = "0.1.9-alpha";

static const char* NATS_DEFAULT_URL = "nats://localhost:4222";

Expand Down Expand Up @@ -180,7 +180,8 @@ typedef struct __natsSubscription
natsTimer *signalTimer;
int64_t signalTimerInterval;
int signalFailCount;
bool signaled;
int signalLimit;
bool inWait;
bool closed;

natsMsgHandler msgCb;
Expand All @@ -191,6 +192,7 @@ typedef struct __natsSubscription
int pendingMax;

bool slowConsumer;
bool noDelay;

} natsSubscription;

Expand Down
2 changes: 1 addition & 1 deletion src/pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj,

s = natsInbox_Create(&inbox);
if (s == NATS_OK)
s = natsConn_subscribe(&sub, nc, inbox, NULL, NULL, NULL);
s = natsConn_subscribe(&sub, nc, inbox, NULL, NULL, NULL, true);
if (s == NATS_OK)
s = natsSubscription_AutoUnsubscribe(sub, 1);
if (s == NATS_OK)
Expand Down
66 changes: 50 additions & 16 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,12 @@ natsSub_deliverMsgs(void *arg)
{
natsSub_Lock(sub);

while ((sub->msgList.count == 0)
&& !(sub->signaled)
&& !(sub->closed))
{
sub->inWait = true;

while ((sub->msgList.count == 0) && !(sub->closed))
natsCondition_Wait(sub->cond, sub->mu);
}

sub->signaled = false;
sub->inWait = false;

if (sub->closed)
{
Expand Down Expand Up @@ -186,10 +184,9 @@ _signalMsgAvailable(natsTimer *timer, void *closure)
sub->signalTimerInterval = 10000;
natsTimer_Reset(sub->signalTimer, sub->signalTimerInterval);
}
else
else if (sub->inWait)
{
// Signal the delivery thread.
sub->signaled = true;
natsCondition_Signal(sub->cond);
}

Expand Down Expand Up @@ -220,7 +217,8 @@ natsSub_close(natsSubscription *sub)

natsStatus
natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
const char *queueGroup, natsMsgHandler cb, void *cbClosure)
const char *queueGroup, natsMsgHandler cb, void *cbClosure,
bool noDelay)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
Expand All @@ -242,6 +240,8 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
sub->conn = nc;
sub->msgCb = cb;
sub->msgCbClosure = cbClosure;
sub->noDelay = noDelay;
sub->signalLimit = (int)(nc->opts->maxPendingMsgs * 0.75);

sub->subject = NATS_STRDUP(subj);
if (sub->subject == NULL)
Expand All @@ -255,7 +255,7 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
}
if (s == NATS_OK)
s = natsCondition_Create(&(sub->cond));
if (s == NATS_OK)
if ((s == NATS_OK) && !(sub->noDelay))
{
// Set the interval to any value, really, it will get reset to
// a smaller value when the delivery thread should be signaled.
Expand Down Expand Up @@ -302,7 +302,7 @@ natsStatus
natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject,
natsMsgHandler cb, void *cbClosure)
{
return natsConn_subscribe(sub, nc, subject, NULL, cb, cbClosure);
return natsConn_subscribe(sub, nc, subject, NULL, cb, cbClosure, false);
}

/*
Expand All @@ -311,7 +311,7 @@ natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char
natsStatus
natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject)
{
return natsConn_subscribe(sub, nc, subject, NULL, NULL, NULL);
return natsConn_subscribe(sub, nc, subject, NULL, NULL, NULL, false);
}

/*
Expand All @@ -328,7 +328,8 @@ natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc,
if ((queueGroup == NULL) || (strlen(queueGroup) == 0) || (cb == NULL))
return NATS_INVALID_ARG;

return natsConn_subscribe(sub, nc, subject, queueGroup, cb, cbClosure);
return natsConn_subscribe(sub, nc, subject, queueGroup, cb, cbClosure,
false);
}

/*
Expand All @@ -341,9 +342,38 @@ natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc,
if ((queueGroup == NULL) || (strlen(queueGroup) == 0))
return NATS_INVALID_ARG;

return natsConn_subscribe(sub, nc, subject, queueGroup, NULL, NULL);
return natsConn_subscribe(sub, nc, subject, queueGroup, NULL, NULL,
false);
}

/*
* By default, messages that arrive are not immediately delivered. This
* generally improves performance. However, in case of request-reply,
* this delay has a negative impact. In such case, call this function
* to have the subscriber be notified immediately each time a message
* arrives.
*/
natsStatus
natsSubscription_NoDeliveryDelay(natsSubscription *sub)
{
if (sub == NULL)
return NATS_INVALID_ARG;

natsSub_Lock(sub);

if (!(sub->noDelay))
{
sub->noDelay = true;

natsTimer_Stop(sub->signalTimer);
}

natsSub_Unlock(sub);

return NATS_OK;
}


/*
* Return the next message available to a synchronous subscriber or block until
* one is available. A timeout can be used to return when no message has been
Expand All @@ -358,6 +388,9 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo
bool removeSub = false;
int64_t target = 0;

if ((sub == NULL) || (nextMsg == NULL))
return NATS_INVALID_ARG;

natsSub_Lock(sub);

if (sub->closed)
Expand All @@ -384,8 +417,9 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo

if (timeout > 0)
{
sub->inWait = true;

while ((sub->msgList.count == 0)
&& !(sub->signaled)
&& (s != NATS_TIMEOUT)
&& !(sub->closed))
{
Expand All @@ -395,7 +429,7 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo
s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, target);
}

sub->signaled = false;
sub->inWait = false;

if (sub->closed)
s = NATS_INVALID_SUBSCRIPTION;
Expand Down
6 changes: 5 additions & 1 deletion src/sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ natsSub_release(natsSubscription *sub);

natsStatus
natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj,
const char *queueGroup, natsMsgHandler cb, void *cbClosure);
const char *queueGroup, natsMsgHandler cb, void *cbClosure,
bool noDelay);

void
natsSub_close(natsSubscription *sub);

// PUBLIC

natsStatus
natsSubscription_NoDeliveryDelay(natsSubscription *sub);

natsStatus
natsSubscription_QueuedMsgs(natsSubscription *sub, uint64_t *queuedMsgs);

Expand Down
78 changes: 78 additions & 0 deletions src/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -2325,6 +2325,8 @@ _recvTestString(natsConnection *nc, natsSubscription *sub, natsMsg *msg,
arg->status = natsConnection_PublishString(nc,
natsMsg_GetReply(msg),
arg->string);
if (arg->status == NATS_OK)
arg->status = natsConnection_Flush(nc);
break;
}
case 5:
Expand Down Expand Up @@ -5278,6 +5280,81 @@ test_NextMsgCallOnAsyncSub(void)
_stopServer(serverPid);
}

static void
test_NoDelay(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid serverPid = NATS_INVALID_PID;
struct threadArg arg;
int i;
int64_t start;
int64_t withDelay;
int64_t withNoDelay;
int count = 3000;

if (getenv("VALGRIND") != NULL)
count = 1000;

PRINT_TEST_NAME();

s = _createDefaultThreadArgsForCbTests(&arg);
if (s != NATS_OK)
FAIL("Unable to setup test");

serverPid = _startServer(NATS_DEFAULT_URL, NULL, true);
if (serverPid == NATS_INVALID_PID)
FAIL("Unable to start or verify that the server was started!");

arg.control = 4;
arg.string = "reply";

s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
if (s == NATS_OK)
s = natsConnection_Subscribe(&sub, nc, "foo", _recvTestString, (void*) &arg);

test("With delay: ");
start = nats_Now();
for (i=0; (s == NATS_OK) && (i<count); i++)
{
s = natsConnection_Request(&msg, nc, "foo", "help", 4, 1000);
if (s == NATS_OK)
natsMsg_Destroy(msg);
}
withDelay = nats_Now() - start;

natsMutex_Lock(arg.m);
testCond((s == NATS_OK) && (arg.status == NATS_OK));
natsMutex_Unlock(arg.m);

if (s == NATS_OK)
s = natsSubscription_NoDeliveryDelay(sub);

test("With no delay faster for req-reply: ");
start = nats_Now();
for (i=0; (s == NATS_OK) && (i<count); i++)
{
s = natsConnection_Request(&msg, nc, "foo", "help", 4, 1000);
if (s == NATS_OK)
natsMsg_Destroy(msg);
}
withNoDelay = nats_Now() - start;

natsMutex_Lock(arg.m);
testCond((s == NATS_OK) && (arg.status == NATS_OK) && (withNoDelay < withDelay));
natsMutex_Unlock(arg.m);

natsSubscription_Destroy(sub);

natsConnection_Destroy(nc);

_destroyDefaultThreadArgs(&arg);

_stopServer(serverPid);
}

static void
test_ServersOption(void)
{
Expand Down Expand Up @@ -6141,6 +6218,7 @@ int main(int argc, char **argv)
test_AsyncSubscriberStarvation();
test_AsyncSubscriberOnClose();
test_NextMsgCallOnAsyncSub();
test_NoDelay();

printf("\n== Clusters Tests ==\n");

Expand Down

0 comments on commit 57689d7

Please sign in to comment.