Skip to content

Commit

Permalink
Merge pull request #404 from mreiferson/dont_decr_rdy_404
Browse files Browse the repository at this point in the history
nsqd: don't decrement RDY
  • Loading branch information
jehiah committed Sep 15, 2014
2 parents 23a4b88 + c85eaca commit 2d44fc3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
20 changes: 8 additions & 12 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ type identifyEvent struct {

type clientV2 struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
ReadyCount int64
LastReadyCount int64
InFlightCount int64
MessageCount uint64
FinishCount uint64
RequeueCount uint64
ReadyCount int64
InFlightCount int64
MessageCount uint64
FinishCount uint64
RequeueCount uint64

sync.RWMutex

Expand Down Expand Up @@ -325,15 +324,14 @@ func (c *clientV2) IsReadyForMessages() bool {
}

readyCount := atomic.LoadInt64(&c.ReadyCount)
lastReadyCount := atomic.LoadInt64(&c.LastReadyCount)
inFlightCount := atomic.LoadInt64(&c.InFlightCount)

if c.ctx.nsqd.opts.Verbose {
c.ctx.nsqd.logf("[%s] state rdy: %4d lastrdy: %4d inflt: %4d", c,
readyCount, lastReadyCount, inFlightCount)
c.ctx.nsqd.logf("[%s] state rdy: %4d inflt: %4d",
c, readyCount, inFlightCount)
}

if inFlightCount >= lastReadyCount || readyCount <= 0 {
if inFlightCount >= readyCount || readyCount <= 0 {
return false
}

Expand All @@ -342,7 +340,6 @@ func (c *clientV2) IsReadyForMessages() bool {

func (c *clientV2) SetReadyCount(count int64) {
atomic.StoreInt64(&c.ReadyCount, count)
atomic.StoreInt64(&c.LastReadyCount, count)
c.tryUpdateReadyState()
}

Expand All @@ -368,7 +365,6 @@ func (c *clientV2) Empty() {
}

func (c *clientV2) SendingMessage() {
atomic.AddInt64(&c.ReadyCount, -1)
atomic.AddInt64(&c.InFlightCount, 1)
atomic.AddUint64(&c.MessageCount, 1)
}
Expand Down
3 changes: 3 additions & 0 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,9 @@ func TestClientMsgTimeout(t *testing.T) {
equal(t, msgOut.ID, msg.ID)
equal(t, msgOut.Body, msg.Body)

_, err = nsq.Ready(0).WriteTo(conn)
equal(t, err, nil)

time.Sleep(1100 * time.Millisecond)

_, err = nsq.Finish(nsq.MessageID(msgOut.ID)).WriteTo(conn)
Expand Down

0 comments on commit 2d44fc3

Please sign in to comment.