diff --git a/src/conn.c b/src/conn.c index bb01cbbe8..ae43c302f 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2600,10 +2600,10 @@ _createMsg(natsMsg **newMsg, natsConnection *nc, char *buf, int bufLen, int hdrL replyLen = natsBuf_Len(nc->ps->ma.reply); } - s = natsMsg_create(newMsg, + s = natsMsg_createWithPadding(newMsg, (const char*) natsBuf_Data(nc->ps->ma.subject), subjLen, (const char*) reply, replyLen, - (const char*) buf, bufLen, hdrLen); + (const char*) buf, bufLen, nc->opts->payloadPaddingSize, hdrLen); return s; } diff --git a/src/msg.c b/src/msg.c index 1eaba6f6b..fa23ec103 100644 --- a/src/msg.c +++ b/src/msg.c @@ -744,16 +744,18 @@ natsMsg_GetTime(natsMsg *msg) } natsStatus -natsMsg_create(natsMsg **newMsg, +natsMsg_createWithPadding(natsMsg **newMsg, const char *subject, int subjLen, const char *reply, int replyLen, - const char *buf, int bufLen, int hdrLen) + const char *buf, int bufLen, int bufPaddingSize, int hdrLen) { natsMsg *msg = NULL; char *ptr = NULL; int bufSize = 0; int dataLen = bufLen; bool hasHdrs = (hdrLen > 0 ? true : false); + // Make payload a null-terminated string and add at least one zero byte to the end + int padLen = (bufPaddingSize > 0 ? bufPaddingSize : 1); bufSize = subjLen; bufSize += 1; @@ -763,7 +765,7 @@ natsMsg_create(natsMsg **newMsg, bufSize += 1; } bufSize += bufLen; - bufSize += 1; + bufSize += padLen; if (hasHdrs) bufSize++; @@ -828,7 +830,7 @@ natsMsg_create(natsMsg **newMsg, if (buf != NULL) memcpy(ptr, buf, dataLen); ptr += dataLen; - *(ptr) = '\0'; + memset(ptr, 0, padLen); // This is essentially to match server's view of a message size // when sending messages to pull consumers and keeping track // of size in regards to a max_bytes setting. @@ -843,6 +845,16 @@ natsMsg_create(natsMsg **newMsg, return NATS_OK; } +natsStatus +natsMsg_create(natsMsg **newMsg, + const char *subject, int subjLen, + const char *reply, int replyLen, + const char *buf, int bufLen, int hdrLen) +{ + return natsMsg_createWithPadding(newMsg, subject, subjLen, reply, replyLen, + buf, bufLen, 0, hdrLen); +} + // Used internally to initialize a message structure, generally defined on the stack, // that will then be passed as a reference to publish functions. void diff --git a/src/msg.h b/src/msg.h index e01da05fa..2cbc9433c 100644 --- a/src/msg.h +++ b/src/msg.h @@ -105,6 +105,13 @@ natsMsg_create(natsMsg **newMsg, const char *reply, int replyLen, const char *buf, int bufLen, int hdrLen); +natsStatus +natsMsg_createWithPadding(natsMsg **newMsg, + const char *subject, int subjLen, + const char *reply, int replyLen, + const char *buf, int bufLen, int bufPaddingSize, + int hdrLen); + void natsMsg_freeHeaders(natsMsg *msg); diff --git a/src/nats.h b/src/nats.h index 79bceaaa9..5db9b4521 100644 --- a/src/nats.h +++ b/src/nats.h @@ -3116,6 +3116,22 @@ natsOptions_DisableNoResponders(natsOptions *opts, bool disabled); NATS_EXTERN natsStatus natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix); +/** \brief Sets a custom padding when allocating buffer for incoming messages + * + * By default library allocates natsMsg with payload buffer size + * equal to payload size. Sometimes it can be useful to add some + * padding to the end of the buffer which can be tweaked using + * this option. + * + * To clear the custom message buffer padding, call this function with 0. + * Changing this option has no effect on existing NATS connections. + * + * @param opts the pointer to the #natsOptions object. + * @param paddingSize the desired inbox prefix. + */ +NATS_EXTERN natsStatus +natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize); + /** \brief Destroys a #natsOptions object. * * Destroys the natsOptions object, freeing used memory. See the note in diff --git a/src/natsp.h b/src/natsp.h index 76a4b5d07..b524a6624 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -313,6 +313,9 @@ struct __natsOptions // Custom inbox prefix char *inboxPfx; + + // Custom message payload padding size + int payloadPaddingSize; }; typedef struct __nats_MsgList diff --git a/src/opts.c b/src/opts.c index f63872143..af2d66d93 100644 --- a/src/opts.c +++ b/src/opts.c @@ -1440,6 +1440,18 @@ natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize) +{ + LOCK_AND_CHECK_OPTIONS(opts, (paddingSize < 0)); + + opts->payloadPaddingSize = paddingSize; + + UNLOCK_OPTS(opts); + + return NATS_OK; +} + static void _freeOptions(natsOptions *opts) { diff --git a/test/list.txt b/test/list.txt index 99cca19f8..0429b024c 100644 --- a/test/list.txt +++ b/test/list.txt @@ -122,6 +122,7 @@ OldRequest SimultaneousRequests RequestClose CustomInbox +MessagePadding FlushInCb ReleaseFlush FlushErrOnDisconnect diff --git a/test/test.c b/test/test.c index 465e76692..b5aab9606 100644 --- a/test/test.c +++ b/test/test.c @@ -12559,6 +12559,63 @@ test_CustomInbox(void) _stopServer(serverPid); } +static void +test_MessageBufferPadding(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsPid serverPid = NATS_INVALID_PID; + const char *string = "Hello World"; + const char *servers[] = { "nats://127.0.0.1:4222" }; + int serversCount = 1; + int paddingSize = 32; + bool paddingIsZeros = true; + + serverPid = _startServer(servers[0], NULL, true); + CHECK_SERVER_STARTED(serverPid); + + test("Create options: "); + s = natsOptions_Create(&opts); + testCond(s == NATS_OK); + + test("Setting message buffer padding: "); + s = natsOptions_SetMessageBufferPadding(opts, paddingSize); + testCond(s == NATS_OK); + + test("Setting servers: "); + s = natsOptions_SetServers(opts, servers, serversCount); + testCond(s == NATS_OK); + + test("Test generating message for subscriber: ") + s = natsConnection_Connect(&nc, opts); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo")); + IFOK(s, natsConnection_PublishString(nc, "foo", string)); + IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000)); + testCond((s == NATS_OK) + && (msg != NULL) + && (strncmp(string, natsMsg_GetData(msg), natsMsg_GetDataLength(msg)) == 0)); + + test("Test access to memory in message buffer beyond data length: "); + // This test can pass even if padding doesn't work as excepted. + // But valgrind will show access to unallocated memory + for (int i=natsMsg_GetDataLength(msg); i