From 0d5b03e7ab96936a96102794fbc5b8b9a85a19ab Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 9 Oct 2024 16:40:30 +0200 Subject: [PATCH 1/8] Fix desync after errCatchupAbortedNoLeader Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 5 +- server/jetstream_cluster_4_test.go | 151 ++++++++++++++++++----------- server/store.go | 2 +- 3 files changed, 98 insertions(+), 60 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5a44e83c21c..b7cc09cd218 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2851,7 +2851,7 @@ func (mset *stream) resetClusteredState(err error) bool { } if node != nil { - if err == errCatchupTooManyRetries { + if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries { // Don't delete all state, could've just been temporarily unable to reach the leader. node.Stop() } else { @@ -8186,6 +8186,7 @@ var ( errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away. errCatchupBadMsg = errors.New("bad catchup msg") errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg") + errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader") errCatchupTooManyRetries = errors.New("catchup failed, too many retries") ) @@ -8289,7 +8290,7 @@ RETRY: releaseSyncOutSem() if n.GroupLeader() == _EMPTY_ { - return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name()) + return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name()) } // If we have a sub clear that here. 
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index ec483a0c7d2..4d8402c59b1 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3701,75 +3701,112 @@ func TestJetStreamConsumerReplicasAfterScale(t *testing.T) { require_Equal(t, len(ci.Cluster.Replicas), 2) } -func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() +func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) { + tests := []struct { + title string + onErrorCondition func(server *Server, mset *stream) + }{ + { + title: "TooManyRetries", + onErrorCondition: func(server *Server, mset *stream) { + // Too many retries while processing snapshot is considered a cluster reset. + // If a leader is temporarily unavailable we shouldn't blow away our state. + require_True(t, isClusterResetErr(errCatchupTooManyRetries)) + mset.resetClusteredState(errCatchupTooManyRetries) + }, + }, + { + title: "AbortedNoLeader", + onErrorCondition: func(server *Server, mset *stream) { + for _, n := range server.raftNodes { + rn := n.(*raft) + if rn.accName == "$G" { + rn.updateLeader(noLeader) + } + } - nc, js := jsClientConnect(t, c.randomServer()) - defer nc.Close() + // Processing a snapshot while there's no leader elected is considered a cluster reset. + // If a leader is temporarily unavailable we shouldn't blow away our state. 
+ var snap StreamReplicatedState + snap.LastSeq = 1_000 // ensure we can catchup based on the snapshot + err := mset.processSnapshot(&snap) + require_True(t, errors.Is(err, errCatchupAbortedNoLeader)) + require_True(t, isClusterResetErr(err)) + mset.resetClusteredState(err) + }, + }, + } - si, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo"}, - Replicas: 3, - }) - require_NoError(t, err) + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() - streamLeader := si.Cluster.Leader - streamLeaderServer := c.serverByName(streamLeader) - nc.Close() - nc, js = jsClientConnect(t, streamLeaderServer) - defer nc.Close() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() - servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { - return s == streamLeader - }) + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) - // Publish 10 messages. - for i := 0; i < 10; i++ { - pubAck, err := js.Publish("foo", []byte("ok")) - require_NoError(t, err) - require_Equal(t, pubAck.Sequence, uint64(i+1)) - } + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() - outdatedServerName := servers[0] - clusterResetServerName := servers[1] + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader + }) - outdatedServer := c.serverByName(outdatedServerName) - outdatedServer.Shutdown() - outdatedServer.WaitForShutdown() + // Publish 10 messages. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+1)) + } - // Publish 10 more messages, one server will be behind. 
- for i := 0; i < 10; i++ { - pubAck, err := js.Publish("foo", []byte("ok")) - require_NoError(t, err) - require_Equal(t, pubAck.Sequence, uint64(i+11)) - } + outdatedServerName := servers[0] + clusterResetServerName := servers[1] + + outdatedServer := c.serverByName(outdatedServerName) + outdatedServer.Shutdown() + outdatedServer.WaitForShutdown() - // We will not need the client anymore. - nc.Close() + // Publish 10 more messages, one server will be behind. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+11)) + } - // Shutdown stream leader so one server remains. - streamLeaderServer.Shutdown() - streamLeaderServer.WaitForShutdown() + // We will not need the client anymore. + nc.Close() - clusterResetServer := c.serverByName(clusterResetServerName) - acc, err := clusterResetServer.lookupAccount(globalAccountName) - require_NoError(t, err) - mset, err := acc.lookupStream("TEST") - require_NoError(t, err) + // Shutdown stream leader so one server remains. + streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() - // Too many retries while processing snapshot is considered a cluster reset. - // If a leader is temporarily unavailable we shouldn't blow away our state. - require_True(t, isClusterResetErr(errCatchupTooManyRetries)) - mset.resetClusteredState(errCatchupTooManyRetries) + clusterResetServer := c.serverByName(clusterResetServerName) + acc, err := clusterResetServer.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) - // Stream leader stays offline, we only start the server with missing stream data. - // We expect that the reset server must not allow the outdated server to become leader, as that would result in desync. - c.restartServer(outdatedServer) - c.waitOnStreamLeader(globalAccountName, "TEST") + // Run error condition. 
+ test.onErrorCondition(clusterResetServer, mset) + + // Stream leader stays offline, we only start the server with missing stream data. + // We expect that the reset server must not allow the outdated server to become leader, as that would result in desync. + c.restartServer(outdatedServer) + c.waitOnStreamLeader(globalAccountName, "TEST") - // Outdated server must NOT become the leader. - newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST") - require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName) + // Outdated server must NOT become the leader. + newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST") + require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName) + }) + } } diff --git a/server/store.go b/server/store.go index abbc06f8998..661959d172b 100644 --- a/server/store.go +++ b/server/store.go @@ -721,7 +721,7 @@ func isOutOfSpaceErr(err error) bool { var errFirstSequenceMismatch = errors.New("first sequence mismatch") func isClusterResetErr(err error) bool { - return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || err == errCatchupTooManyRetries + return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries } // Copy all fields. From 8e86624c945c2027ced536a546b5d2ff2aa76968 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 12 Oct 2024 11:01:01 -0700 Subject: [PATCH 2/8] Process max delivery boundaries when expiring vs always putting back on the redelivered queue. Previously we would always re-queue and do this on getNextMsg() which worked well for push consumers, but with pull based consumers would require a new pull request to be present to process any redelivered and this could report redelivered status incorrectly on max deliver of 1. 
Signed-off-by: Derek Collison --- server/consumer.go | 47 ++++++++++++++++++++++-- server/jetstream_cluster_1_test.go | 58 ++++++++++++++++++++++++++++++ server/jetstream_cluster_3_test.go | 34 +++++++++++------- 3 files changed, 125 insertions(+), 14 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 35cad6848d1..991b0d9a728 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -279,7 +279,9 @@ var ( AckNext = []byte("+NXT") // Terminate delivery of the message. AckTerm = []byte("+TERM") +) +const ( // reasons to supply when terminating messages using limits ackTermLimitsReason = "Message deleted by stream limits" ackTermUnackedLimitsReason = "Unacknowledged message was deleted" @@ -1290,6 +1292,9 @@ func (o *consumer) setLeader(isLeader bool) { pullMode := o.isPullMode() o.mu.Unlock() + // Check if there are any pending we might need to clean up etc. + o.checkPending() + // Snapshot initial info. o.infoWithSnap(true) @@ -1687,12 +1692,39 @@ func (o *consumer) config() ConsumerConfig { return o.cfg } +// Check if we have hit max deliveries. If so do notification and cleanup. +// Return whether or not the max was hit. +// Lock should be held. +func (o *consumer) hasMaxDeliveries(seq uint64) bool { + if o.maxdc == 0 { + return false + } + if dc := o.deliveryCount(seq); dc >= o.maxdc { + // We have hit our max deliveries for this sequence. + // Only send the advisory once. + if dc == o.maxdc { + o.notifyDeliveryExceeded(seq, dc) + } + // Determine if we signal to start flow of messages again. + if o.maxp > 0 && len(o.pending) >= o.maxp { + o.signalNewMessages() + } + // Cleanup our tracking. + delete(o.pending, seq) + if o.rdc != nil { + delete(o.rdc, seq) + } + return true + } + return false +} + // Force expiration of all pending. // Lock should be held. 
func (o *consumer) forceExpirePending() { var expired []uint64 for seq := range o.pending { - if !o.onRedeliverQueue(seq) { + if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) { expired = append(expired, seq) } } @@ -3416,6 +3448,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin return acc, interest } +// Return current delivery count for a given sequence. +func (o *consumer) deliveryCount(seq uint64) uint64 { + if o.rdc == nil { + return 1 + } + return o.rdc[seq] +} + // Increase the delivery count for this message. // ONLY used on redelivery semantics. // Lock should be held. @@ -4630,7 +4670,10 @@ func (o *consumer) checkPending() { } } if elapsed >= deadline { - if !o.onRedeliverQueue(seq) { + // We will check if we have hit our max deliveries. Previously we would do this on getNextMsg() which + // worked well for push consumers, but with pull based consumers would require a new pull request to be + // present to process and redelivered could be reported incorrectly. 
+ if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) { expired = append(expired, seq) } } else if deadline-elapsed < next { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index f5f345e484f..983d6adf397 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6440,6 +6440,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) { require_NotEqual(t, ml, c.leader()) } +func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1)) + require_NoError(t, err) + + sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1)) + require_NoError(t, err) + + js.Publish("foo.bar", []byte("HELLO")) + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 1) + + msgs, err := sub1.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + msgs, err = sub2.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + // Wait for redelivery to both consumers which will do nothing. + time.Sleep(250 * time.Millisecond) + + // Now check that stream and consumer infos are correct. + si, err = js.StreamInfo("TEST") + require_NoError(t, err) + // Messages that are skipped due to max deliveries should NOT remove messages. 
+ require_Equal(t, si.State.Msgs, 1) + require_Equal(t, si.State.Consumers, 2) + + for _, cname := range []string{"c1", "c2"} { + ci, err := js.ConsumerInfo("TEST", cname) + require_NoError(t, err) + require_Equal(t, ci.Delivered.Consumer, 1) + require_Equal(t, ci.Delivered.Stream, 1) + require_Equal(t, ci.AckFloor.Consumer, 1) + require_Equal(t, ci.AckFloor.Stream, 1) + require_Equal(t, ci.NumAckPending, 0) + require_Equal(t, ci.NumRedelivered, 0) + require_Equal(t, ci.NumPending, 0) + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index bedcccb0044..25fcab9fc05 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5415,9 +5415,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { } // File based. - _, err = js.Subscribe("foo", - func(msg *nats.Msg) {}, - nats.Durable("file"), + sub, err := js.PullSubscribe("foo", "file", nats.ManualAck(), nats.MaxDeliver(1), nats.AckWait(time.Second), @@ -5425,7 +5423,11 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { ) require_NoError(t, err) - // Let first batch retry and expire. + msgs, err := sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + + // Let first batch expire. time.Sleep(1200 * time.Millisecond) cia, err := js.ConsumerInfo("TEST", "file") @@ -5443,6 +5445,12 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Also last activity for delivered can be slightly off so nil out as well. 
checkConsumerInfo := func(a, b *nats.ConsumerInfo) { t.Helper() + require_Equal(t, a.Delivered.Consumer, 10) + require_Equal(t, a.Delivered.Stream, 10) + require_Equal(t, a.AckFloor.Consumer, 10) + require_Equal(t, a.AckFloor.Stream, 10) + require_Equal(t, a.NumPending, 40) + require_Equal(t, a.NumRedelivered, 0) a.Cluster, b.Cluster = nil, nil a.Delivered.Last, b.Delivered.Last = nil, nil if !reflect.DeepEqual(a, b) { @@ -5453,9 +5461,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { checkConsumerInfo(cia, cib) // Memory based. - _, err = js.Subscribe("foo", - func(msg *nats.Msg) {}, - nats.Durable("mem"), + sub, err = js.PullSubscribe("foo", "mem", nats.ManualAck(), nats.MaxDeliver(1), nats.AckWait(time.Second), @@ -5464,6 +5470,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { ) require_NoError(t, err) + msgs, err = sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + // Let first batch retry and expire. time.Sleep(1200 * time.Millisecond) @@ -5481,9 +5491,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { checkConsumerInfo(cia, cib) // Now file based but R1 and server restart. - _, err = js.Subscribe("foo", - func(msg *nats.Msg) {}, - nats.Durable("r1"), + sub, err = js.PullSubscribe("foo", "r1", nats.ManualAck(), nats.MaxDeliver(1), nats.AckWait(time.Second), @@ -5492,6 +5500,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { ) require_NoError(t, err) + msgs, err = sub.Fetch(10) + require_NoError(t, err) + require_Equal(t, len(msgs), 10) + // Let first batch retry and expire. time.Sleep(1200 * time.Millisecond) @@ -5510,8 +5522,6 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { // Created can skew a small bit due to server restart, this is expected. now := time.Now() cia.Created, cib.Created = now, now - // Clear any disagreement on push bound. 
- cia.PushBound, cib.PushBound = false, false checkConsumerInfo(cia, cib) } From e817deaded5cebba15f66bb92eefa8669d8bc16a Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 14 Oct 2024 08:18:21 -0600 Subject: [PATCH 3/8] Update code coverage GitHub Action [ci skip] Signed-off-by: Ivan Kozlovic --- .github/workflows/cov.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/cov.yaml b/.github/workflows/cov.yaml index 7f49bc81bed..137e88e3411 100644 --- a/.github/workflows/cov.yaml +++ b/.github/workflows/cov.yaml @@ -34,8 +34,8 @@ jobs: - name: Convert coverage.out to coverage.lcov # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit c680c0f7c7442485f1749eb2a13e54a686e76eb5 = tag v1.0.9 - uses: jandelgado/gcov2lcov-action@c680c0f7c7442485f1749eb2a13e54a686e76eb5 + # Commit 69ef3d59a24cc6e062516a73d8be123e85b15cc0 = tag v1.1.0 + uses: jandelgado/gcov2lcov-action@69ef3d59a24cc6e062516a73d8be123e85b15cc0 with: infile: acc.out working-directory: src/github.com/nats-io/nats-server From 790266e78f4f6083917da09a91e316b491274af0 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 14 Oct 2024 19:01:28 +0200 Subject: [PATCH 4/8] Fix nil pointer dereference on Placement (#5996) The check for empty object exposed a lack of check for `*Placement` before accessing its fields. This commit properly fixes it. 
Signed-off-by: Tomasz Pietrek --------- Signed-off-by: Tomasz Pietrek --- server/jetstream_api.go | 50 +++++++++++++++++++++------------------- server/jetstream_test.go | 31 +++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 24 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 7bcc7d37ebf..27e8f4b626e 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2794,33 +2794,35 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) return } - if len(req.Placement.Tags) > 0 { - // Tags currently not supported. - resp.Error = NewJSClusterTagsError() - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - cn := req.Placement.Cluster - var peers []string - ourID := cc.meta.ID() - for _, p := range cc.meta.Peers() { - if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil { - if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID { - continue + if req.Placement != nil { + if len(req.Placement.Tags) > 0 { + // Tags currently not supported. + resp.Error = NewJSClusterTagsError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + cn := req.Placement.Cluster + var peers []string + ourID := cc.meta.ID() + for _, p := range cc.meta.Peers() { + if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil { + if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID { + continue + } + peers = append(peers, p.ID) } - peers = append(peers, p.ID) } + if len(peers) == 0 { + resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected")) + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + // Randomize and select. 
+ if len(peers) > 1 { + rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) + } + preferredLeader = peers[0] } - if len(peers) == 0 { - resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected")) - s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) - return - } - // Randomize and select. - if len(peers) > 1 { - rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] }) - } - preferredLeader = peers[0] } // Call actual stepdown. diff --git a/server/jetstream_test.go b/server/jetstream_test.go index aff9dcd9213..fe95d5009ce 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -6712,6 +6712,37 @@ func TestJetStreamSystemLimitsPlacement(t *testing.T) { t.Fatalf("unexpected memory stream create success, maxBytes=%d, replicas=%d", si.Config.MaxBytes, si.Config.Replicas) } + + t.Run("meta info placement in request - empty request", func(t *testing.T) { + nc, err = nats.Connect(largeSrv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + var resp JSApiLeaderStepDownResponse + ncResp, err := nc.Request(JSApiLeaderStepDown, []byte("{}"), 3*time.Second) + require_NoError(t, err) + err = json.Unmarshal(ncResp.Data, &resp) + require_NoError(t, err) + require_True(t, resp.Error == nil) + require_True(t, resp.Success) + + }) + + t.Run("meta info placement in request - uninitialized fields", func(t *testing.T) { + nc, err = nats.Connect(largeSrv.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + cluster.waitOnClusterReadyWithNumPeers(3) + var resp JSApiLeaderStepDownResponse + req, err := json.Marshal(JSApiLeaderStepdownRequest{Placement: nil}) + require_NoError(t, err) + ncResp, err := nc.Request(JSApiLeaderStepDown, req, 10*time.Second) + require_NoError(t, err) + err = json.Unmarshal(ncResp.Data, &resp) + require_NoError(t, err) + require_True(t, resp.Error 
== nil) + }) } func TestJetStreamStreamLimitUpdate(t *testing.T) { From 61b67e36e17fa2c79b15132df2535dbe40432db5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 14 Oct 2024 09:54:43 -0700 Subject: [PATCH 5/8] Update for github.com/klauspost/compress Signed-off-by: Derek Collison --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index dd56502c6e9..2887522b904 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2 go 1.21.0 require ( - github.com/klauspost/compress v1.17.10 + github.com/klauspost/compress v1.17.11 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.5.8 github.com/nats-io/nats.go v1.36.0 diff --git a/go.sum b/go.sum index ef85af00ba5..20db5711c1d 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= -github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= From 179805fa04d0b31216394f290a62d865a372a764 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 15 Oct 2024 15:57:40 +0200 Subject: [PATCH 6/8] Fix race with store when fetching messages by a consumer (#6003) We were releasing and acquiring again lock while fetching messages to reduce contention. 
This could cause a consumer getting stuck in certain scenarios, like when `max messages per subject` was set to `1` and message was republished while consumer was trying to `NAK` the first one. - [ ] add a proper test Signed-off-by: Tomasz Pietrek --------- Signed-off-by: Tomasz Pietrek --- server/consumer.go | 4 -- server/jetstream_consumer_test.go | 114 ++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 991b0d9a728..9ef8fa1b7ab 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3621,9 +3621,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { var pmsg = getJSPubMsgFromPool() // Grab next message applicable to us. - // We will unlock here in case lots of contention, e.g. WQ. filters, subjf, fseq := o.filters, o.subjf, o.sseq - o.mu.Unlock() // Check if we are multi-filtered or not. if filters != nil { sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg) @@ -3638,8 +3636,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { pmsg.returnToPool() pmsg = nil } - // Re-acquire lock. - o.mu.Lock() // Check if we should move our o.sseq. if sseq >= o.sseq { // If we are moving step by step then sseq == o.sseq. 
diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 46ce0851f10..d4c9b349e7b 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" + "errors" "fmt" "math/rand" "slices" @@ -1224,6 +1225,119 @@ func TestJetStreamConsumerLongSubjectHang(t *testing.T) { } } +func TestJetStreamConsumerStuckAckPending(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + type ActiveWorkItem struct { + ID int + Expiry time.Time + } + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST_ACTIVE_WORK_ITEMS", + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Subjects: []string{"TEST_ACTIVE_WORK_ITEMS.>"}, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST_ACTIVE_WORK_ITEMS", &nats.ConsumerConfig{ + Durable: "testactiveworkitemsconsumer", + AckPolicy: nats.AckExplicitPolicy, + MaxAckPending: -1, + MaxWaiting: 20000, + AckWait: 15 * time.Second, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("TEST_ACTIVE_WORK_ITEMS.>", "testactiveworkitemsconsumer", nats.BindStream("TEST_ACTIVE_WORK_ITEMS")) + require_NoError(t, err) + + errs := make(chan error) + go func() { + for { + msgs, err := sub.Fetch(200) + if err != nil { + // test is done. stop the loop. + if errors.Is(err, nats.ErrSubscriptionClosed) || errors.Is(err, nats.ErrConnectionClosed) { + return + } + if !errors.Is(err, nats.ErrTimeout) { + errs <- err + return + } + continue + } + for _, msg := range msgs { + msg := msg + var workItem ActiveWorkItem + if err := json.Unmarshal(msg.Data, &workItem); err != nil { + errs <- err + return + } + + now := time.Now() + // If the work item has not expired, nak it with the respective delay. 
+ if workItem.Expiry.After(now) { + msg.NakWithDelay(workItem.Expiry.Sub(now)) + } else { + msg.Ack() + } + } + } + }() + + for i := 0; i < 25_000; i++ { + // Publish item to TEST_ACTIVE_WORK_ITEMS stream with an expiry time. + workItem := ActiveWorkItem{ID: i, Expiry: time.Now().Add(30 * time.Second)} + data, err := json.Marshal(workItem) + require_NoError(t, err) + + _, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data) + require_NoError(t, err) + + // Update expiry time and republish item to TEST_ACTIVE_WORK_ITEMS stream. + workItem.Expiry = time.Now().Add(3 * time.Second) + data, err = json.Marshal(workItem) + require_NoError(t, err) + _, err = js.Publish(fmt.Sprintf("TEST_ACTIVE_WORK_ITEMS.%d", i), data) + require_NoError(t, err) + } + noChange := false + lastNumAckPending := 0 + checkFor(t, 60*time.Second, 3*time.Second, func() error { + select { + case err := <-errs: + t.Fatalf("consumer goroutine failed: %v", err) + default: + } + ci, err := js.ConsumerInfo("TEST_ACTIVE_WORK_ITEMS", "testactiveworkitemsconsumer") + require_NoError(t, err) + + if lastNumAckPending != 0 && lastNumAckPending == ci.NumAckPending { + noChange = true + } + lastNumAckPending = ci.NumAckPending + + // If we have no change since last check, we can fail the test before `totalWait` timeout. 
+ if ci.NumAckPending > 0 && ci.NumPending == 0 { + if noChange { + _, err := sub.Fetch(1) + if err != nil && errors.Is(err, nats.ErrTimeout) { + + t.Fatalf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending) + } + } + return fmt.Errorf("num ack pending: %d\t num pending: %v\n", ci.NumAckPending, ci.NumPending) + } + return nil + }) +} + func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) { subject := "foo.bar.do.not.match.any.filter.subject" for n := 1; n <= 1024; n *= 2 { From c4f32514a0735ec385d027b3e64b7b7809bb8e34 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 15 Oct 2024 09:13:57 -0700 Subject: [PATCH 7/8] Fix possible panic on monitorStream when shutting down Signed-off-by: Waldemar Quevedo --- server/jetstream_cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index b7cc09cd218..9d7fc0550da 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2266,7 +2266,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // from underneath the one that is running since it will be the same raft node. defer func() { // We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots. - if !mset.closed.Load() { + if mset != nil && !mset.closed.Load() { n.Stop() } }() From 05331c3b25868d79057bcaeeada8d56ab42dacef Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 15 Oct 2024 09:31:17 -0700 Subject: [PATCH 8/8] Move two performance tests on checkInterest to NoRace suite. 
Signed-off-by: Derek Collison --- server/jetstream_cluster_2_test.go | 183 ---------------------------- server/norace_test.go | 186 +++++++++++++++++++++++++++++ 2 files changed, 186 insertions(+), 183 deletions(-) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index a78c35dc7f3..0388b0817f4 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7408,189 +7408,6 @@ func TestJetStreamClusterR1ConsumerAdvisory(t *testing.T) { checkSubsPending(t, sub, 2) } -func TestJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3F", 3) - defer c.shutdown() - - s := c.randomServer() - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo.*"}, - Retention: nats.WorkQueuePolicy, - }) - require_NoError(t, err) - - // Load up a bunch of messages for three different subjects. - msg := bytes.Repeat([]byte("Z"), 4096) - for i := 0; i < 100_000; i++ { - js.PublishAsync("foo.foo", msg) - } - for i := 0; i < 5_000; i++ { - js.PublishAsync("foo.bar", msg) - js.PublishAsync("foo.baz", msg) - } - select { - case <-js.PublishAsyncComplete(): - case <-time.After(5 * time.Second): - t.Fatalf("Did not receive completion signal") - } - - // We will not process this one and leave it as "offline". - _, err = js.PullSubscribe("foo.foo", "A") - require_NoError(t, err) - subB, err := js.PullSubscribe("foo.bar", "B") - require_NoError(t, err) - subC, err := js.PullSubscribe("foo.baz", "C") - require_NoError(t, err) - - // Now catch up both B and C but let A simulate being offline of very behind. - for i := 0; i < 5; i++ { - for _, sub := range []*nats.Subscription{subB, subC} { - msgs, err := sub.Fetch(1000) - require_NoError(t, err) - require_Equal(t, len(msgs), 1000) - for _, m := range msgs { - m.Ack() - } - } - } - // Let acks process. 
- nc.Flush() - time.Sleep(200 * time.Millisecond) - - // Now test the check checkInterestState() on the stream. - sl := c.streamLeader(globalAccountName, "TEST") - mset, err := sl.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - - expireAllBlks := func() { - mset.mu.RLock() - fs := mset.store.(*fileStore) - mset.mu.RUnlock() - fs.mu.RLock() - for _, mb := range fs.blks { - mb.tryForceExpireCache() - } - fs.mu.RUnlock() - } - - // First expire all the blocks. - expireAllBlks() - - start := time.Now() - mset.checkInterestState() - elapsed := time.Since(start) - // This is actually ~300 microseconds but due to travis and race flags etc. - // Was > 30 ms before fix for comparison, M2 macbook air. - require_True(t, elapsed < 5*time.Millisecond) - - // Make sure we set the chkflr correctly. - checkFloor := func(o *consumer) uint64 { - require_True(t, o != nil) - o.mu.RLock() - defer o.mu.RUnlock() - return o.chkflr - } - - require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001) - require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001) - - // Expire all the blocks again. - expireAllBlks() - - // This checks the chkflr state. - start = time.Now() - mset.checkInterestState() - require_True(t, time.Since(start) < elapsed) -} - -func TestJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3F", 3) - defer c.shutdown() - - s := c.randomServer() - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"foo.*"}, - Retention: nats.InterestPolicy, - }) - require_NoError(t, err) - - // We will not process this one and leave it as "offline". - _, err = js.PullSubscribe("foo.foo", "A") - require_NoError(t, err) - _, err = js.PullSubscribe("foo.*", "B") - require_NoError(t, err) - // Make subC multi-subject. 
- _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ - Durable: "C", - FilterSubjects: []string{"foo.foo", "foo.bar", "foo.baz"}, - AckPolicy: nats.AckExplicitPolicy, - }) - require_NoError(t, err) - - // Load up a bunch of messages for three different subjects. - msg := bytes.Repeat([]byte("Z"), 4096) - for i := 0; i < 90_000; i++ { - js.PublishAsync("foo.foo", msg) - } - for i := 0; i < 5_000; i++ { - js.PublishAsync("foo.bar", msg) - js.PublishAsync("foo.baz", msg) - } - select { - case <-js.PublishAsyncComplete(): - case <-time.After(5 * time.Second): - t.Fatalf("Did not receive completion signal") - } - - // Now catch up both B and C but let A simulate being offline of very behind. - // Will do this manually here to speed up tests. - sl := c.streamLeader(globalAccountName, "TEST") - mset, err := sl.GlobalAccount().lookupStream("TEST") - require_NoError(t, err) - - for _, cname := range []string{"B", "C"} { - o := mset.lookupConsumer(cname) - o.mu.Lock() - o.setStoreState(&ConsumerState{ - Delivered: SequencePair{100_000, 100_000}, - AckFloor: SequencePair{100_000, 100_000}, - }) - o.mu.Unlock() - } - - // Now test the check checkInterestState() on the stream. - start := time.Now() - mset.checkInterestState() - elapsed := time.Since(start) - - // Make sure we set the chkflr correctly. - checkFloor := func(o *consumer) uint64 { - require_True(t, o != nil) - o.mu.RLock() - defer o.mu.RUnlock() - return o.chkflr - } - - require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001) - require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001) - - // This checks the chkflr state. For this test this should be much faster, - // two orders of magnitude then the first time. 
- start = time.Now() - mset.checkInterestState() - require_True(t, time.Since(start) < elapsed/100) -} - // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. diff --git a/server/norace_test.go b/server/norace_test.go index c2ddf6aa669..9914c57abaf 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10964,3 +10964,189 @@ func TestNoRaceFileStoreMsgLimitsAndOldRecoverState(t *testing.T) { }) } } + +func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.WorkQueuePolicy, + }) + require_NoError(t, err) + + // Load up a bunch of messages for three different subjects. + msg := bytes.Repeat([]byte("Z"), 4096) + for i := 0; i < 100_000; i++ { + js.PublishAsync("foo.foo", msg) + } + for i := 0; i < 5_000; i++ { + js.PublishAsync("foo.bar", msg) + js.PublishAsync("foo.baz", msg) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // We will not process this one and leave it as "offline". + _, err = js.PullSubscribe("foo.foo", "A") + require_NoError(t, err) + subB, err := js.PullSubscribe("foo.bar", "B") + require_NoError(t, err) + subC, err := js.PullSubscribe("foo.baz", "C") + require_NoError(t, err) + + // Now catch up both B and C but let A simulate being offline of very behind. + for i := 0; i < 5; i++ { + for _, sub := range []*nats.Subscription{subB, subC} { + msgs, err := sub.Fetch(1000) + require_NoError(t, err) + require_Equal(t, len(msgs), 1000) + for _, m := range msgs { + m.Ack() + } + } + } + // Let acks process. 
+ nc.Flush() + time.Sleep(200 * time.Millisecond) + + // Now test the check checkInterestState() on the stream. + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + expireAllBlks := func() { + mset.mu.RLock() + fs := mset.store.(*fileStore) + mset.mu.RUnlock() + fs.mu.RLock() + for _, mb := range fs.blks { + mb.tryForceExpireCache() + } + fs.mu.RUnlock() + } + + // First expire all the blocks. + expireAllBlks() + + start := time.Now() + mset.checkInterestState() + elapsed := time.Since(start) + // This is actually ~300 microseconds but due to travis and race flags etc. + // Was > 30 ms before fix for comparison, M2 macbook air. + require_True(t, elapsed < 5*time.Millisecond) + + // Make sure we set the chkflr correctly. + checkFloor := func(o *consumer) uint64 { + require_True(t, o != nil) + o.mu.RLock() + defer o.mu.RUnlock() + return o.chkflr + } + + require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001) + require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001) + + // Expire all the blocks again. + expireAllBlks() + + // This checks the chkflr state. + start = time.Now() + mset.checkInterestState() + require_True(t, time.Since(start) < elapsed) +} + +func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // We will not process this one and leave it as "offline". + _, err = js.PullSubscribe("foo.foo", "A") + require_NoError(t, err) + _, err = js.PullSubscribe("foo.*", "B") + require_NoError(t, err) + // Make subC multi-subject. 
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:        "C",
+		FilterSubjects: []string{"foo.foo", "foo.bar", "foo.baz"},
+		AckPolicy:      nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	// Load up a bunch of messages for three different subjects.
+	msg := bytes.Repeat([]byte("Z"), 4096)
+	for i := 0; i < 90_000; i++ {
+		js.PublishAsync("foo.foo", msg)
+	}
+	for i := 0; i < 5_000; i++ {
+		js.PublishAsync("foo.bar", msg)
+		js.PublishAsync("foo.baz", msg)
+	}
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+	// This is so we do not asynchronously update our consumer state after we set the state due to notifications
+	// from new messages for the stream.
+	time.Sleep(250 * time.Millisecond)
+
+	// Now catch up both B and C but let A simulate being offline or very behind.
+	// Will do this manually here to speed up tests.
+	sl := c.streamLeader(globalAccountName, "TEST")
+	mset, err := sl.GlobalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+
+	for _, cname := range []string{"B", "C"} {
+		o := mset.lookupConsumer(cname)
+		o.mu.Lock()
+		o.setStoreState(&ConsumerState{
+			Delivered: SequencePair{100_000, 100_000},
+			AckFloor:  SequencePair{100_000, 100_000},
+		})
+		o.mu.Unlock()
+	}
+
+	// Now test the checkInterestState() check on the stream.
+	start := time.Now()
+	mset.checkInterestState()
+	elapsed := time.Since(start)
+
+	// Make sure we set the chkflr correctly.
+	checkFloor := func(o *consumer) uint64 {
+		require_True(t, o != nil)
+		o.mu.RLock()
+		defer o.mu.RUnlock()
+		return o.chkflr
+	}
+
+	require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
+	require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001)
+	require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001)
+
+	// This checks the chkflr state. For this test this should be much faster,
+	// two orders of magnitude than the first time.
+ start = time.Now() + mset.checkInterestState() + require_True(t, time.Since(start) < elapsed/100) +}