diff --git a/server/consumer.go b/server/consumer.go index 8ede0b92225..eb9d13ed71b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3736,9 +3736,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { var pmsg = getJSPubMsgFromPool() // Grab next message applicable to us. - // We will unlock here in case lots of contention, e.g. WQ. filters, subjf, fseq := o.filters, o.subjf, o.sseq - o.mu.Unlock() // Check if we are multi-filtered or not. if filters != nil { sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg) @@ -3753,8 +3751,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { pmsg.returnToPool() pmsg = nil } - // Re-acquire lock. - o.mu.Lock() // Check if we should move our o.sseq. if sseq >= o.sseq { // If we are moving step by step then sseq == o.sseq. diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 611d5738d11..28b6dc3d8c0 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" + "errors" "fmt" "math/rand" "slices" @@ -1402,6 +1403,120 @@ func TestJetStreamConsumerPedanticMode(t *testing.T) { } } } + +func TestJetStreamConsumerStuckAckPending(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + type ActiveWorkItem struct { + ID int + Expiry time.Time + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST_ACTIVE_WORK_ITEMS", + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Subjects: []string{"TEST_ACTIVE_WORK_ITEMS.>"}, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST_ACTIVE_WORK_ITEMS", &nats.ConsumerConfig{ + Durable: "testactiveworkitemsconsumer", + AckPolicy: nats.AckExplicitPolicy, + MaxAckPending: -1, + MaxWaiting: 20000, + AckWait: 15 * time.Second, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("TEST_ACTIVE_WORK_ITEMS.>", "testactiveworkitemsconsumer", nats.BindStream("TEST_ACTIVE_WORK_ITEMS")) + require_NoError(t, err) + + errs := make(chan error) + go func() { + for { + msgs, err := sub.Fetch(200) + if err != nil { + // test is done. stop the loop. + if errors.Is(err, nats.ErrSubscriptionClosed) || errors.Is(err, nats.ErrConnectionClosed) { + return + } + if !errors.Is(err, nats.ErrTimeout) { + errs <- err + return + } + continue + } + for _, msg := range msgs { + msg := msg + var workItem ActiveWorkItem + if err := json.Unmarshal(msg.Data, &workItem); err != nil { + errs <- err + return + } + + now := time.Now() + // If the work item has not expired, nak it with the respective delay. + if workItem.Expiry.After(now) { + msg.NakWithDelay(workItem.Expiry.Sub(now)) + } else { + msg.Ack() + } + } + } + }() + + for i := 0; i < 25_000; i++ { + // Publish item to TEST_ACTIVE_WORK_ITEMS stream with an expiry time. + workItem := ActiveWorkItem{ID: i, Expiry: time.Now().Add(30 * time.Second)} + data, err := json.Marshal(workItem) + require_NoError(t, err) + + _, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data) + require_NoError(t, err) + + // Update expiry time and republish item to TEST_ACTIVE_WORK_ITEMS stream. + workItem.Expiry = time.Now().Add(3 * time.Second) + data, err = json.Marshal(workItem) + require_NoError(t, err) + _, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data) + require_NoError(t, err) + } + noChange := false + lastNumAckPending := 0 + checkFor(t, 60*time.Second, 3*time.Second, func() error { + select { + case err := <-errs: + t.Fatalf("consumer goroutine failed: %v", err) + default: + } + ci, err := js.ConsumerInfo("TEST_ACTIVE_WORK_ITEMS", "testactiveworkitemsconsumer") + require_NoError(t, err) + + if lastNumAckPending != 0 && lastNumAckPending == ci.NumAckPending { + noChange = true + } + lastNumAckPending = ci.NumAckPending + + // If we have no change since last check, we can fail the test before `totalWait` timeout. + if ci.NumAckPending > 0 && ci.NumPending == 0 { + if noChange { + _, err := sub.Fetch(1) + if err != nil && errors.Is(err, nats.ErrTimeout) { + + t.Fatalf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending) + } + } + return fmt.Errorf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending) + } + return nil + }) +} + func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) { subject := "foo.bar.do.not.match.any.filter.subject" for n := 1; n <= 1024; n *= 2 {