From 57689d77ea4bb147926b118c25ee88f7066a19f1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 28 Oct 2015 20:15:05 -0600 Subject: [PATCH] Fix performance issue for request-reply * Introduced a new API `natsSubscription_NoDelay()` to use when one wants immediate notification that a message is received. This is especially important for request-reply pattern. * Added a test for this new API. * Updated README.md --- README.md | 8 +++++ examples/replier.c | 2 ++ src/conn.c | 12 +++++-- src/conn.h | 2 +- src/nats.h | 10 ++++++ src/natsp.h | 6 ++-- src/pub.c | 2 +- src/sub.c | 66 +++++++++++++++++++++++++++++---------- src/sub.h | 6 +++- src/test.c | 78 ++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 168 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 9a0798aa5..3295a015b 100644 --- a/README.md +++ b/README.md @@ -284,6 +284,14 @@ asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure ``` This is the same for all other callbacks used in the C NATS library. +Usually, delivey of messages is somehow delayed in favor of a better throughput. However, there are cases where the introduction of this delay will be detrimental to performance, for instance with the request-reply pattern. To counter this, you can use the following API: +```c +natsSubscription_NoDelay(natsSubscription *sub); +``` +This will instruct the library to notify the delivery thread (for async subscribers) or the `natSubscription_NextMsg()` call (for sync subscribers) immediately when a message is available. + +Check `examples/replier.c` for a demonstration of the usage of this call. + ## Clustered Usage diff --git a/examples/replier.c b/examples/replier.c index e9749f58a..af8f74e27 100644 --- a/examples/replier.c +++ b/examples/replier.c @@ -136,6 +136,8 @@ int main(int argc, char **argv) else s = natsConnection_SubscribeSync(&sub, conn, subj); } + if (s == NATS_OK) + s = natsSubscription_NoDeliveryDelay(sub); if (s == NATS_OK) s = natsSubscription_AutoUnsubscribe(sub, total); diff --git a/src/conn.c b/src/conn.c index 4bdc32ad1..4ea3795fc 100644 --- a/src/conn.c +++ b/src/conn.c @@ -1491,7 +1491,13 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen) sub->msgList.count++; - if (sub->signalTimerInterval != 1) + if ((sub->noDelay) + || (sub->msgList.count >= sub->signalLimit)) + { + if (sub->inWait) + natsCondition_Signal(sub->cond); + } + else if (sub->signalTimerInterval != 1) { sub->signalTimerInterval = 1; natsTimer_Reset(sub->signalTimer, 1); @@ -1608,7 +1614,7 @@ natsConn_removeSubscription(natsConnection *nc, natsSubscription *removedSub, bo natsStatus natsConn_subscribe(natsSubscription **newSub, natsConnection *nc, const char *subj, const char *queue, - natsMsgHandler cb, void *cbClosure) + natsMsgHandler cb, void *cbClosure, bool noDelay) { natsStatus s = NATS_OK; natsSubscription *sub = NULL; @@ -1628,7 +1634,7 @@ natsConn_subscribe(natsSubscription **newSub, return NATS_CONNECTION_CLOSED; } - s = natsSub_create(&sub, nc, subj, queue, cb, cbClosure); + s = natsSub_create(&sub, nc, subj, queue, cb, cbClosure, noDelay); if (s == NATS_OK) { sub->sid = ++(nc->ssid); diff --git a/src/conn.h b/src/conn.h index 8fe7001fe..58306cf66 100644 --- a/src/conn.h +++ b/src/conn.h @@ -59,7 +59,7 @@ natsConn_processPong(natsConnection *nc); natsStatus natsConn_subscribe(natsSubscription **newSub, natsConnection *nc, const char *subj, const char *queue, - natsMsgHandler cb, void *cbClosure); + natsMsgHandler cb, void *cbClosure, bool noDelay); natsStatus natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max); diff --git a/src/nats.h b/src/nats.h index b6b1c1490..721e11322 100644 --- a/src/nats.h +++ b/src/nats.h @@ -590,6 +590,16 @@ natsStatus natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject, const char *queueGroup); +/* + * By default, messages that arrive are not immediately delivered. This + * generally improves performance. However, in case of request-reply, + * this delay has a negative impact. In such case, call this function + * to have the subscriber be notified immediately each time a message + * arrives. + */ +natsStatus +natsSubscription_NoDeliveryDelay(natsSubscription *sub); + /* * Return the next message available to a synchronous subscriber or block until * one is available. diff --git a/src/natsp.h b/src/natsp.h index 345140747..01d9455a0 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -30,7 +30,7 @@ //#define DEV_MODE (1) static const char* CString = "C"; -static const char* Version = "0.1.7-alpha"; +static const char* Version = "0.1.9-alpha"; static const char* NATS_DEFAULT_URL = "nats://localhost:4222"; @@ -180,7 +180,8 @@ typedef struct __natsSubscription natsTimer *signalTimer; int64_t signalTimerInterval; int signalFailCount; - bool signaled; + int signalLimit; + bool inWait; bool closed; natsMsgHandler msgCb; @@ -191,6 +192,7 @@ typedef struct __natsSubscription int pendingMax; bool slowConsumer; + bool noDelay; } natsSubscription; diff --git a/src/pub.c b/src/pub.c index 2d70e2285..08b7187a1 100644 --- a/src/pub.c +++ b/src/pub.c @@ -224,7 +224,7 @@ natsConnection_Request(natsMsg **replyMsg, natsConnection *nc, const char *subj, s = natsInbox_Create(&inbox); if (s == NATS_OK) - s = natsConn_subscribe(&sub, nc, inbox, NULL, NULL, NULL); + s = natsConn_subscribe(&sub, nc, inbox, NULL, NULL, NULL, true); if (s == NATS_OK) s = natsSubscription_AutoUnsubscribe(sub, 1); if (s == NATS_OK) diff --git a/src/sub.c b/src/sub.c index 5f56f7af3..407b1b9cb 100644 --- a/src/sub.c +++ b/src/sub.c @@ -106,14 +106,12 @@ natsSub_deliverMsgs(void *arg) { natsSub_Lock(sub); - while ((sub->msgList.count == 0) - && !(sub->signaled) - && !(sub->closed)) - { + sub->inWait = true; + + while ((sub->msgList.count == 0) && !(sub->closed)) natsCondition_Wait(sub->cond, sub->mu); - } - sub->signaled = false; + sub->inWait = false; if (sub->closed) { @@ -186,10 +184,9 @@ _signalMsgAvailable(natsTimer *timer, void *closure) sub->signalTimerInterval = 10000; natsTimer_Reset(sub->signalTimer, sub->signalTimerInterval); } - else + else if (sub->inWait) { // Signal the delivery thread. - sub->signaled = true; natsCondition_Signal(sub->cond); } @@ -220,7 +217,8 @@ natsSub_close(natsSubscription *sub) natsStatus natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, - const char *queueGroup, natsMsgHandler cb, void *cbClosure) + const char *queueGroup, natsMsgHandler cb, void *cbClosure, + bool noDelay) { natsStatus s = NATS_OK; natsSubscription *sub = NULL; @@ -242,6 +240,8 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, sub->conn = nc; sub->msgCb = cb; sub->msgCbClosure = cbClosure; + sub->noDelay = noDelay; + sub->signalLimit = (int)(nc->opts->maxPendingMsgs * 0.75); sub->subject = NATS_STRDUP(subj); if (sub->subject == NULL) @@ -255,7 +255,7 @@ natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, } if (s == NATS_OK) s = natsCondition_Create(&(sub->cond)); - if (s == NATS_OK) + if ((s == NATS_OK) && !(sub->noDelay)) { // Set the interval to any value, really, it will get reset to // a smaller value when the delivery thread should be signaled. @@ -302,7 +302,7 @@ natsStatus natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char *subject, natsMsgHandler cb, void *cbClosure) { - return natsConn_subscribe(sub, nc, subject, NULL, cb, cbClosure); + return natsConn_subscribe(sub, nc, subject, NULL, cb, cbClosure, false); } /* @@ -311,7 +311,7 @@ natsConnection_Subscribe(natsSubscription **sub, natsConnection *nc, const char natsStatus natsConnection_SubscribeSync(natsSubscription **sub, natsConnection *nc, const char *subject) { - return natsConn_subscribe(sub, nc, subject, NULL, NULL, NULL); + return natsConn_subscribe(sub, nc, subject, NULL, NULL, NULL, false); } /* @@ -328,7 +328,8 @@ natsConnection_QueueSubscribe(natsSubscription **sub, natsConnection *nc, if ((queueGroup == NULL) || (strlen(queueGroup) == 0) || (cb == NULL)) return NATS_INVALID_ARG; - return natsConn_subscribe(sub, nc, subject, queueGroup, cb, cbClosure); + return natsConn_subscribe(sub, nc, subject, queueGroup, cb, cbClosure, + false); } /* @@ -341,9 +342,38 @@ natsConnection_QueueSubscribeSync(natsSubscription **sub, natsConnection *nc, if ((queueGroup == NULL) || (strlen(queueGroup) == 0)) return NATS_INVALID_ARG; - return natsConn_subscribe(sub, nc, subject, queueGroup, NULL, NULL); + return natsConn_subscribe(sub, nc, subject, queueGroup, NULL, NULL, + false); +} + +/* + * By default, messages that arrive are not immediately delivered. This + * generally improves performance. However, in case of request-reply, + * this delay has a negative impact. In such case, call this function + * to have the subscriber be notified immediately each time a message + * arrives. + */ +natsStatus +natsSubscription_NoDeliveryDelay(natsSubscription *sub) +{ + if (sub == NULL) + return NATS_INVALID_ARG; + + natsSub_Lock(sub); + + if (!(sub->noDelay)) + { + sub->noDelay = true; + + natsTimer_Stop(sub->signalTimer); + } + + natsSub_Unlock(sub); + + return NATS_OK; } + /* * Return the next message available to a synchronous subscriber or block until * one is available. A timeout can be used to return when no message has been @@ -358,6 +388,9 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo bool removeSub = false; int64_t target = 0; + if ((sub == NULL) || (nextMsg == NULL)) + return NATS_INVALID_ARG; + natsSub_Lock(sub); if (sub->closed) @@ -384,8 +417,9 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo if (timeout > 0) { + sub->inWait = true; + while ((sub->msgList.count == 0) - && !(sub->signaled) && (s != NATS_TIMEOUT) && !(sub->closed)) { @@ -395,7 +429,7 @@ natsSubscription_NextMsg(natsMsg **nextMsg, natsSubscription *sub, int64_t timeo s = natsCondition_AbsoluteTimedWait(sub->cond, sub->mu, target); } - sub->signaled = false; + sub->inWait = false; if (sub->closed) s = NATS_INVALID_SUBSCRIPTION; diff --git a/src/sub.h b/src/sub.h index ba1efd4d8..06e578cc1 100644 --- a/src/sub.h +++ b/src/sub.h @@ -28,13 +28,17 @@ natsSub_release(natsSubscription *sub); natsStatus natsSub_create(natsSubscription **newSub, natsConnection *nc, const char *subj, - const char *queueGroup, natsMsgHandler cb, void *cbClosure); + const char *queueGroup, natsMsgHandler cb, void *cbClosure, + bool noDelay); void natsSub_close(natsSubscription *sub); // PUBLIC +natsStatus +natsSubscription_NoDeliveryDelay(natsSubscription *sub); + natsStatus natsSubscription_QueuedMsgs(natsSubscription *sub, uint64_t *queuedMsgs); diff --git a/src/test.c b/src/test.c index 2794e6543..5ac4b4db4 100644 --- a/src/test.c +++ b/src/test.c @@ -2325,6 +2325,8 @@ _recvTestString(natsConnection *nc, natsSubscription *sub, natsMsg *msg, arg->status = natsConnection_PublishString(nc, natsMsg_GetReply(msg), arg->string); + if (arg->status == NATS_OK) + arg->status = natsConnection_Flush(nc); break; } case 5: @@ -5278,6 +5280,81 @@ test_NextMsgCallOnAsyncSub(void) _stopServer(serverPid); } +static void +test_NoDelay(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsPid serverPid = NATS_INVALID_PID; + struct threadArg arg; + int i; + int64_t start; + int64_t withDelay; + int64_t withNoDelay; + int count = 3000; + + if (getenv("VALGRIND") != NULL) + count = 1000; + + PRINT_TEST_NAME(); + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test"); + + serverPid = _startServer(NATS_DEFAULT_URL, NULL, true); + if (serverPid == NATS_INVALID_PID) + FAIL("Unable to start or verify that the server was started!"); + + arg.control = 4; + arg.string = "reply"; + + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + if (s == NATS_OK) + s = natsConnection_Subscribe(&sub, nc, "foo", _recvTestString, (void*) &arg); + + test("With delay: "); + start = nats_Now(); + for (i=0; (s == NATS_OK) && (i