Skip to content

Commit

Permalink
Add option to support padding when allocating buffer for incoming mes…
Browse files Browse the repository at this point in the history
…sages
  • Loading branch information
dmitrmax authored and dmitrmax committed Jan 19, 2023
1 parent 6d22e57 commit 0ac8146
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2600,10 +2600,10 @@ _createMsg(natsMsg **newMsg, natsConnection *nc, char *buf, int bufLen, int hdrL
replyLen = natsBuf_Len(nc->ps->ma.reply);
}

s = natsMsg_create(newMsg,
s = natsMsg_createWithPadding(newMsg,
(const char*) natsBuf_Data(nc->ps->ma.subject), subjLen,
(const char*) reply, replyLen,
(const char*) buf, bufLen, hdrLen);
(const char*) buf, bufLen, nc->opts->payloadPaddingSize, hdrLen);
return s;
}

Expand Down
20 changes: 16 additions & 4 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -744,16 +744,18 @@ natsMsg_GetTime(natsMsg *msg)
}

natsStatus
natsMsg_create(natsMsg **newMsg,
natsMsg_createWithPadding(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen)
const char *buf, int bufLen, int bufPaddingSize, int hdrLen)
{
natsMsg *msg = NULL;
char *ptr = NULL;
int bufSize = 0;
int dataLen = bufLen;
bool hasHdrs = (hdrLen > 0 ? true : false);
// Make payload a null-terminated string and add at least one zero byte to the end
int padLen = (bufPaddingSize > 0 ? bufPaddingSize : 1);

bufSize = subjLen;
bufSize += 1;
Expand All @@ -763,7 +765,7 @@ natsMsg_create(natsMsg **newMsg,
bufSize += 1;
}
bufSize += bufLen;
bufSize += 1;
bufSize += padLen;
if (hasHdrs)
bufSize++;

Expand Down Expand Up @@ -828,7 +830,7 @@ natsMsg_create(natsMsg **newMsg,
if (buf != NULL)
memcpy(ptr, buf, dataLen);
ptr += dataLen;
*(ptr) = '\0';
memset(ptr, 0, padLen);
// This is essentially to match server's view of a message size
// when sending messages to pull consumers and keeping track
// of size in regards to a max_bytes setting.
Expand All @@ -843,6 +845,16 @@ natsMsg_create(natsMsg **newMsg,
return NATS_OK;
}

natsStatus
natsMsg_create(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen)
{
return natsMsg_createWithPadding(newMsg, subject, subjLen, reply, replyLen,
buf, bufLen, 0, hdrLen);
}

// Used internally to initialize a message structure, generally defined on the stack,
// that will then be passed as a reference to publish functions.
void
Expand Down
7 changes: 7 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ natsMsg_create(natsMsg **newMsg,
const char *reply, int replyLen,
const char *buf, int bufLen, int hdrLen);

natsStatus
natsMsg_createWithPadding(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen, int bufPaddingSize,
int hdrLen);

void
natsMsg_freeHeaders(natsMsg *msg);

Expand Down
16 changes: 16 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3116,6 +3116,22 @@ natsOptions_DisableNoResponders(natsOptions *opts, bool disabled);
NATS_EXTERN natsStatus
natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix);

/** \brief Sets a custom padding when allocating buffer for incoming messages
*
* By default library allocates natsMsg with payload buffer size
* equal to payload size. Sometimes it can be useful to add some
* padding to the end of the buffer which can be tweaked using
* this option.
*
* To clear the custom message buffer padding, call this function with 0.
* Changing this option has no effect on existing NATS connections.
*
* @param opts the pointer to the #natsOptions object.
* @param paddingSize the desired inbox prefix.
*/
NATS_EXTERN natsStatus
natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize);

/** \brief Destroys a #natsOptions object.
*
* Destroys the natsOptions object, freeing used memory. See the note in
Expand Down
3 changes: 3 additions & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ struct __natsOptions

// Custom inbox prefix
char *inboxPfx;

// Custom message payload padding size
int payloadPaddingSize;
};

typedef struct __nats_MsgList
Expand Down
12 changes: 12 additions & 0 deletions src/opts.c
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,18 @@ natsOptions_SetCustomInboxPrefix(natsOptions *opts, const char *inboxPrefix)
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
natsOptions_SetMessageBufferPadding(natsOptions *opts, int paddingSize)
{
LOCK_AND_CHECK_OPTIONS(opts, (paddingSize < 0));

opts->payloadPaddingSize = paddingSize;

UNLOCK_OPTS(opts);

return NATS_OK;
}

static void
_freeOptions(natsOptions *opts)
{
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ OldRequest
SimultaneousRequests
RequestClose
CustomInbox
MessagePadding
FlushInCb
ReleaseFlush
FlushErrOnDisconnect
Expand Down
58 changes: 58 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -12559,6 +12559,63 @@ test_CustomInbox(void)
_stopServer(serverPid);
}

static void
test_MessageBufferPadding(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsPid serverPid = NATS_INVALID_PID;
const char *string = "Hello World";
const char *servers[] = { "nats://127.0.0.1:4222" };
int serversCount = 1;
int paddingSize = 32;
bool paddingIsZeros = true;

serverPid = _startServer(servers[0], NULL, true);
CHECK_SERVER_STARTED(serverPid);

test("Create options: ");
s = natsOptions_Create(&opts);
testCond(s == NATS_OK);

test("Setting message buffer padding: ");
s = natsOptions_SetMessageBufferPadding(opts, paddingSize);
testCond(s == NATS_OK);

test("Setting servers: ");
s = natsOptions_SetServers(opts, servers, serversCount);
testCond(s == NATS_OK);

test("Test generating message for subscriber: ")
s = natsConnection_Connect(&nc, opts);
IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo"));
IFOK(s, natsConnection_PublishString(nc, "foo", string));
IFOK(s, natsSubscription_NextMsg(&msg, sub, 1000));
testCond((s == NATS_OK)
&& (msg != NULL)
&& (strncmp(string, natsMsg_GetData(msg), natsMsg_GetDataLength(msg)) == 0));

test("Test access to memory in message buffer beyond data length: ");
// This test can pass even if padding doesn't work as excepted.
// But valgrind will show access to unallocated memory
for (int i=natsMsg_GetDataLength(msg); i<natsMsg_GetDataLength(msg)+paddingSize; i++) {
if (natsMsg_GetData(msg)[i])
paddingIsZeros = false;
}

testCond(paddingIsZeros);

natsMsg_Destroy(msg);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
natsOptions_Destroy(opts);

_stopServer(serverPid);
}

static void
test_FlushInCb(void)
{
Expand Down Expand Up @@ -34224,6 +34281,7 @@ static testInfo allTests[] =
{"SimultaneousRequests", test_SimultaneousRequest},
{"RequestClose", test_RequestClose},
{"CustomInbox", test_CustomInbox},
{"MessagePadding", test_MessageBufferPadding},
{"FlushInCb", test_FlushInCb},
{"ReleaseFlush", test_ReleaseFlush},
{"FlushErrOnDisconnect", test_FlushErrOnDisconnect},
Expand Down

0 comments on commit 0ac8146

Please sign in to comment.