From bd3be750413476e24ddfa7c5f7b6bf53489999bc Mon Sep 17 00:00:00 2001
From: Derek Collison <derek@nats.io>
Date: Sat, 12 Oct 2024 11:01:01 -0700
Subject: [PATCH] Process max delivery boundaries when expiring vs always
 putting back on the redelivered queue.

Previously we would always re-queue and do this in getNextMsg(), which
worked well for push consumers. With pull-based consumers, however, a new
pull request had to be present to process any redelivered messages, and
redelivered status could be reported incorrectly with a max deliver of 1.

Signed-off-by: Derek Collison <derek@nats.io>
---
 server/consumer.go                 | 47 ++++++++++++++++++++++--
 server/jetstream_cluster_1_test.go | 58 ++++++++++++++++++++++++++++++
 server/jetstream_cluster_3_test.go | 34 +++++++++++-------
 3 files changed, 125 insertions(+), 14 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 3b633270e75..8ede0b92225 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -285,7 +285,9 @@ var (
 	AckNext = []byte("+NXT")
 	// Terminate delivery of the message.
 	AckTerm = []byte("+TERM")
+)
 
+const (
 	// reasons to supply when terminating messages using limits
 	ackTermLimitsReason        = "Message deleted by stream limits"
 	ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
@@ -1344,6 +1346,9 @@ func (o *consumer) setLeader(isLeader bool) {
 		pullMode := o.isPullMode()
 		o.mu.Unlock()
 
+		// Check if there are any pending messages we might need to clean up.
+		o.checkPending()
+
 		// Snapshot initial info.
 		o.infoWithSnap(true)
 
@@ -1774,12 +1779,39 @@ func (o *consumer) config() ConsumerConfig {
 	return o.cfg
 }
 
+// Check if we have hit max deliveries. If so, do notification and cleanup.
+// Return whether or not the max was hit.
+// Lock should be held.
+func (o *consumer) hasMaxDeliveries(seq uint64) bool {
+	if o.maxdc == 0 {
+		return false
+	}
+	if dc := o.deliveryCount(seq); dc >= o.maxdc {
+		// We have hit our max deliveries for this sequence.
+		// Only send the advisory once.
+		if dc == o.maxdc {
+			o.notifyDeliveryExceeded(seq, dc)
+		}
+		// Determine if we signal to start flow of messages again.
+		if o.maxp > 0 && len(o.pending) >= o.maxp {
+			o.signalNewMessages()
+		}
+		// Clean up our tracking.
+		delete(o.pending, seq)
+		if o.rdc != nil {
+			delete(o.rdc, seq)
+		}
+		return true
+	}
+	return false
+}
+
 // Force expiration of all pending.
 // Lock should be held.
 func (o *consumer) forceExpirePending() {
 	var expired []uint64
 	for seq := range o.pending {
-		if !o.onRedeliverQueue(seq) {
+		if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
 			expired = append(expired, seq)
 		}
 	}
@@ -3531,6 +3563,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin
 	return acc, interest
 }
 
+// Return current delivery count for a given sequence.
+func (o *consumer) deliveryCount(seq uint64) uint64 {
+	if o.rdc == nil {
+		return 1
+	}
+	return max(o.rdc[seq], 1)
+}
+
 // Increase the delivery count for this message.
 // ONLY used on redelivery semantics.
 // Lock should be held.
@@ -4754,7 +4794,10 @@ func (o *consumer) checkPending() {
 			}
 		}
 		if elapsed >= deadline {
-			if !o.onRedeliverQueue(seq) {
+			// Check if we have hit max deliveries. Previously we would do this in getNextMsg(), which
+			// worked well for push consumers, but with pull-based consumers a new pull request would need
+			// to be present to process any redeliveries, and redelivered status could be reported incorrectly.
+			if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
 				expired = append(expired, seq)
 			}
 		} else if deadline-elapsed < next {
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index d93badc7827..971a76fb07e 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6442,6 +6442,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
 	require_NotEqual(t, ml, c.leader())
 }
 
+func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo.*"},
+		Retention: nats.InterestPolicy,
+	})
+	require_NoError(t, err)
+
+	sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
+	require_NoError(t, err)
+
+	sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
+	require_NoError(t, err)
+
+	js.Publish("foo.bar", []byte("HELLO"))
+
+	si, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	require_Equal(t, si.State.Msgs, 1)
+
+	msgs, err := sub1.Fetch(1)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 1)
+
+	msgs, err = sub2.Fetch(1)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 1)
+
+	// Wait out the ack wait for both consumers; with a max deliver of 1 nothing should be redelivered.
+	time.Sleep(250 * time.Millisecond)
+
+	// Now check that stream and consumer infos are correct.
+	si, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+	// Messages that hit max deliveries should NOT be removed from the stream.
+	require_Equal(t, si.State.Msgs, 1)
+	require_Equal(t, si.State.Consumers, 2)
+
+	for _, cname := range []string{"c1", "c2"} {
+		ci, err := js.ConsumerInfo("TEST", cname)
+		require_NoError(t, err)
+		require_Equal(t, ci.Delivered.Consumer, 1)
+		require_Equal(t, ci.Delivered.Stream, 1)
+		require_Equal(t, ci.AckFloor.Consumer, 1)
+		require_Equal(t, ci.AckFloor.Stream, 1)
+		require_Equal(t, ci.NumAckPending, 0)
+		require_Equal(t, ci.NumRedelivered, 0)
+		require_Equal(t, ci.NumPending, 0)
+	}
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 266125ad3da..68b411bbcf1 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -5422,9 +5422,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	}
 
 	// File based.
-	_, err = js.Subscribe("foo",
-		func(msg *nats.Msg) {},
-		nats.Durable("file"),
+	sub, err := js.PullSubscribe("foo", "file",
 		nats.ManualAck(),
 		nats.MaxDeliver(1),
 		nats.AckWait(time.Second),
@@ -5432,7 +5430,11 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	)
 	require_NoError(t, err)
 
-	// Let first batch retry and expire.
+	msgs, err := sub.Fetch(10)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 10)
+
+	// Let first batch expire.
 	time.Sleep(1200 * time.Millisecond)
 
 	cia, err := js.ConsumerInfo("TEST", "file")
@@ -5450,6 +5452,12 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	// Also last activity for delivered can be slightly off so nil out as well.
 	checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
 		t.Helper()
+		require_Equal(t, a.Delivered.Consumer, 10)
+		require_Equal(t, a.Delivered.Stream, 10)
+		require_Equal(t, a.AckFloor.Consumer, 10)
+		require_Equal(t, a.AckFloor.Stream, 10)
+		require_Equal(t, a.NumPending, 40)
+		require_Equal(t, a.NumRedelivered, 0)
 		a.Cluster, b.Cluster = nil, nil
 		a.Delivered.Last, b.Delivered.Last = nil, nil
 		if !reflect.DeepEqual(a, b) {
@@ -5460,9 +5468,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	checkConsumerInfo(cia, cib)
 
 	// Memory based.
-	_, err = js.Subscribe("foo",
-		func(msg *nats.Msg) {},
-		nats.Durable("mem"),
+	sub, err = js.PullSubscribe("foo", "mem",
 		nats.ManualAck(),
 		nats.MaxDeliver(1),
 		nats.AckWait(time.Second),
@@ -5471,6 +5477,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	)
 	require_NoError(t, err)
 
+	msgs, err = sub.Fetch(10)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 10)
+
 	// Let first batch retry and expire.
 	time.Sleep(1200 * time.Millisecond)
 
@@ -5488,9 +5498,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	checkConsumerInfo(cia, cib)
 
 	// Now file based but R1 and server restart.
-	_, err = js.Subscribe("foo",
-		func(msg *nats.Msg) {},
-		nats.Durable("r1"),
+	sub, err = js.PullSubscribe("foo", "r1",
 		nats.ManualAck(),
 		nats.MaxDeliver(1),
 		nats.AckWait(time.Second),
@@ -5499,6 +5507,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	)
 	require_NoError(t, err)
 
+	msgs, err = sub.Fetch(10)
+	require_NoError(t, err)
+	require_Equal(t, len(msgs), 10)
+
 	// Let first batch retry and expire.
 	time.Sleep(1200 * time.Millisecond)
 
@@ -5517,8 +5529,6 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 
 	// Created can skew a small bit due to server restart, this is expected.
 	now := time.Now()
 	cia.Created, cib.Created = now, now
-	// Clear any disagreement on push bound.
-	cia.PushBound, cib.PushBound = false, false
 	checkConsumerInfo(cia, cib)
 }
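
For illustration, a minimal standalone sketch (not part of the commit) of the client-visible behavior this patch addresses. It assumes a local nats-server running with JetStream enabled (nats-server -js) and the github.com/nats-io/nats.go client; the stream name MAXDEL, subject maxdel.foo, and durable name dlv are hypothetical. A pull consumer with a max deliver of 1 fetches a message, never acks it, and lets the ack wait lapse; with this change the server settles the max-delivery boundary while expiring, so consumer info should report no ack-pending or redelivered messages without another pull request being present.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a local server started with: nats-server -js
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical stream; interest retention mirrors the new cluster test.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:      "MAXDEL",
		Subjects:  []string{"maxdel.*"},
		Retention: nats.InterestPolicy,
	}); err != nil {
		log.Fatal(err)
	}

	// Pull consumer with a max deliver of 1 and a short ack wait.
	sub, err := js.PullSubscribe("maxdel.*", "dlv",
		nats.AckWait(100*time.Millisecond),
		nats.MaxDeliver(1),
	)
	if err != nil {
		log.Fatal(err)
	}

	if _, err := js.Publish("maxdel.foo", []byte("HELLO")); err != nil {
		log.Fatal(err)
	}

	// Fetch the message once and deliberately do not ack it.
	msgs, err := sub.Fetch(1)
	if err != nil || len(msgs) != 1 {
		log.Fatalf("fetch: %v (%d msgs)", err, len(msgs))
	}

	// Let the ack wait expire. With this patch the server processes the
	// max-delivery boundary during expiration itself, rather than waiting
	// for a pull request to trigger getNextMsg().
	time.Sleep(300 * time.Millisecond)

	ci, err := js.ConsumerInfo("MAXDEL", "dlv")
	if err != nil {
		log.Fatal(err)
	}
	// Expected after the fix: both zero. Previously a pull consumer could
	// keep reporting the message as ack pending or redelivered until a new
	// pull request arrived.
	fmt.Printf("NumAckPending=%d NumRedelivered=%d\n", ci.NumAckPending, ci.NumRedelivered)
}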