Skip to content

Commit

Permalink
Fix timers code
Browse files Browse the repository at this point in the history
* Move most of timers code in nats.c since we need the protection of the lib's timers' lock and the lock of the timer itself to avoid some race conditions.
* Added more tests for timers (that would fail without this change)
* Changed hashing performance test for VALGRIND runs
  • Loading branch information
kozlovic committed Oct 26, 2015
1 parent b03d44b commit eef8fa0
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 163 deletions.
2 changes: 1 addition & 1 deletion src/asynccb.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ _createAndPostCb(natsAsyncCbType type, natsConnection *nc, natsSubscription *sub

natsConn_retain(nc);

s = nats_PostAsyncCbInfo(cb);
s = nats_postAsyncCbInfo(cb);
if (s != NATS_OK)
{
_freeAsyncCbInfo(cb);
Expand Down
163 changes: 133 additions & 30 deletions src/nats.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ typedef struct __natsLibTimers
natsCondition *cond;
natsThread *thread;
natsTimer *timers;
int count;
bool changed;
bool shutdown;

Expand Down Expand Up @@ -167,14 +168,65 @@ _insertTimer(natsTimer *t)
gLib.timers.timers = t;
}

// Locks must be held before entering this function
static void
_removeTimer(natsLibTimers *timers, natsTimer *t)
{
// Switch flag
t->stopped = true;

// It the timer was in the callback, it has already been removed from the
// list, so skip that.
if (!(t->inCallback))
{
if (t->prev != NULL)
t->prev->next = t->next;
if (t->next != NULL)
t->next->prev = t->prev;

if (t == gLib.timers.timers)
gLib.timers.timers = t->next;

t->prev = NULL;
t->next = NULL;
}

// Decrease the global count of timers
timers->count--;
}

void
nats_AddTimer(natsTimer *t)
nats_resetTimer(natsTimer *t, int64_t newInterval)
{
natsLibTimers *timers = &(gLib.timers);

natsMutex_Lock(timers->lock);
natsMutex_Lock(t->mu);

_insertTimer(t);
// If timer is active, we need first to remove it. This call does the
// right thing if the timer is in the callback.
if (!(t->stopped))
_removeTimer(timers, t);

// Bump the timer's global count (it as decreased in the _removeTimers call
timers->count++;

// Switch stopped flag
t->stopped = false;

// Set the new interval (may be same than it was before, but that's ok)
t->interval = newInterval;

// If the timer is in the callback, the insertion and setting of the
// absolute time will be done by the timer thread when returning from
// the timer's callback.
if (!(t->inCallback))
{
t->absoluteTime = nats_Now() + t->interval;
_insertTimer(t);
}

natsMutex_Unlock(t->mu);

if (!(timers->changed))
natsCondition_Signal(timers->cond);
Expand All @@ -185,38 +237,82 @@ nats_AddTimer(natsTimer *t)
}

void
nats_RemoveTimer(natsTimer *t)
nats_stopTimer(natsTimer *t)
{
natsLibTimers *timers = &(gLib.timers);
natsLibTimers *timers = &(gLib.timers);
bool doCb = false;

natsMutex_Lock(timers->lock);
natsMutex_Lock(t->mu);

if (t->prev != NULL)
t->prev->next = t->next;
if (t->next != NULL)
t->next->prev = t->prev;
// If the timer was already stopped, nothing to do.
if (t->stopped)
{
natsMutex_Unlock(t->mu);
natsMutex_Unlock(timers->lock);

return;
}

if (t == gLib.timers.timers)
gLib.timers.timers = t->next;
_removeTimer(timers, t);

t->prev = NULL;
t->next = NULL;
doCb = (!(t->inCallback) && (t->stopCb != NULL));

natsMutex_Unlock(t->mu);

if (!(timers->changed))
natsCondition_Signal(timers->cond);

timers->changed = true;

natsMutex_Unlock(timers->lock);

if (doCb)
(*(t->stopCb))(t, t->closure);
}

int
nats_getTimersCount(void)
{
int count = 0;

natsMutex_Lock(gLib.timers.lock);

count = gLib.timers.count;

natsMutex_Unlock(gLib.timers.lock);

return count;
}

int
nats_getTimersCountInList(void)
{
int count = 0;
natsTimer *t;

natsMutex_Lock(gLib.timers.lock);

t = gLib.timers.timers;
while (t != NULL)
{
count++;
t = t->next;
}

natsMutex_Unlock(gLib.timers.lock);

return count;
}


static void
_timerThread(void *arg)
{
natsLibTimers *timers = &(gLib.timers);
natsTimer *t = NULL;
natsStatus s = NATS_OK;
bool doCb;
bool doStopCb;
int64_t target;

natsMutex_Lock(gLib.lock);
Expand Down Expand Up @@ -271,42 +367,47 @@ _timerThread(void *arg)
t->prev = NULL;
t->next = NULL;

doCb = (t->stopped ? false : true);
if (doCb)
t->inCallback = true;
t->inCallback = true;

// Retain the timer, since we are going to release the locks for the
// callback. The user may "destroy" the timer from there, so we need
// to be protected with reference counting.
t->refs++;

natsMutex_Unlock(t->mu);
natsMutex_Unlock(timers->lock);

if (doCb)
(*(t->cb))(t, t->closure);
(*(t->cb))(t, t->closure);

natsMutex_Lock(timers->lock);
natsMutex_Lock(t->mu);

t->inCallback = false;

// Timer may have been stopped from within the callback,
// or any time while the timer's lock was not held.
doCb = (t->stopped);
// Timer may have been stopped from within the callback, or during
// the window the locks were released.
doStopCb = (t->stopped && (t->stopCb != NULL));

if (!doCb)
// If not stopped, we need to put it back in our list
if (!doStopCb)
{
// Reset our view of what is the time this timer should
// fire.
// Reset our view of what is the time this timer should fire
// because:
// 1- the callback may have taken longer than it should
// 2- the user may have called Reset() with a new interval
t->absoluteTime = nats_Now() + t->interval;
_insertTimer(t);
}

natsMutex_Unlock(t->mu);
natsMutex_Unlock(timers->lock);

if (doCb)
{
if (doStopCb)
(*(t->stopCb))(t, t->closure);

natsTimer_Release(t);
}
// Compensate for the retain that we made before invoking the timer's
// callback
natsTimer_Release(t);

natsMutex_Lock(timers->lock);
}
Expand All @@ -318,11 +419,13 @@ _timerThread(void *arg)
natsMutex_Lock(t->mu);
t->stopped = true;
t->inCallback = false;
doStopCb = (t->stopCb != NULL);
natsMutex_Unlock(t->mu);

natsMutex_Unlock(timers->lock);

(*(t->stopCb))(t, t->closure);
if (doStopCb)
(*(t->stopCb))(t, t->closure);

natsTimer_Release(t);

Expand Down Expand Up @@ -408,7 +511,7 @@ _asyncCbsThread(void *arg)
}

natsStatus
nats_PostAsyncCbInfo(natsAsyncCbInfo *info)
nats_postAsyncCbInfo(natsAsyncCbInfo *info)
{
natsMutex_Lock(gLib.asyncCbs.lock);

Expand Down
16 changes: 13 additions & 3 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,23 @@ void
natsLib_Release(void);

void
nats_AddTimer(natsTimer *t);
nats_resetTimer(natsTimer *t, int64_t newInterval);

void
nats_RemoveTimer(natsTimer *t);
nats_stopTimer(natsTimer *t);

// Returns the number of timers that have been created and not stopped.
int
nats_getTimersCount(void);

// Returns the number of timers actually in the list. This should be
// equal to nats_getTimersCount() or nats_getTimersCount() - 1 when a
// timer thread is invoking a timer's callback.
int
nats_getTimersCountInList(void);

natsStatus
nats_PostAsyncCbInfo(natsAsyncCbInfo *info);
nats_postAsyncCbInfo(natsAsyncCbInfo *info);

natsStatus
natsInbox_Create(char **newInbox);
Expand Down
Loading

0 comments on commit eef8fa0

Please sign in to comment.