Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Max delivered stuck consumers and mis-reported redelivered stats. #5995

Merged
merged 1 commit into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
)

const (
// reasons to supply when terminating messages using limits
ackTermLimitsReason = "Message deleted by stream limits"
ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
Expand Down Expand Up @@ -1344,6 +1346,9 @@ func (o *consumer) setLeader(isLeader bool) {
pullMode := o.isPullMode()
o.mu.Unlock()

// Check if there are any pending we might need to clean up etc.
o.checkPending()

// Snapshot initial info.
o.infoWithSnap(true)

Expand Down Expand Up @@ -1774,12 +1779,39 @@ func (o *consumer) config() ConsumerConfig {
return o.cfg
}

// Reports whether the given sequence has reached its maximum delivery count.
// On a hit it sends the max-deliveries advisory (first time only), possibly
// signals that message flow may resume, and drops the sequence from the
// pending and redelivery tracking maps.
// Lock should be held.
func (o *consumer) hasMaxDeliveries(seq uint64) bool {
	// A max delivery count of zero means unlimited deliveries.
	if o.maxdc == 0 {
		return false
	}
	dc := o.deliveryCount(seq)
	if dc < o.maxdc {
		return false
	}
	// Send the advisory exactly once, on the delivery that hit the limit.
	if dc == o.maxdc {
		o.notifyDeliveryExceeded(seq, dc)
	}
	// If we were blocked on max ack pending, let messages flow again.
	if o.maxp > 0 && len(o.pending) >= o.maxp {
		o.signalNewMessages()
	}
	// Remove all tracking state for this sequence.
	delete(o.pending, seq)
	if o.rdc != nil {
		delete(o.rdc, seq)
	}
	return true
}

// Force expiration of all pending.
// Lock should be held.
func (o *consumer) forceExpirePending() {
var expired []uint64
for seq := range o.pending {
if !o.onRedeliverQueue(seq) {
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
}
Expand Down Expand Up @@ -3531,6 +3563,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin
return acc, interest
}

// Return current delivery count for a given sequence.
// A sequence that is not tracked in the redelivery count map has been
// delivered exactly once, so we never report zero here — returning 0 would
// make hasMaxDeliveries miss the limit for MaxDeliver(1) consumers.
// Lock should be held.
func (o *consumer) deliveryCount(seq uint64) uint64 {
	if o.rdc == nil {
		return 1
	}
	if dc := o.rdc[seq]; dc > 0 {
		return dc
	}
	// Missing entry: the message has been delivered once but never redelivered.
	return 1
}

// Increase the delivery count for this message.
// ONLY used on redelivery semantics.
// Lock should be held.
Expand Down Expand Up @@ -4754,7 +4794,10 @@ func (o *consumer) checkPending() {
}
}
if elapsed >= deadline {
if !o.onRedeliverQueue(seq) {
// We will check if we have hit our max deliveries. Previously we would do this on getNextMsg() which
// worked well for push consumers, but with pull based consumers would require a new pull request to be
// present to process and redelivered could be reported incorrectly.
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
} else if deadline-elapsed < next {
Expand Down
58 changes: 58 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6442,6 +6442,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
require_NotEqual(t, ml, c.leader())
}

// Verifies that on interest-retention streams, consumers hitting MaxDeliver
// do not cause messages to be removed from the stream, and that delivered,
// ack floor, redelivered and pending stats are all reported correctly after
// the max-deliveries limit is reached.
func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client based API
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo.*"},
		Retention: nats.InterestPolicy,
	})
	require_NoError(t, err)

	sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
	require_NoError(t, err)

	sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
	require_NoError(t, err)

	// Check the publish error so a failed publish does not surface later as a
	// confusing fetch or stream-state failure.
	_, err = js.Publish("foo.bar", []byte("HELLO"))
	require_NoError(t, err)

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_Equal(t, si.State.Msgs, 1)

	msgs, err := sub1.Fetch(1)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 1)

	msgs, err = sub2.Fetch(1)
	require_NoError(t, err)
	require_Equal(t, len(msgs), 1)

	// Wait for redelivery to both consumers which will do nothing.
	time.Sleep(250 * time.Millisecond)

	// Now check that stream and consumer infos are correct.
	si, err = js.StreamInfo("TEST")
	require_NoError(t, err)
	// Messages that are skipped due to max deliveries should NOT remove messages.
	require_Equal(t, si.State.Msgs, 1)
	require_Equal(t, si.State.Consumers, 2)

	for _, cname := range []string{"c1", "c2"} {
		ci, err := js.ConsumerInfo("TEST", cname)
		require_NoError(t, err)
		require_Equal(t, ci.Delivered.Consumer, 1)
		require_Equal(t, ci.Delivered.Stream, 1)
		require_Equal(t, ci.AckFloor.Consumer, 1)
		require_Equal(t, ci.AckFloor.Stream, 1)
		require_Equal(t, ci.NumAckPending, 0)
		require_Equal(t, ci.NumRedelivered, 0)
		require_Equal(t, ci.NumPending, 0)
	}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
34 changes: 22 additions & 12 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5422,17 +5422,19 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
}

// File based.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("file"),
sub, err := js.PullSubscribe("foo", "file",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
nats.MaxAckPending(10),
)
require_NoError(t, err)

// Let first batch retry and expire.
msgs, err := sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

// Let first batch expire.
time.Sleep(1200 * time.Millisecond)

cia, err := js.ConsumerInfo("TEST", "file")
Expand All @@ -5450,6 +5452,12 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
// Also last activity for delivered can be slightly off so nil out as well.
checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
t.Helper()
require_Equal(t, a.Delivered.Consumer, 10)
require_Equal(t, a.Delivered.Stream, 10)
require_Equal(t, a.AckFloor.Consumer, 10)
require_Equal(t, a.AckFloor.Stream, 10)
require_Equal(t, a.NumPending, 40)
require_Equal(t, a.NumRedelivered, 0)
a.Cluster, b.Cluster = nil, nil
a.Delivered.Last, b.Delivered.Last = nil, nil
if !reflect.DeepEqual(a, b) {
Expand All @@ -5460,9 +5468,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
checkConsumerInfo(cia, cib)

// Memory based.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("mem"),
sub, err = js.PullSubscribe("foo", "mem",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
Expand All @@ -5471,6 +5477,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
)
require_NoError(t, err)

msgs, err = sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)

Expand All @@ -5488,9 +5498,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
checkConsumerInfo(cia, cib)

// Now file based but R1 and server restart.
_, err = js.Subscribe("foo",
func(msg *nats.Msg) {},
nats.Durable("r1"),
sub, err = js.PullSubscribe("foo", "r1",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
Expand All @@ -5499,6 +5507,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
)
require_NoError(t, err)

msgs, err = sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)

Expand All @@ -5517,8 +5529,6 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
// Created can skew a small bit due to server restart, this is expected.
now := time.Now()
cia.Created, cib.Created = now, now
// Clear any disagreement on push bound.
cia.PushBound, cib.PushBound = false, false
checkConsumerInfo(cia, cib)
}

Expand Down