Skip to content

Commit

Permalink
Merge pull request #2 from nats-io/pooling-and-gc
Browse files Browse the repository at this point in the history
Performance improvements
  • Loading branch information
kozlovic committed Oct 28, 2015
2 parents 2e4f2a2 + 3824045 commit ff01917
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 102 deletions.
2 changes: 1 addition & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

- [ ] Plug with async/IO event libraries (such as libevent/libuv)
- [ ] Pooling/garbage collection of structures
- [x] Pooling/garbage collection of structures ~~(may add more later, and pool msgs structures and buffers, but that would introduce locking which may negate the benefits of the pooling)~~
- [x] ~~Revisit connect logic to match GO Client~~ *Will actually do the opposite, that is change the GO Client connect/reconnect logic. The async reconnect logic is flawed and could cause issues when getting authentication errors during the process.*
- [x] Revisit FlushTimeout to match GO Client
- [ ] Revisit Flusher for the aggregation of more data before socket write
Expand Down
56 changes: 17 additions & 39 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -1317,12 +1317,7 @@ _removeAllSubscriptions(natsConnection *nc)
{
(void) natsHashIter_RemoveCurrent(&iter);

natsSub_Lock(sub);

sub->closed = true;
natsCondition_Signal(sub->cond);

natsSub_Unlock(sub);
natsSub_close(sub);

natsSub_release(sub);
}
Expand Down Expand Up @@ -1418,30 +1413,22 @@ static natsStatus
_createMsg(natsMsg **newMsg, natsConnection *nc, char *buf, int bufLen)
{
natsStatus s = NATS_OK;
natsMsg *msg = NULL;
char *subject = NULL;
int subjLen = 0;
char *reply = NULL;
int replyLen = 0;

s = nats_CreateStringFromBuffer(&subject, nc->ps->ma.subject);
if (s == NATS_OK)
s = nats_CreateStringFromBuffer(&reply, nc->ps->ma.reply);
if (s == NATS_OK)
s = natsMsg_create(&msg, buf, bufLen);
subjLen = natsBuf_Len(nc->ps->ma.subject);

if (s == NATS_OK)
if (nc->ps->ma.reply != NULL)
{
msg->subject = subject;
msg->reply = reply;

*newMsg = msg;
}
else
{
NATS_FREE(subject);
NATS_FREE(reply);
natsMsg_Destroy(msg);
reply = natsBuf_Data(nc->ps->ma.reply);
replyLen = natsBuf_Len(nc->ps->ma.reply);
}

s = natsMsg_create(newMsg,
(const char*) natsBuf_Data(nc->ps->ma.subject), subjLen,
(const char*) reply, replyLen,
(const char*) buf, bufLen);
return s;
}

Expand Down Expand Up @@ -1504,10 +1491,10 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)

sub->msgList.count++;

if (!(sub->signaled))
if (sub->signalTimerInterval != 1)
{
sub->signaled = true;
natsCondition_Signal(sub->cond);
sub->signalTimerInterval = 1;
natsTimer_Reset(sub->signalTimer, 1);
}
}

Expand Down Expand Up @@ -1606,15 +1593,7 @@ natsConn_removeSubscription(natsConnection *nc, natsSubscription *removedSub, bo
// Note that the sub may have already been removed, so 'sub == NULL'
// is not an error.
if (sub != NULL)
{
natsSub_Lock(sub);

// Kick out the deliverMsgs thread.
sub->closed = true;
natsCondition_Signal(sub->cond);

natsSub_Unlock(sub);
}
natsSub_close(sub);

if (needsLock)
natsConn_Unlock(nc);
Expand Down Expand Up @@ -1700,8 +1679,7 @@ natsConn_subscribe(natsSubscription **newSub,
// A delivery thread may have been started, but the subscription not
// added to the connection's subscription map. So this is necessary
// for the delivery thread to unroll.
sub->closed = true;
natsCondition_Signal(sub->cond);
natsSub_close(sub);

natsConn_removeSubscription(nc, sub, false);

Expand Down Expand Up @@ -1832,7 +1810,7 @@ natsConn_create(natsConnection **newConn, natsOptions *options)
if (s == NATS_OK)
*newConn = nc;
else
_freeConn(nc);
natsConn_release(nc);

return s;
}
Expand Down
27 changes: 27 additions & 0 deletions src/gc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2015 Apcera Inc. All rights reserved.


#ifndef GC_H_
#define GC_H_

// This callback implements the specific code to free the given object.
// This is invoked by the garbage collector.
typedef void (*nats_FreeObjectCb)(void *object);

// This structure should be included as the first field of any object
// that needs to be garbage collected.
typedef struct __natsGCItem
{
struct __natsGCItem *next;
nats_FreeObjectCb freeCb;

} natsGCItem;

// Gives the object to the garbage collector.
// Returns 'true' if the GC takes ownership, 'false' otherwise (in this case,
// the caller is responsible for freeing the object).
bool
natsGC_collect(natsGCItem *item);


#endif /* GC_H_ */
109 changes: 71 additions & 38 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,36 @@

#include "natsp.h"

// Do this after including natsp.h in order to have some of the
// GNU specific flag set first.
#include <string.h>

#include "mem.h"

void
natsMsg_free(void *object)
{
natsMsg *msg;

if (object == NULL)
return;

msg = (natsMsg*) object;

NATS_FREE(msg->buffer);
NATS_FREE(msg);
}

void
natsMsg_Destroy(natsMsg *msg)
{
if (msg == NULL)
return;

NATS_FREE(msg->subject);
NATS_FREE(msg->reply);
NATS_FREE(msg->data);
NATS_FREE(msg);
if (natsGC_collect((natsGCItem *) msg))
return;

natsMsg_free((void*) msg);
}

const char*
Expand Down Expand Up @@ -55,26 +71,55 @@ natsMsg_GetDataLength(natsMsg *msg)
}

natsStatus
natsMsg_create(natsMsg **newMsg, char *buf, int bufLen)
natsMsg_create(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen)
{
natsMsg *msg = NULL;
natsMsg *msg = NULL;
char *ptr = NULL;
int totalSize = 0;

msg = (natsMsg*) NATS_CALLOC(1, sizeof(natsMsg));
msg = NATS_CALLOC(1, sizeof(natsMsg));
if (msg == NULL)
return NATS_NO_MEMORY;

if (bufLen > 0)
totalSize = subjLen;
totalSize += 1;
totalSize += replyLen;
totalSize += 1;
totalSize += bufLen;
totalSize += 1;

msg->buffer = NATS_MALLOC(totalSize);
if (msg->buffer == NULL)
{
NATS_FREE(msg);
return NATS_NO_MEMORY;
}
ptr = msg->buffer;

msg->subject = (const char*) ptr;
memcpy(ptr, subject, subjLen);
ptr += subjLen;
*(ptr++) = '\0';

msg->reply = (const char*) ptr;
if (replyLen > 0)
{
msg->data = NATS_MALLOC(bufLen);
if (msg->data == NULL)
{
NATS_FREE(msg);
return NATS_NO_MEMORY;
}

memcpy(msg->data, buf, bufLen);
msg->dataLen = bufLen;
memcpy(ptr, reply, replyLen);
ptr += replyLen;
}
*(ptr++) = '\0';

msg->data = (const char*) ptr;
msg->dataLen = bufLen;
memcpy(ptr, buf, bufLen);
ptr += bufLen;
*(ptr) = '\0';

// Setting the callback will trigger garbage collection
msg->gc.freeCb = natsMsg_free;

*newMsg = msg;

Expand All @@ -85,31 +130,19 @@ natsStatus
natsMsg_Create(natsMsg **newMsg, const char *subj, const char *reply,
const char *data, int dataLen)
{
natsStatus s = NATS_OK;
natsMsg *msg = NULL;
natsStatus s = NATS_OK;

if ((newMsg == NULL) || (subj == NULL) || (strlen(subj) == 0))
return NATS_INVALID_ARG;

s = natsMsg_create(&msg, (char*) data, dataLen);
if (s == NATS_OK)
{
msg->subject = NATS_STRDUP(subj);
if (msg->subject == NULL)
s = NATS_NO_MEMORY;
}
if ((s == NATS_OK) && (reply != NULL) && (strlen(reply) > 0))
if ((subj == NULL)
|| (subj[0] == '\0')
|| ((reply != NULL) && (reply[0] == '\0')))
{
msg->reply = NATS_STRDUP(reply);
if (msg->reply == NULL)
s = NATS_NO_MEMORY;
return NATS_INVALID_ARG;
}

if (s == NATS_OK)
*newMsg = msg;
else
natsMsg_Destroy(msg);

s = natsMsg_create(newMsg,
subj, strlen(subj),
reply, (reply == NULL ? 0 : strlen(reply)),
data, dataLen);
return s;
}

24 changes: 20 additions & 4 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@
#define MSG_H_

#include "status.h"
#include "gc.h"

struct __natsMsg;

typedef struct __natsMsg
{
char *subject;
char *reply;
char *data;
natsGCItem gc;

// This holds subject, reply and payload.
char *buffer;

// These points to specific locations in 'buffer'. They must not be freed.
// (note that we could do without 'subject' since as of now, it is
// equivalent to 'buffer').
const char *subject;
const char *reply;
const char *data;
int dataLen;

struct __natsMsg *next;
Expand All @@ -21,7 +30,14 @@ typedef struct __natsMsg
// PRIVATE

natsStatus
natsMsg_create(natsMsg **newMsg, char *buf, int bufLen);
natsMsg_create(natsMsg **newMsg,
const char *subject, int subjLen,
const char *reply, int replyLen,
const char *buf, int bufLen);

// This needs to follow the nats_FreeObjectCb prototype (see gc.h)
void
natsMsg_free(void *object);


// PUBLIC
Expand Down
Loading

0 comments on commit ff01917

Please sign in to comment.