Skip to content

Commit

Permalink
Fix race with store when fetching messages by a consumer (#6003)
Browse files Browse the repository at this point in the history
We were releasing and acquiring again lock while fetching messages to
reduce contention. This could cause a consumer getting stuck in certain
scenarios, like when `max messages per subject` was set to `1` and
message was republished while consumer was trying to `NAK` the first
one.

- [ ] add a proper test

Signed-off-by: Tomasz Pietrek <[email protected]>

---------

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema authored and neilalexander committed Oct 15, 2024
1 parent 61b67e3 commit 179805f
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
4 changes: 0 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3621,9 +3621,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
var pmsg = getJSPubMsgFromPool()

// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
filters, subjf, fseq := o.filters, o.subjf, o.sseq
o.mu.Unlock()
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
Expand All @@ -3638,8 +3636,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
pmsg.returnToPool()
pmsg = nil
}
// Re-acquire lock.
o.mu.Lock()
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
Expand Down
114 changes: 114 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"slices"
Expand Down Expand Up @@ -1224,6 +1225,119 @@ func TestJetStreamConsumerLongSubjectHang(t *testing.T) {
}
}

func TestJetStreamConsumerStuckAckPending(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

type ActiveWorkItem struct {
ID int
Expiry time.Time
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST_ACTIVE_WORK_ITEMS",
Discard: nats.DiscardOld,
MaxMsgsPerSubject: 1,
Subjects: []string{"TEST_ACTIVE_WORK_ITEMS.>"},
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST_ACTIVE_WORK_ITEMS", &nats.ConsumerConfig{
Durable: "testactiveworkitemsconsumer",
AckPolicy: nats.AckExplicitPolicy,
MaxAckPending: -1,
MaxWaiting: 20000,
AckWait: 15 * time.Second,
})
require_NoError(t, err)

sub, err := js.PullSubscribe("TEST_ACTIVE_WORK_ITEMS.>", "testactiveworkitemsconsumer", nats.BindStream("TEST_ACTIVE_WORK_ITEMS"))
require_NoError(t, err)

errs := make(chan error)
go func() {
for {
msgs, err := sub.Fetch(200)
if err != nil {
// test is done. stop the loop.
if errors.Is(err, nats.ErrSubscriptionClosed) || errors.Is(err, nats.ErrConnectionClosed) {
return
}
if !errors.Is(err, nats.ErrTimeout) {
errs <- err
return
}
continue
}
for _, msg := range msgs {
msg := msg
var workItem ActiveWorkItem
if err := json.Unmarshal(msg.Data, &workItem); err != nil {
errs <- err
return
}

now := time.Now()
// If the work item has not expired, nak it with the respective delay.
if workItem.Expiry.After(now) {
msg.NakWithDelay(workItem.Expiry.Sub(now))
} else {
msg.Ack()
}
}
}
}()

for i := 0; i < 25_000; i++ {
// Publish item to TEST_ACTIVE_WORK_ITEMS stream with an expiry time.
workItem := ActiveWorkItem{ID: i, Expiry: time.Now().Add(30 * time.Second)}
data, err := json.Marshal(workItem)
require_NoError(t, err)

_, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data)
require_NoError(t, err)

// Update expiry time and republish item to TEST_ACTIVE_WORK_ITEMS stream.
workItem.Expiry = time.Now().Add(3 * time.Second)
data, err = json.Marshal(workItem)
require_NoError(t, err)
_, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data)
require_NoError(t, err)
}
noChange := false
lastNumAckPending := 0
checkFor(t, 60*time.Second, 3*time.Second, func() error {
select {
case err := <-errs:
t.Fatalf("consumer goroutine failed: %v", err)
default:
}
ci, err := js.ConsumerInfo("TEST_ACTIVE_WORK_ITEMS", "testactiveworkitemsconsumer")
require_NoError(t, err)

if lastNumAckPending != 0 && lastNumAckPending == ci.NumAckPending {
noChange = true
}
lastNumAckPending = ci.NumAckPending

// If we have no change since last check, we can fail the test before `totalWait` timeout.
if ci.NumAckPending > 0 && ci.NumPending == 0 {
if noChange {
_, err := sub.Fetch(1)
if err != nil && errors.Is(err, nats.ErrTimeout) {

t.Fatalf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending)
}
}
return fmt.Errorf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending)
}
return nil
})
}

func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
subject := "foo.bar.do.not.match.any.filter.subject"
for n := 1; n <= 1024; n *= 2 {
Expand Down

0 comments on commit 179805f

Please sign in to comment.