From c9287bb579761b104e9719f939da093daa1eabf5 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 20 Jul 2014 18:46:09 +0200 Subject: [PATCH 1/2] nsqd: don't decrement RDY count --- nsqd/client_v2.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 6705b53a8..20ebe5502 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -53,12 +53,11 @@ type identifyEvent struct { type clientV2 struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms - ReadyCount int64 - LastReadyCount int64 - InFlightCount int64 - MessageCount uint64 - FinishCount uint64 - RequeueCount uint64 + ReadyCount int64 + InFlightCount int64 + MessageCount uint64 + FinishCount uint64 + RequeueCount uint64 sync.RWMutex @@ -325,15 +324,14 @@ func (c *clientV2) IsReadyForMessages() bool { } readyCount := atomic.LoadInt64(&c.ReadyCount) - lastReadyCount := atomic.LoadInt64(&c.LastReadyCount) inFlightCount := atomic.LoadInt64(&c.InFlightCount) if c.ctx.nsqd.opts.Verbose { - c.ctx.nsqd.logf("[%s] state rdy: %4d lastrdy: %4d inflt: %4d", c, - readyCount, lastReadyCount, inFlightCount) + c.ctx.nsqd.logf("[%s] state rdy: %4d inflt: %4d", + c, readyCount, inFlightCount) } - if inFlightCount >= lastReadyCount || readyCount <= 0 { + if inFlightCount >= readyCount || readyCount <= 0 { return false } @@ -342,7 +340,6 @@ func (c *clientV2) IsReadyForMessages() bool { func (c *clientV2) SetReadyCount(count int64) { atomic.StoreInt64(&c.ReadyCount, count) - atomic.StoreInt64(&c.LastReadyCount, count) c.tryUpdateReadyState() } @@ -368,7 +365,6 @@ func (c *clientV2) Empty() { } func (c *clientV2) SendingMessage() { - atomic.AddInt64(&c.ReadyCount, -1) atomic.AddInt64(&c.InFlightCount, 1) atomic.AddUint64(&c.MessageCount, 1) } From c85eaca656943e2c271730bc5088575757c187e6 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sun, 20 Jul 2014 18:49:02 +0200 Subject: [PATCH 2/2] nsqd: fix timeout test race --- nsqd/protocol_v2_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index 28d1eb785..1e1e3298d 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -1270,6 +1270,9 @@ func TestClientMsgTimeout(t *testing.T) { equal(t, msgOut.ID, msg.ID) equal(t, msgOut.Body, msg.Body) + _, err = nsq.Ready(0).WriteTo(conn) + equal(t, err, nil) + time.Sleep(1100 * time.Millisecond) _, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn)