From 580a39e25248a4b19aba4d44b9916b6da6edf0bd Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 14 Oct 2024 17:38:09 +0200 Subject: [PATCH 01/26] Updated NRG test helpers from #5987 Signed-off-by: Neil Twigg --- server/raft.go | 14 ++++++++++++-- server/raft_helpers_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/server/raft.go b/server/raft.go index 5397296d2f1..ce42130fb57 100644 --- a/server/raft.go +++ b/server/raft.go @@ -342,8 +342,8 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined}) } -// startRaftNode will start the raft node. -func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) { +// initRaftNode will initialize the raft node, to be used by startRaftNode or when testing to not run the Go routine. +func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (*raft, error) { if cfg == nil { return nil, errNilCfg } @@ -525,6 +525,16 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe labels["group"] = n.group s.registerRaftNode(n.group, n) + return n, nil +} + +// startRaftNode will start the raft node. +func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) { + n, err := s.initRaftNode(accName, cfg, labels) + if err != nil { + return nil, err + } + // Start the run goroutine for the Raft state machine. s.startGoRoutine(n.run, labels) diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 127a3642c45..837b434a7c8 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -317,3 +317,34 @@ func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine { return &stateAdder{s: s, n: n, cfg: cfg, lch: make(chan bool, 1)} } + +func initSingleMemRaftNode(t *testing.T) (*raft, func()) { + t.Helper() + c := createJetStreamClusterExplicit(t, "R3S", 3) + s := c.servers[0] // RunBasicJetStreamServer not available + + ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) + require_NoError(t, err) + cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms} + + err = s.bootstrapRaftNode(cfg, nil, false) + require_NoError(t, err) + n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + cleanup := func() { + c.shutdown() + } + return n, cleanup +} + +// Encode an AppendEntry. +// An AppendEntry is encoded into a buffer and that's stored into the WAL. +// This is a helper function to generate that buffer. 
+func encode(t *testing.T, ae *appendEntry) *appendEntry { + t.Helper() + buf, err := ae.encode(nil) + require_NoError(t, err) + ae.buf = buf + return ae +} From 1620a27ec375a068a8f043ce613a250ce7eb0083 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 29 Oct 2024 22:02:05 +0100 Subject: [PATCH 02/26] [FIXED] Don't remove snapshot if truncate to applied Signed-off-by: Maurice van Veen --- server/raft.go | 2 +- server/raft_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/server/raft.go b/server/raft.go index ce42130fb57..a0dc6402b33 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3108,7 +3108,7 @@ func (n *raft) truncateWAL(term, index uint64) { defer func() { // Check to see if we invalidated any snapshots that might have held state // from the entries we are truncating. - if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex >= index { + if snap, _ := n.loadLastSnapshot(); snap != nil && snap.lastIndex > index { os.Remove(n.snapfile) n.snapfile = _EMPTY_ } diff --git a/server/raft_test.go b/server/raft_test.go index 545561eec97..27a1aae4d30 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -18,6 +18,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "testing" "time" @@ -601,3 +602,49 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) { n.ResumeApply() checkState(false, false) } + +func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline. + aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeMsg, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 1) + + // Simulate upper layer calling down to apply. + n.Applied(1) + + // Install snapshot and check it exists. + err = n.InstallSnapshot(nil) + require_NoError(t, err) + + snapshots := path.Join(n.sd, snapshotsDir) + files, err := os.ReadDir(snapshots) + require_NoError(t, err) + require_Equal(t, len(files), 1) + + // Truncate and check snapshot is kept. 
+ n.truncateWAL(n.pterm, n.applied) + + files, err = os.ReadDir(snapshots) + require_NoError(t, err) + require_Equal(t, len(files), 1) +} From 8a271b2bad7f610ba59eff94a1716b576ccc0a02 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 31 Oct 2024 11:26:05 +0100 Subject: [PATCH 03/26] NRG: Don't delete RAFT state if stream/consumer creation failed during shutdown Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 17 ++++++++++- server/jetstream_cluster_4_test.go | 48 ++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ca6763b9cbf..0effb10b96e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2834,7 +2834,7 @@ func (mset *stream) resetClusteredState(err error) bool { // If we detect we are shutting down just return. if js != nil && js.isShuttingDown() { - s.Debugf("Will not reset stream, jetstream shutting down") + s.Debugf("Will not reset stream, JetStream shutting down") return false } @@ -3835,6 +3835,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // This is an error condition. if err != nil { + // If we're shutting down we could get a variety of errors, for example: + // 'JetStream not enabled for account' when looking up the stream. + // Normally we can continue and delete state, but need to be careful when shutting down. + if js.isShuttingDown() { + s.Debugf("Could not create stream, JetStream shutting down") + return + } + if IsNatsErr(err, JSStreamStoreFailedF) { s.Warnf("Stream create failed for '%s > %s': %v", sa.Client.serviceAccount(), sa.Config.Name, err) err = errStreamStoreFailed @@ -4426,6 +4434,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } if err != nil { + // If we're shutting down we could get a variety of errors. + // Normally we can continue and delete state, but need to be careful when shutting down. + if js.isShuttingDown() { + s.Debugf("Could not create consumer, JetStream shutting down") + return + } + if IsNatsErr(err, JSConsumerStoreFailedErrF) { s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err) err = errConsumerStoreFailed diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 6a4e24a02b0..bfbd622fdfe 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3891,3 +3891,51 @@ func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) return checkState(t, c, globalAccountName, "TEST") }) } + +func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + nc.Close() + + // Capture RAFT storage directory and JetStream handle before shutdown. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + sd := mset.node.(*raft).sd + jss := s.getJetStream() + + // Shutdown the server. + // Normally there are no actions taken anymore after shutdown completes, + // but still do so to simulate actions taken while shutdown is in progress. 
+ s.Shutdown() + s.WaitForShutdown() + + // Check RAFT state is kept. + files, err := os.ReadDir(sd) + require_NoError(t, err) + require_True(t, len(files) > 0) + + // Simulate server shutting down, JetStream being disabled and a stream being created. + sa := &streamAssignment{ + Config: &StreamConfig{Name: "TEST"}, + Group: &raftGroup{node: &raft{}}, + } + jss.processClusterCreateStream(acc, sa) + + // Check RAFT state is not deleted due to failing stream creation. + files, err = os.ReadDir(sd) + require_NoError(t, err) + require_True(t, len(files) > 0) +} From 60513a6afff725175e2ce0fa0fa65891082f6550 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 31 Oct 2024 22:09:33 +0100 Subject: [PATCH 04/26] Make 'accountName > streamName' logs consistent Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 27e8f4b626e..de014e74b72 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2556,7 +2556,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ cfg.Placement = origPlacement s.Noticef("Requested move for stream '%s > %s' R=%d from %+v to %+v", - streamName, accName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) + accName, streamName, cfg.Replicas, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) // We will always have peers and therefore never do a callout, therefore it is safe to call inline s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) @@ -2662,7 +2662,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli } s.Noticef("Requested cancel of move: R=%d '%s > %s' to peer set %+v and restore previous peer set %+v", - cfg.Replicas, streamName, accName, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) + cfg.Replicas, accName, streamName, s.peerSetToNames(currPeers), s.peerSetToNames(peers)) // We will always have peers and therefore never do a callout, therefore it is safe to call inline s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) @@ -3557,7 +3557,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC if err != nil { resp.Error = NewJSStreamRestoreError(err, Unless(err)) s.Warnf("Restore failed for %s for stream '%s > %s' in %v", - friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start)) + friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start)) } else { resp.StreamInfo = &StreamInfo{ Created: mset.createdTime(), @@ -3566,7 +3566,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC TimeStamp: time.Now().UTC(), } s.Noticef("Completed restore of %s for stream '%s > %s' in %v", - friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start).Round(time.Millisecond)) + friendlyBytes(int64(total)), acc.Name, streamName, end.Sub(start).Round(time.Millisecond)) } // On the last EOF, send back the stream info or error status. 
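
Illustrative aside on patch 02 above (hypothetical helper, not from the patches): the truncateWAL change is a boundary condition. A snapshot whose lastIndex equals the index being truncated back to still describes valid applied state, so only snapshots strictly ahead of the truncation point hold state from the removed entries. A minimal standalone Go sketch of that predicate:

	// snapshotInvalidatedByTruncate reports whether a snapshot must be removed
	// after truncating the WAL back to truncateToIndex. Only a snapshot that
	// covers entries beyond the truncation point references state that no
	// longer exists in the log.
	func snapshotInvalidatedByTruncate(snapLastIndex, truncateToIndex uint64) bool {
		// The previous ">=" also discarded a snapshot taken exactly at the
		// index being truncated to, e.g. when truncating to the applied index.
		return snapLastIndex > truncateToIndex
	}
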
From 5d5c22a98b8690fe32aa6700236a2847b381deae Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 25 Oct 2024 09:58:14 +0100 Subject: [PATCH 05/26] NRG: Use configured sync intervals for group stores Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0effb10b96e..e94b07789fe 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -782,8 +782,12 @@ func (js *jetStream) setupMetaGroup() error { sysAcc := s.SystemAccount() storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) + js.srv.optsMu.RLock() + syncAlways := js.srv.opts.SyncAlways + syncInterval := js.srv.opts.SyncInterval + js.srv.optsMu.RUnlock() fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, SyncAlways: syncAlways, SyncInterval: syncInterval, srv: s}, StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName), @@ -2082,8 +2086,13 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, rg.Name) var store StreamStore if storage == FileStorage { + // If the server is set to sync always, do the same for the Raft log. + js.srv.optsMu.RLock() + syncAlways := js.srv.opts.SyncAlways + syncInterval := js.srv.opts.SyncInterval + js.srv.optsMu.RUnlock() fs, err := newFileStoreWithCreated( - FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s}, + FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncAlways: syncAlways, SyncInterval: syncInterval, srv: s}, StreamConfig{Name: rg.Name, Storage: FileStorage, Metadata: labels}, time.Now().UTC(), s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name), From 143aeed238a0674a4ab1ea0b96d8ff9c483849d5 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 31 Oct 2024 15:39:46 +0000 Subject: [PATCH 06/26] Spread out first call of filestore sync intervals Signed-off-by: Neil Twigg --- server/filestore.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index 2208cd7859b..020026377aa 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -29,6 +29,7 @@ import ( "io" "io/fs" "math" + mrand "math/rand" "net" "os" "path/filepath" @@ -7833,7 +7834,11 @@ func (fs *fileStore) setSyncTimer() { if fs.syncTmr != nil { fs.syncTmr.Reset(fs.fcfg.SyncInterval) } else { - fs.syncTmr = time.AfterFunc(fs.fcfg.SyncInterval, fs.syncBlocks) + // First time this fires will be any time up to the fs.fcfg.SyncInterval, + // so that different stores are spread out, rather than having many of + // them trying to all sync at once, causing blips and contending dios. 
+ start := time.Duration(mrand.Int63n(int64(fs.fcfg.SyncInterval))) + fs.syncTmr = time.AfterFunc(min(start, time.Second), fs.syncBlocks) } } From 141c8871d2a396ea7e0f19609e89117d32494f67 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 1 Nov 2024 10:53:20 +0000 Subject: [PATCH 07/26] Group metalayer consumer assignments by stream during recovery Signed-off-by: Neil Twigg Co-authored-by: Maurice van Veen --- server/jetstream_cluster.go | 59 +++++++++++++++++++++++------- server/jetstream_cluster_1_test.go | 44 ++++++++++++++++++++++ 2 files changed, 90 insertions(+), 13 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e94b07789fe..fa38ded5949 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1135,9 +1135,9 @@ func (js *jetStream) isMetaRecovering() bool { // During recovery track any stream and consumer delete and update operations. type recoveryUpdates struct { removeStreams map[string]*streamAssignment - removeConsumers map[string]*consumerAssignment + removeConsumers map[string]map[string]*consumerAssignment updateStreams map[string]*streamAssignment - updateConsumers map[string]*consumerAssignment + updateConsumers map[string]map[string]*consumerAssignment } // Called after recovery of the cluster on startup to check for any orphans. @@ -1342,9 +1342,9 @@ func (js *jetStream) monitorCluster() { ru := &recoveryUpdates{ removeStreams: make(map[string]*streamAssignment), - removeConsumers: make(map[string]*consumerAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), updateStreams: make(map[string]*streamAssignment), - updateConsumers: make(map[string]*consumerAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), } // Make sure to cancel any pending checkForOrphans calls if the @@ -1371,8 +1371,10 @@ func (js *jetStream) monitorCluster() { // Signals we have replayed all of our metadata. js.clearMetaRecovering() // Process any removes that are still valid after recovery. - for _, ca := range ru.removeConsumers { - js.processConsumerRemoval(ca) + for _, cas := range ru.removeConsumers { + for _, ca := range cas { + js.processConsumerRemoval(ca) + } } for _, sa := range ru.removeStreams { js.processStreamRemoval(sa) @@ -1382,8 +1384,10 @@ func (js *jetStream) monitorCluster() { js.processUpdateStreamAssignment(sa) } // Now consumers. - for _, ca := range ru.updateConsumers { - js.processConsumerAssignment(ca) + for _, cas := range ru.updateConsumers { + for _, ca := range cas { + js.processConsumerAssignment(ca) + } } // Clear. 
ru = nil @@ -1634,6 +1638,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := sa.recoveryKey() ru.removeStreams[key] = sa + delete(ru.updateConsumers, key) delete(ru.updateStreams, key) } else { js.processStreamRemoval(sa) @@ -1669,7 +1674,11 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove js.setConsumerAssignmentRecovering(ca) if isRecovering { key := ca.recoveryKey() - ru.removeConsumers[key] = ca + skey := ca.streamRecoveryKey() + if _, ok := ru.removeConsumers[skey]; !ok { + ru.removeConsumers[skey] = map[string]*consumerAssignment{} + } + ru.removeConsumers[skey][key] = ca delete(ru.updateConsumers, key) } else { js.processConsumerRemoval(ca) @@ -1679,8 +1688,12 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove js.setConsumerAssignmentRecovering(ca) if isRecovering { key := ca.recoveryKey() + skey := ca.streamRecoveryKey() delete(ru.removeConsumers, key) - ru.updateConsumers[key] = ca + if _, ok := ru.updateConsumers[skey]; !ok { + ru.updateConsumers[skey] = map[string]*consumerAssignment{} + } + ru.updateConsumers[skey][key] = ca } else { js.processConsumerAssignment(ca) } @@ -1893,6 +1906,13 @@ func (sa *streamAssignment) recoveryKey() string { return sa.Client.serviceAccount() + ksep + sa.Config.Name } +func (ca *consumerAssignment) streamRecoveryKey() string { + if ca == nil { + return _EMPTY_ + } + return ca.Client.serviceAccount() + ksep + ca.Stream +} + func (ca *consumerAssignment) recoveryKey() string { if ca == nil { return _EMPTY_ @@ -1943,6 +1963,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo key := sa.recoveryKey() ru.removeStreams[key] = sa delete(ru.updateStreams, key) + delete(ru.updateConsumers, key) } else { js.processStreamRemoval(sa) didRemoveStream = true @@ -1956,8 +1977,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() + skey := ca.streamRecoveryKey() delete(ru.removeConsumers, key) - ru.updateConsumers[key] = ca + if _, ok := ru.updateConsumers[skey]; !ok { + ru.updateConsumers[skey] = map[string]*consumerAssignment{} + } + ru.updateConsumers[skey][key] = ca } else { js.processConsumerAssignment(ca) } @@ -1970,8 +1995,12 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() + skey := ca.streamRecoveryKey() delete(ru.removeConsumers, key) - ru.updateConsumers[key] = ca + if _, ok := ru.updateConsumers[skey]; !ok { + ru.updateConsumers[skey] = map[string]*consumerAssignment{} + } + ru.updateConsumers[skey][key] = ca } else { js.processConsumerAssignment(ca) } @@ -1984,7 +2013,11 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo if isRecovering { js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() - ru.removeConsumers[key] = ca + skey := ca.streamRecoveryKey() + if _, ok := ru.removeConsumers[skey]; !ok { + ru.removeConsumers[skey] = map[string]*consumerAssignment{} + } + ru.removeConsumers[skey][key] = ca delete(ru.updateConsumers, key) } else { js.processConsumerRemoval(ca) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 983d6adf397..0d1279f62f4 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6498,6 +6498,50 @@ func 
TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) { } } +func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + js := c.leader().getJetStream() + + create := []*Entry{ + {EntryNormal, encodeAddStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + {EntryNormal, encodeAddConsumerAssignment(&consumerAssignment{ + Stream: "TEST", + Config: &ConsumerConfig{Name: "consumer"}, + })}, + } + + delete := []*Entry{ + {EntryNormal, encodeDeleteStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + // Need to be recovering so that we accumulate recoveryUpdates. + js.setMetaRecovering() + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // Push recovery entries that create the stream & consumer. + _, _, _, err := js.applyMetaEntries(create, ru) + require_NoError(t, err) + require_Len(t, len(ru.updateConsumers), 1) + + // Now push another recovery entry that deletes the stream. The + // entry that creates the consumer should now be gone. + _, _, _, err = js.applyMetaEntries(delete, ru) + require_NoError(t, err) + require_Len(t, len(ru.removeStreams), 1) + require_Len(t, len(ru.updateConsumers), 0) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. From 91c4c667d77a0a3d29463d96f29cfff29224af92 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 1 Nov 2024 11:53:37 +0000 Subject: [PATCH 08/26] Improve metalayer snapshot on shutdown We can't know whether `s.quitCh` or `qch` will fire first, since the `select` order is indeterminate. Try to snapshot either way. Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index fa38ded5949..16b0925be65 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1355,6 +1355,8 @@ func (js *jetStream) monitorCluster() { for { select { case <-s.quitCh: + // Server shutting down, but we might receive this before qch, so try to snapshot. 
+ doSnapshot() return case <-rqch: return From 0428c7fc78bb69561981b90d6f92a09fcbc3c9a9 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 1 Nov 2024 15:10:35 +0100 Subject: [PATCH 09/26] [FIXED] Handle recreating file-based stream to be memory on meta recovery Signed-off-by: Maurice van Veen Co-authored-by: Neil Twigg --- server/jetstream_cluster.go | 17 ++++++-- server/jetstream_cluster_1_test.go | 70 ++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 16b0925be65..acd661a595a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1136,6 +1136,7 @@ func (js *jetStream) isMetaRecovering() bool { type recoveryUpdates struct { removeStreams map[string]*streamAssignment removeConsumers map[string]map[string]*consumerAssignment + addStreams map[string]*streamAssignment updateStreams map[string]*streamAssignment updateConsumers map[string]map[string]*consumerAssignment } @@ -1343,6 +1344,7 @@ func (js *jetStream) monitorCluster() { ru := &recoveryUpdates{ removeStreams: make(map[string]*streamAssignment), removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), updateStreams: make(map[string]*streamAssignment), updateConsumers: make(map[string]map[string]*consumerAssignment), } @@ -1381,6 +1383,10 @@ func (js *jetStream) monitorCluster() { for _, sa := range ru.removeStreams { js.processStreamRemoval(sa) } + // Process stream additions. + for _, sa := range ru.addStreams { + js.processStreamAssignment(sa) + } // Process pending updates. for _, sa := range ru.updateStreams { js.processUpdateStreamAssignment(sa) @@ -1641,6 +1647,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove key := sa.recoveryKey() ru.removeStreams[key] = sa delete(ru.updateConsumers, key) + delete(ru.addStreams, key) delete(ru.updateStreams, key) } else { js.processStreamRemoval(sa) @@ -1665,6 +1672,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := sa.recoveryKey() ru.updateStreams[key] = sa + delete(ru.addStreams, key) delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) @@ -1949,9 +1957,10 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - delete(ru.removeStreams, sa.recoveryKey()) - } - if js.processStreamAssignment(sa) { + key := sa.recoveryKey() + ru.addStreams[key] = sa + delete(ru.removeStreams, key) + } else if js.processStreamAssignment(sa) { didRemoveStream = true } case removeStreamOp: @@ -1964,6 +1973,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.removeStreams[key] = sa + delete(ru.addStreams, key) delete(ru.updateStreams, key) delete(ru.updateConsumers, key) } else { @@ -2035,6 +2045,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.updateStreams[key] = sa + delete(ru.addStreams, key) delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 0d1279f62f4..730a9346a0e 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6525,6 +6525,7 @@ func 
TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { ru := &recoveryUpdates{ removeStreams: make(map[string]*streamAssignment), removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), updateStreams: make(map[string]*streamAssignment), updateConsumers: make(map[string]map[string]*consumerAssignment), } @@ -6542,6 +6543,75 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { require_Len(t, len(ru.updateConsumers), 0) } +func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + js := c.leader().getJetStream() + + createFileStream := []*Entry{ + {EntryNormal, encodeAddStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + deleteFileStream := []*Entry{ + {EntryNormal, encodeDeleteStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + createMemoryStream := []*Entry{ + {EntryNormal, encodeAddStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + createConsumer := []*Entry{ + {EntryNormal, encodeAddConsumerAssignment(&consumerAssignment{ + Stream: "TEST", + Config: &ConsumerConfig{Name: "consumer"}, + })}, + } + + // Need to be recovering so that we accumulate recoveryUpdates. + js.setMetaRecovering() + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // We created a file-based stream first, but deleted it shortly after. + _, _, _, err := js.applyMetaEntries(createFileStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + + // Now push another recovery entry that deletes the stream. + // The file-based stream should not have been created. + _, _, _, err = js.applyMetaEntries(deleteFileStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 0) + require_Len(t, len(ru.removeStreams), 1) + + // Now stage a memory-based stream to be created. + _, _, _, err = js.applyMetaEntries(createMemoryStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + require_Len(t, len(ru.updateConsumers), 0) + + // Also create a consumer on that memory-based stream. + _, _, _, err = js.applyMetaEntries(createConsumer, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + require_Len(t, len(ru.updateConsumers), 1) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. 
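
Illustrative sketch of the recovery bookkeeping in patches 07 and 09 above (hypothetical types, not from the patches): pending consumer updates are grouped under their stream's recovery key and stream additions are staged separately, so a stream removal replayed later during metadata recovery discards the staged add and every queued consumer for that stream with plain map deletes. A simplified standalone Go outline:

	// recoverySketch is a reduced mirror of recoveryUpdates: consumer updates
	// are grouped per stream so they can be dropped together with the stream.
	type recoverySketch struct {
		addStreams      map[string]struct{}            // stream key -> staged add
		updateConsumers map[string]map[string]struct{} // stream key -> consumer keys
	}

	// removeStream models the removeStreamOp handling during recovery: the
	// staged add and all of the stream's queued consumer updates are dropped.
	func (r *recoverySketch) removeStream(streamKey string) {
		delete(r.addStreams, streamKey)
		delete(r.updateConsumers, streamKey)
	}
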
From 5f63c276ca2bcd492fb434b6c9632ff170a41882 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 4 Nov 2024 20:17:49 +0100 Subject: [PATCH 10/26] [FIXED] Panic on nil sysAcc Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index acd661a595a..f8040da853e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -780,6 +780,9 @@ func (js *jetStream) setupMetaGroup() error { // Setup our WAL for the metagroup. sysAcc := s.SystemAccount() + if sysAcc == nil { + return ErrNoSysAccount + } storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName) js.srv.optsMu.RLock() From 29959b3231082c18db3232185d970d4d33b6c35b Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 6 Nov 2024 18:23:49 +0100 Subject: [PATCH 11/26] Fix consumer with start sequence and multiple filters Signed-off-by: Tomasz Pietrek --- server/consumer.go | 2 +- server/jetstream_consumer_test.go | 51 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index a336c544dcf..c79a42480d0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4903,7 +4903,7 @@ func (o *consumer) selectStartingSeqNo() { for _, filter := range o.subjf { // Use first sequence since this is more optimized atm. ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject) - if ss.First > o.sseq && ss.First < nseq { + if ss.First >= o.sseq && ss.First < nseq { nseq = ss.First } } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index d4c9b349e7b..23cd38c1d47 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1338,6 +1338,57 @@ func TestJetStreamConsumerStuckAckPending(t *testing.T) { }) } +func TestJetStreamConsumerMultipleFitersWithStartDate(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + past := time.Now().Add(-90 * time.Second) + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "events.foo", "msg-1") + sendStreamMsg(t, nc, "events.bar", "msg-2") + sendStreamMsg(t, nc, "events.baz", "msg-3") + sendStreamMsg(t, nc, "events.biz", "msg-4") + sendStreamMsg(t, nc, "events.faz", "msg-5") + sendStreamMsg(t, nc, "events.foo", "msg-6") + sendStreamMsg(t, nc, "events.biz", "msg-7") + + for _, test := range []struct { + name string + filterSubjects []string + startTime time.Time + expectedMessages uint64 + expectedStreamSequence uint64 + }{ + {"Single-Filter-first-sequence", []string{"events.foo"}, past, 2, 0}, + {"Multiple-Filter-first-sequence", []string{"events.foo", "events.bar", "events.baz"}, past, 4, 0}, + {"Multiple-Filters-second-subject", []string{"events.bar", "events.baz"}, past, 2, 1}, + {"Multiple-Filters-first-last-subject", []string{"events.foo", "events.biz"}, past, 4, 0}, + {"Multiple-Filters-in-future", []string{"events.foo", "events.biz"}, time.Now().Add(1 * time.Minute), 0, 7}, + } { + t.Run(test.name, func(t *testing.T) { + info, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: test.name, + FilterSubjects: test.filterSubjects, + DeliverPolicy: nats.DeliverByStartTimePolicy, + OptStartTime: &test.startTime, + }) + require_NoError(t, err) + require_Equal(t, test.expectedStreamSequence, 
info.Delivered.Stream) + require_Equal(t, test.expectedMessages, info.NumPending) + }) + } + +} + func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) { subject := "foo.bar.do.not.match.any.filter.subject" for n := 1; n <= 1024; n *= 2 { From a4c8d3ce7c97134ab5fe78086da1415e2668efb1 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 7 Nov 2024 16:21:53 +0100 Subject: [PATCH 12/26] [FIXED] Freeze n.Applied for cluster when shutting down Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f8040da853e..0d6bc49e222 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1409,7 +1409,11 @@ func (js *jetStream) monitorCluster() { continue } if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { - _, nb := n.Applied(ce.Index) + var nb uint64 + // Some entries can fail without an error when shutting down, don't move applied forward. + if !js.isShuttingDown() { + _, nb = n.Applied(ce.Index) + } if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { doSnapshot() } else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 { From 9582085b11112300dad51b96a624a0501a4dd7bb Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 7 Nov 2024 15:16:01 +0000 Subject: [PATCH 13/26] Update params in `TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting` This may not fix this test entirely, but right now all of the different R=3 permutations are doing different things, so the results that it gives us are not comparatively useful at all. Instead this PR harmonises all the R=3 tests. Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index bfbd622fdfe..99d49dd1092 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -821,7 +821,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { ldmRestart: true, rolloutRestart: false, restarts: 1, - checkHealthz: false, + checkHealthz: true, } test(t, params, &nats.StreamConfig{ Name: "OWQTEST_R3M", @@ -847,6 +847,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { ldmRestart: true, rolloutRestart: false, restarts: 1, + checkHealthz: true, } test(t, params, &nats.StreamConfig{ Name: "OWQTEST_R3F_DN", @@ -871,6 +872,7 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { ldmRestart: true, rolloutRestart: false, restarts: 1, + checkHealthz: true, } test(t, params, &nats.StreamConfig{ Name: "OWQTEST_R3F_DO", @@ -891,13 +893,11 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) { // Clustered file based with discard old policy and no limits. 
t.Run("R3F_DO_NOLIMIT", func(t *testing.T) { params := &testParams{ - restartAny: false, - ldmRestart: true, - rolloutRestart: true, - restarts: 3, - checkHealthz: true, - reconnectRoutes: true, - reconnectClients: true, + restartAny: true, + ldmRestart: true, + rolloutRestart: false, + restarts: 1, + checkHealthz: true, } test(t, params, &nats.StreamConfig{ Name: "OWQTEST_R3F_DO_NOLIMIT", From ce3adb1afe6ce0e97801c95b4b517504bdfbef11 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 7 Nov 2024 15:05:44 +0100 Subject: [PATCH 14/26] [FIXED] Ghost consumers after failed meta proposal Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 29 +++++++++++++++++------------ server/jetstream_cluster_2_test.go | 6 ++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0d6bc49e222..149c4386e5e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -138,10 +138,11 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - recovering bool - err error + consumers map[string]*consumerAssignment + pendingConsumers map[string]struct{} + responded bool + recovering bool + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -4233,6 +4234,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca + delete(sa.pendingConsumers, ca.Name) + if len(sa.pendingConsumers) == 0 { + sa.pendingConsumers = nil + } js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -7369,7 +7374,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } if maxc > 0 { // Don't count DIRECTS. - total := 0 + total := len(sa.pendingConsumers) for cn, ca := range sa.consumers { if action == ActionCreateOrUpdate { // If the consumer name is specified and we think it already exists, then @@ -7609,14 +7614,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec ca = nca } - // Mark this as pending. - if sa.consumers == nil { - sa.consumers = make(map[string]*consumerAssignment) - } - sa.consumers[ca.Name] = ca - // Do formal proposal. - cc.meta.Propose(encodeAddConsumerAssignment(ca)) + if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil { + // Mark this as pending. 
+ if sa.pendingConsumers == nil { + sa.pendingConsumers = make(map[string]struct{}) + } + sa.pendingConsumers[ca.Name] = struct{}{} + } } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 0388b0817f4..2935a232095 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2072,6 +2072,12 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { if nc := len(names); nc > 1 { t.Fatalf("Expected only 1 consumer, got %d", nc) } + + metaLeader := c.leader() + mjs := metaLeader.getJetStream() + sa := mjs.streamAssignment(globalAccountName, "MAXCC") + require_NotNil(t, sa) + require_True(t, sa.pendingConsumers == nil) } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) { From 61283c1fbc7a99a4fbf2c64871c49c1cb7e64fff Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 8 Nov 2024 10:11:43 -0800 Subject: [PATCH 15/26] Introduce a NumPendingMulti that uses a sublist for efficient selection. (#6089) Consumers with many filtered subjects and large streams could suffer from slow creation times due to NumPending() being called for all filtered subjects. This improvement is similar to the LoadNextMsgMulti for consumers with large number of filtered subjects. Also fixed a bug in normal NumPending for memstore. Signed-off-by: Derek Collison --------- Signed-off-by: Derek Collison --- server/consumer.go | 42 ++---- server/filestore.go | 306 ++++++++++++++++++++++++++++++++++++++- server/filestore_test.go | 50 +++++++ server/memstore.go | 166 ++++++++++++++++++++- server/memstore_test.go | 84 +++++++++++ server/store.go | 1 + 6 files changed, 609 insertions(+), 40 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index c79a42480d0..1d8b9fb6889 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -950,7 +950,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri // If we have multiple filter subjects, create a sublist which we will use // in calling store.LoadNextMsgMulti. if len(o.cfg.FilterSubjects) > 0 { - o.filters = NewSublistWithCache() + o.filters = NewSublistNoCache() for _, filter := range o.cfg.FilterSubjects { o.filters.Insert(&subscription{subject: []byte(filter)}) } @@ -1940,7 +1940,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { if len(o.subjf) == 1 { o.filters = nil } else { - o.filters = NewSublistWithCache() + o.filters = NewSublistNoCache() for _, filter := range o.subjf { o.filters.Insert(&subscription{subject: []byte(filter.subject)}) } @@ -3638,7 +3638,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { // Check if we are multi-filtered or not. if filters != nil { sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg) - } else if subjf != nil { // Means single filtered subject since o.filters means > 1. + } else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1. filter, wc := subjf[0].subject, subjf[0].hasWildcard sm, sseq, err = store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg) } else { @@ -4283,37 +4283,15 @@ func (o *consumer) calculateNumPending() (npc, npf uint64) { } isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject + filters, subjf := o.filters, o.subjf - // Deliver Last Per Subject calculates num pending differently. - if isLastPerSubject { - // Consumer without filters. 
- if o.subjf == nil { - return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) - } - // Consumer with filters. - for _, filter := range o.subjf { - lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject) - npc += lnpc - if lnpf > npf { - npf = lnpf // Always last - } - } - return npc, npf - } - // Every other Delivery Policy is handled here. - // Consumer without filters. - if o.subjf == nil { - return o.mset.store.NumPending(o.sseq, _EMPTY_, false) - } - // Consumer with filters. - for _, filter := range o.subjf { - lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, false) - npc += lnpc - if lnpf > npf { - npf = lnpf // Always last - } + if filters != nil { + return o.mset.store.NumPendingMulti(o.sseq, filters, isLastPerSubject) + } else if len(subjf) > 0 { + filter := subjf[0].subject + return o.mset.store.NumPending(o.sseq, filter, isLastPerSubject) } - return npc, npf + return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject) } func convertToHeadersOnly(pmsg *jsPubMsg) { diff --git a/server/filestore.go b/server/filestore.go index 020026377aa..789a7e38dca 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2819,7 +2819,9 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) _tsa, _fsa := [32]string{}, [32]string{} tsa, fsa := _tsa[:0], _fsa[:0] - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + if wc { + fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + } isMatch := func(subj string) bool { if isAll { @@ -2913,7 +2915,6 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) mb := fs.blks[i] // Hold write lock in case we need to load cache. mb.mu.Lock() - var t uint64 if isAll && sseq <= atomic.LoadUint64(&mb.first.seq) { total += mb.msgs mb.mu.Unlock() @@ -2928,6 +2929,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) // Mark fss activity. mb.lsts = time.Now().UnixNano() + var t uint64 var havePartial bool mb.fss.Match(stringToBytes(filter), func(bsubj []byte, ss *SimpleState) { if havePartial { @@ -2955,8 +2957,12 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) } // Clear on partial. t = 0 + start := sseq + if fseq := atomic.LoadUint64(&mb.first.seq); fseq > start { + start = fseq + } var smv StoreMsg - for seq, lseq := sseq, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { t++ } @@ -3061,6 +3067,300 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return total, validThrough } +// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence. +// Optimized for stream num pending calculations for consumers with lots of filtered subjects. +// Subjects should not overlap, this property is held when doing multi-filtered consumers. +func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) { + fs.mu.RLock() + defer fs.mu.RUnlock() + + // This can always be last for these purposes. + validThrough = fs.state.LastSeq + + if fs.state.Msgs == 0 || sseq > fs.state.LastSeq { + return 0, validThrough + } + + // If sseq is less then our first set to first. + if sseq < fs.state.FirstSeq { + sseq = fs.state.FirstSeq + } + // Track starting for both block for the sseq and staring block that matches any subject. 
+ var seqStart int + // See if we need to figure out starting block per sseq. + if sseq > fs.state.FirstSeq { + // This should not, but can return -1, so make sure we check to avoid panic below. + if seqStart, _ = fs.selectMsgBlockWithIndex(sseq); seqStart < 0 { + seqStart = 0 + } + } + + isAll := sl == nil + + // See if filter was provided but its the only subject. + if !isAll && fs.psim.Size() == 1 { + fs.psim.Iter(func(subject []byte, _ *psi) bool { + isAll = sl.HasInterest(bytesToString(subject)) + return true + }) + } + // If we are isAll and have no deleted we can do a simpler calculation. + if !lastPerSubject && isAll && (fs.state.LastSeq-fs.state.FirstSeq+1) == fs.state.Msgs { + if sseq == 0 { + return fs.state.Msgs, validThrough + } + return fs.state.LastSeq - sseq + 1, validThrough + } + // Setup the isMatch function. + isMatch := func(subj string) bool { + if isAll { + return true + } + return sl.HasInterest(subj) + } + + // Handle last by subject a bit differently. + // We will scan PSIM since we accurately track the last block we have seen the subject in. This + // allows us to only need to load at most one block now. + // For the last block, we need to track the subjects that we know are in that block, and track seen + // while in the block itself, but complexity there worth it. + if lastPerSubject { + // If we want all and our start sequence is equal or less than first return number of subjects. + if isAll && sseq <= fs.state.FirstSeq { + return uint64(fs.psim.Size()), validThrough + } + // If we are here we need to scan. We are going to scan the PSIM looking for lblks that are >= seqStart. + // This will build up a list of all subjects from the selected block onward. + lbm := make(map[string]bool) + mb := fs.blks[seqStart] + bi := mb.index + + subs := make([]*subscription, 0, sl.Count()) + sl.All(&subs) + for _, sub := range subs { + fs.psim.Match(sub.subject, func(subj []byte, psi *psi) { + // If the select blk start is greater than entry's last blk skip. + if bi > psi.lblk { + return + } + total++ + // We will track the subjects that are an exact match to the last block. + // This is needed for last block processing. + if psi.lblk == bi { + lbm[string(subj)] = true + } + }) + } + + // Now check if we need to inspect the seqStart block. + // Grab write lock in case we need to load in msgs. + mb.mu.Lock() + var shouldExpire bool + // We need to walk this block to correct accounting from above. + if sseq > mb.first.seq { + // Track the ones we add back in case more than one. + seen := make(map[string]bool) + // We need to discount the total by subjects seen before sseq, but also add them right back in if they are >= sseq for this blk. + // This only should be subjects we know have the last blk in this block. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var smv StoreMsg + for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil || sm.subj == _EMPTY_ || !lbm[sm.subj] { + continue + } + if isMatch(sm.subj) { + // If less than sseq adjust off of total as long as this subject matched the last block. + if seq < sseq { + if !seen[sm.subj] { + total-- + seen[sm.subj] = true + } + } else if seen[sm.subj] { + // This is equal or more than sseq, so add back in. + total++ + // Make sure to not process anymore. + delete(seen, sm.subj) + } + } + } + } + // If we loaded the block try to force expire. 
+ if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + return total, validThrough + } + + // If we would need to scan more from the beginning, revert back to calculating directly here. + if seqStart >= (len(fs.blks) / 2) { + for i := seqStart; i < len(fs.blks); i++ { + var shouldExpire bool + mb := fs.blks[i] + // Hold write lock in case we need to load cache. + mb.mu.Lock() + if isAll && sseq <= atomic.LoadUint64(&mb.first.seq) { + total += mb.msgs + mb.mu.Unlock() + continue + } + // If we are here we need to at least scan the subject fss. + // Make sure we have fss loaded. + if mb.fssNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Mark fss activity. + mb.lsts = time.Now().UnixNano() + + var t uint64 + var havePartial bool + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + subj := bytesToString(bsubj) + if havePartial || !sl.HasInterest(subj) { + // If we already found a partial then don't do anything else. + return !havePartial + } + if ss.firstNeedsUpdate { + mb.recalculateFirstForSubj(subj, ss.First, ss) + } + if sseq <= ss.First { + t += ss.Msgs + } else if sseq <= ss.Last { + // We matched but its a partial. + havePartial = true + } + return !havePartial + }) + + // See if we need to scan msgs here. + if havePartial { + // Make sure we have the cache loaded. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Clear on partial. + t = 0 + start := sseq + if fseq := atomic.LoadUint64(&mb.first.seq); fseq > start { + start = fseq + } + var smv StoreMsg + for seq, lseq := start, atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ { + if sm, _ := mb.cacheLookup(seq, &smv); sm != nil && isMatch(sm.subj) { + t++ + } + } + } + // If we loaded this block for this operation go ahead and expire it here. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + total += t + } + return total, validThrough + } + + // If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks. + start := uint32(math.MaxUint32) + subs := make([]*subscription, 0, sl.Count()) + sl.All(&subs) + for _, sub := range subs { + fs.psim.Match(sub.subject, func(_ []byte, psi *psi) { + total += psi.total + // Keep track of start index for this subject. + if psi.fblk < start { + start = psi.fblk + } + }) + } + // See if we were asked for all, if so we are done. + if sseq <= fs.state.FirstSeq { + return total, validThrough + } + + // If we are here we need to calculate partials for the first blocks. + firstSubjBlk := fs.bim[start] + var firstSubjBlkFound bool + // Adjust in case not found. + if firstSubjBlk == nil { + firstSubjBlkFound = true + } + + // Track how many we need to adjust against the total. + var adjust uint64 + for i := 0; i <= seqStart; i++ { + mb := fs.blks[i] + // We can skip blks if we know they are below the first one that has any subject matches. + if !firstSubjBlkFound { + if firstSubjBlkFound = (mb == firstSubjBlk); !firstSubjBlkFound { + continue + } + } + // We need to scan this block. + var shouldExpire bool + mb.mu.Lock() + // Check if we should include all of this block in adjusting. If so work with metadata. + if sseq > atomic.LoadUint64(&mb.last.seq) { + if isAll { + adjust += mb.msgs + } else { + // We need to adjust for all matches in this block. + // Make sure we have fss loaded. This loads whole block now. + if mb.fssNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + // Mark fss activity. 
+ mb.lsts = time.Now().UnixNano() + mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + if sl.HasInterest(bytesToString(bsubj)) { + adjust += ss.Msgs + } + return true + }) + } + } else { + // This is the last block. We need to scan per message here. + if mb.cacheNotLoaded() { + mb.loadMsgsWithLock() + shouldExpire = true + } + var last = atomic.LoadUint64(&mb.last.seq) + if sseq < last { + last = sseq + } + // We need to walk all messages in this block + var smv StoreMsg + for seq := atomic.LoadUint64(&mb.first.seq); seq < last; seq++ { + sm, _ := mb.cacheLookup(seq, &smv) + if sm == nil || sm.subj == _EMPTY_ { + continue + } + // Check if it matches our filter. + if sm.seq < sseq && isMatch(sm.subj) { + adjust++ + } + } + } + // If we loaded the block try to force expire. + if shouldExpire { + mb.tryForceExpireCacheLocked() + } + mb.mu.Unlock() + } + // Make final adjustment. + total -= adjust + + return total, validThrough +} + // SubjectsTotal return message totals per subject. func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 { fs.mu.RLock() diff --git a/server/filestore_test.go b/server/filestore_test.go index 4f409d2e5af..1be968f8b11 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -40,6 +40,7 @@ import ( "time" "github.com/klauspost/compress/s2" + "github.com/nats-io/nuid" ) func testFileStoreAllPermutations(t *testing.T, fn func(t *testing.T, fcfg FileStoreConfig)) { @@ -8144,3 +8145,52 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) { err = fs.recoverFullState() require_Error(t, err, errCorruptState) } + +func TestFileStoreNumPendingMulti(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"ev.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + totalMsgs := 100_000 + totalSubjects := 10_000 + numFiltered := 5000 + startSeq := uint64(5_000 + rand.Intn(90_000)) + + subjects := make([]string, 0, totalSubjects) + for i := 0; i < totalSubjects; i++ { + subjects = append(subjects, fmt.Sprintf("ev.%s", nuid.Next())) + } + + // Put in 100k msgs with random subjects. + msg := bytes.Repeat([]byte("ZZZ"), 333) + for i := 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg(subjects[rand.Intn(totalSubjects)], nil, msg) + require_NoError(t, err) + } + + // Now we want to do a calculate NumPendingMulti. + filters := NewSublistNoCache() + for filters.Count() < uint32(numFiltered) { + filter := subjects[rand.Intn(totalSubjects)] + if !filters.HasInterest(filter) { + filters.Insert(&subscription{subject: []byte(filter)}) + } + } + + // Use new function. + total, _ := fs.NumPendingMulti(startSeq, filters, false) + + // Check our results. + var checkTotal uint64 + var smv StoreMsg + for seq := startSeq; seq <= uint64(totalMsgs); seq++ { + sm, err := fs.LoadMsg(seq, &smv) + require_NoError(t, err) + if filters.HasInterest(sm.subj) { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} diff --git a/server/memstore.go b/server/memstore.go index 8cd9070eb7a..64966f4455d 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -359,15 +359,13 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { } func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubject bool) SimpleState { - var ss SimpleState - if sseq < ms.state.FirstSeq { sseq = ms.state.FirstSeq } // If past the end no results. 
if sseq > ms.state.LastSeq { - return ss + return SimpleState{} } if filter == _EMPTY_ { @@ -391,9 +389,10 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje _tsa, _fsa := [32]string{}, [32]string{} tsa, fsa := _tsa[:0], _fsa[:0] - fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) wc := subjectHasWildcard(filter) - + if wc { + fsa = tokenizeSubjectIntoSlice(fsa[:0], filter) + } // 1. See if we match any subs from fss. // 2. If we match and the sseq is past ss.Last then we can use meta only. // 3. If we match we need to do a partial, break and clear any totals and do a full scan like num pending. @@ -409,6 +408,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje return isSubsetMatchTokenized(tsa, fsa) } + var ss SimpleState update := func(fss *SimpleState) { msgs, first, last := fss.Msgs, fss.First, fss.Last if lastPerSubject { @@ -424,6 +424,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje } var havePartial bool + var totalSkipped uint64 // We will track start and end sequences as we go. ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) { if fss.firstNeedsUpdate { @@ -436,6 +437,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje havePartial = true // Don't break here, we will update to keep tracking last. update(fss) + } else { + totalSkipped += fss.Msgs } }) @@ -492,6 +495,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje } else { // We will adjust from the totals above by scanning what we need to exclude. ss.First = first + ss.Msgs += totalSkipped var adjust uint64 var tss *SimpleState @@ -630,6 +634,158 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool) return ss.Msgs, ms.state.LastSeq } +// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence. +func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) { + if sl == nil { + return ms.NumPending(sseq, fwcs, lastPerSubject) + } + + // This needs to be a write lock, as we can mutate the per-subject state. + ms.mu.Lock() + defer ms.mu.Unlock() + + var ss SimpleState + if sseq < ms.state.FirstSeq { + sseq = ms.state.FirstSeq + } + // If past the end no results. + if sseq > ms.state.LastSeq { + return 0, ms.state.LastSeq + } + + update := func(fss *SimpleState) { + msgs, first, last := fss.Msgs, fss.First, fss.Last + if lastPerSubject { + msgs, first = 1, last + } + ss.Msgs += msgs + if ss.First == 0 || first < ss.First { + ss.First = first + } + if last > ss.Last { + ss.Last = last + } + } + + var havePartial bool + var totalSkipped uint64 + // We will track start and end sequences as we go. + ms.fss.Iter(func(subj []byte, fss *SimpleState) bool { + if !sl.HasInterest(bytesToString(subj)) { + return true + } + if fss.firstNeedsUpdate { + ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) + } + if sseq <= fss.First { + update(fss) + } else if sseq <= fss.Last { + // We matched but it is a partial. + havePartial = true + // Don't break here, we will update to keep tracking last. + update(fss) + } else { + totalSkipped += fss.Msgs + } + return true + }) + + // If we did not encounter any partials we can return here. + if !havePartial { + return ss.Msgs, ms.state.LastSeq + } + + // If we are here we need to scan the msgs. + // Capture first and last sequences for scan and then clear what we had. 
+ first, last := ss.First, ss.Last + // To track if we decide to exclude we need to calculate first. + if first < sseq { + first = sseq + } + + // Now we want to check if it is better to scan inclusive and recalculate that way + // or leave and scan exclusive and adjust our totals. + // ss.Last is always correct here. + toScan, toExclude := last-first, first-ms.state.FirstSeq+ms.state.LastSeq-ss.Last + var seen map[string]bool + if lastPerSubject { + seen = make(map[string]bool) + } + if toScan < toExclude { + ss.Msgs, ss.First = 0, 0 + + update := func(sm *StoreMsg) { + ss.Msgs++ + if ss.First == 0 { + ss.First = sm.seq + } + if seen != nil { + seen[sm.subj] = true + } + } + // Check if easier to just scan msgs vs the sequence range. + // This can happen with lots of interior deletes. + if last-first > uint64(len(ms.msgs)) { + for _, sm := range ms.msgs { + if sm.seq >= first && sm.seq <= last && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } else { + for seq := first; seq <= last; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } + } else { + // We will adjust from the totals above by scanning what we need to exclude. + ss.First = first + ss.Msgs += totalSkipped + var adjust uint64 + var tss *SimpleState + + update := func(sm *StoreMsg) { + if lastPerSubject { + tss, _ = ms.fss.Find(stringToBytes(sm.subj)) + } + // If we are last per subject, make sure to only adjust if all messages are before our first. + if tss == nil || tss.Last < first { + adjust++ + } + if seen != nil { + seen[sm.subj] = true + } + } + // Check if easier to just scan msgs vs the sequence range. + if first-ms.state.FirstSeq > uint64(len(ms.msgs)) { + for _, sm := range ms.msgs { + if sm.seq < first && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } else { + for seq := ms.state.FirstSeq; seq < first; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + update(sm) + } + } + } + // Now do range at end. + for seq := last + 1; seq < ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok && !seen[sm.subj] && sl.HasInterest(sm.subj) { + adjust++ + if seen != nil { + seen[sm.subj] = true + } + } + } + ss.Msgs -= adjust + } + + return ss.Msgs, ms.state.LastSeq +} + // Will check the msg limit for this tracked subject. // Lock should be held. func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) { diff --git a/server/memstore_test.go b/server/memstore_test.go index b1232d4f301..68e2e2b55da 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -24,6 +24,8 @@ import ( "reflect" "testing" "time" + + "github.com/nats-io/nuid" ) func TestMemStoreBasics(t *testing.T) { @@ -968,6 +970,88 @@ func TestMemStoreDeleteAllFirstSequenceCheck(t *testing.T) { require_Equal(t, state.Msgs, 0) } +func TestMemStoreNumPendingMulti(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"ev.*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + totalMsgs := 100_000 + totalSubjects := 10_000 + numFiltered := 5000 + startSeq := uint64(5_000 + rand.Intn(90_000)) + + subjects := make([]string, 0, totalSubjects) + for i := 0; i < totalSubjects; i++ { + subjects = append(subjects, fmt.Sprintf("ev.%s", nuid.Next())) + } + + // Put in 100k msgs with random subjects. 
+ msg := bytes.Repeat([]byte("ZZZ"), 333) + for i := 0; i < totalMsgs; i++ { + _, _, err = ms.StoreMsg(subjects[rand.Intn(totalSubjects)], nil, msg) + require_NoError(t, err) + } + + // Now we want to do a calculate NumPendingMulti. + filters := NewSublistNoCache() + for filters.Count() < uint32(numFiltered) { + filter := subjects[rand.Intn(totalSubjects)] + if !filters.HasInterest(filter) { + filters.Insert(&subscription{subject: []byte(filter)}) + } + } + + // Use new function. + total, _ := ms.NumPendingMulti(startSeq, filters, false) + + // Check our results. + var checkTotal uint64 + var smv StoreMsg + for seq := startSeq; seq <= uint64(totalMsgs); seq++ { + sm, err := ms.LoadMsg(seq, &smv) + require_NoError(t, err) + if filters.HasInterest(sm.subj) { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} + +func TestMemStoreNumPendingBug(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"foo.*"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + // 12 msgs total + for _, subj := range []string{"foo.foo", "foo.bar", "foo.baz", "foo.zzz"} { + ms.StoreMsg("foo.aaa", nil, nil) + ms.StoreMsg(subj, nil, nil) + ms.StoreMsg(subj, nil, nil) + } + total, _ := ms.NumPending(4, "foo.*", false) + + var checkTotal uint64 + var smv StoreMsg + for seq := 4; seq <= 12; seq++ { + sm, err := ms.LoadMsg(uint64(seq), &smv) + require_NoError(t, err) + if subjectIsSubsetMatch(sm.subj, "foo.*") { + checkTotal++ + } + } + require_Equal(t, total, checkTotal) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/store.go b/server/store.go index 661959d172b..2bd1fb945a2 100644 --- a/server/store.go +++ b/server/store.go @@ -101,6 +101,7 @@ type StreamStore interface { SubjectsState(filterSubject string) map[string]SimpleState SubjectsTotals(filterSubject string) map[string]uint64 NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64) + NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) State() StreamState FastState(*StreamState) EncodedStreamState(failed uint64) (enc []byte, err error) From debe8088e93581739e6c1b331de7dc909431598f Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sat, 9 Nov 2024 03:39:15 +0100 Subject: [PATCH 16/26] [FIXED] Ghost consumers during meta recovery (#6092) During meta recovery `ru.updateConsumers` and `ru.removeConsumers` would not be properly cleared since the move from `map[string]*consumerAssignment` to `map[string]map[string]*consumerAssignment`. Which meant that consumers that needed to be removed were both in `ru.removeConsumers` and left in `ru.updateConsumers`. Resulting in a ghost consumer. Also don't clear recovering state while we still have items to process as part of recovery. De-flakes `TestJetStreamClusterLostConsumers`, and makes `TestJetStreamClusterConsumerLeak` more reliable by re-introducing the `ca.pending` flag. Since the consumer leader responds for consumer creation, but meta leader responds for consumer deletion, so need to have the consumer assignment available so meta leader can respond successfully. 
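For illustration, a minimal sketch of the bookkeeping this fix relies on, using simplified stand-in types and illustrative key formats rather than the server's actual `recoveryUpdates`, `consumerAssignment` or recovery-key helpers: removals and updates are now keyed first by the stream recovery key and then by the consumer key, so clearing an entry must target the inner per-stream map instead of the outer map.

    package main

    import "fmt"

    // Simplified stand-in; the real consumerAssignment carries much more state.
    type consumerAssignment struct {
        Stream, Name string
    }

    // Keyed first by the stream recovery key, then by the consumer recovery key.
    type recoveryUpdates struct {
        updateConsumers map[string]map[string]*consumerAssignment
        removeConsumers map[string]map[string]*consumerAssignment
    }

    // recordRemove queues a consumer removal and clears any pending update for it.
    // Deleting from the outer map by consumer key (the old behaviour) silently
    // left the update behind, which is what produced the ghost consumer.
    func (ru *recoveryUpdates) recordRemove(skey, ckey string, ca *consumerAssignment) {
        if ru.removeConsumers[skey] == nil {
            ru.removeConsumers[skey] = map[string]*consumerAssignment{}
        }
        ru.removeConsumers[skey][ckey] = ca
        if consumers, ok := ru.updateConsumers[skey]; ok {
            delete(consumers, ckey)
        }
    }

    func main() {
        ru := &recoveryUpdates{
            updateConsumers: map[string]map[string]*consumerAssignment{},
            removeConsumers: map[string]map[string]*consumerAssignment{},
        }
        ca := &consumerAssignment{Stream: "TEST", Name: "consumer"}
        // Key formats here are illustrative only.
        ru.updateConsumers[":TEST"] = map[string]*consumerAssignment{":TEST:consumer": ca}
        ru.recordRemove(":TEST", ":TEST:consumer", ca)
        fmt.Println(len(ru.updateConsumers[":TEST"]), len(ru.removeConsumers[":TEST"])) // 0 1
    }

The new `TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove` test below exercises exactly this create/remove/re-create sequence against the recovery maps.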
Signed-off-by: Maurice van Veen --------- Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 57 +++++++++++++++++++----------- server/jetstream_cluster_1_test.go | 50 ++++++++++++++++++++++++++ server/jetstream_cluster_2_test.go | 4 ++- server/jetstream_cluster_3_test.go | 7 ++-- server/jetstream_cluster_4_test.go | 51 ++++++++++++++++++++++++++ server/norace_test.go | 47 +++++++++++++++++------- 6 files changed, 178 insertions(+), 38 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 149c4386e5e..e8357f91d78 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -138,11 +138,10 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - pendingConsumers map[string]struct{} - responded bool - recovering bool - err error + consumers map[string]*consumerAssignment + responded bool + recovering bool + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -159,6 +158,7 @@ type consumerAssignment struct { // Internal responded bool recovering bool + pending bool deleted bool err error } @@ -1376,8 +1376,6 @@ func (js *jetStream) monitorCluster() { ces := aq.pop() for _, ce := range ces { if ce == nil { - // Signals we have replayed all of our metadata. - js.clearMetaRecovering() // Process any removes that are still valid after recovery. for _, cas := range ru.removeConsumers { for _, ca := range cas { @@ -1401,6 +1399,8 @@ func (js *jetStream) monitorCluster() { js.processConsumerAssignment(ca) } } + // Signals we have replayed all of our metadata. + js.clearMetaRecovering() // Clear. ru = nil s.Debugf("Recovered JetStream cluster metadata") @@ -1556,6 +1556,11 @@ func (js *jetStream) metaSnapshot() []byte { Consumers: make([]*consumerAssignment, 0, len(sa.consumers)), } for _, ca := range sa.consumers { + // Skip if the consumer is pending, we can't include it in our snapshot. + // If the proposal fails after we marked it pending, it would result in a ghost consumer. 
+ if ca.pending { + continue + } wsa.Consumers = append(wsa.Consumers, ca) } streams = append(streams, wsa) @@ -1654,9 +1659,10 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := sa.recoveryKey() ru.removeStreams[key] = sa - delete(ru.updateConsumers, key) delete(ru.addStreams, key) delete(ru.updateStreams, key) + delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) } @@ -1697,7 +1703,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove ru.removeConsumers[skey] = map[string]*consumerAssignment{} } ru.removeConsumers[skey][key] = ca - delete(ru.updateConsumers, key) + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } } else { js.processConsumerRemoval(ca) } @@ -1707,7 +1715,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -1984,6 +1994,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.addStreams, key) delete(ru.updateStreams, key) delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) didRemoveStream = true @@ -1998,7 +2009,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -2016,7 +2029,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -2038,7 +2053,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ru.removeConsumers[skey] = map[string]*consumerAssignment{} } ru.removeConsumers[skey][key] = ca - delete(ru.updateConsumers, key) + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } } else { js.processConsumerRemoval(ca) didRemoveConsumer = true @@ -4234,10 +4251,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca - delete(sa.pendingConsumers, ca.Name) - if len(sa.pendingConsumers) == 0 { - sa.pendingConsumers = nil - } + ca.pending = false js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -7374,7 +7388,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } if maxc > 0 { // Don't count DIRECTS. 
- total := len(sa.pendingConsumers) + total := 0 for cn, ca := range sa.consumers { if action == ActionCreateOrUpdate { // If the consumer name is specified and we think it already exists, then @@ -7617,10 +7631,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec // Do formal proposal. if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil { // Mark this as pending. - if sa.pendingConsumers == nil { - sa.pendingConsumers = make(map[string]struct{}) + if sa.consumers == nil { + sa.consumers = make(map[string]*consumerAssignment) } - sa.pendingConsumers[ca.Name] = struct{}{} + ca.pending = true + sa.consumers[ca.Name] = ca } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 730a9346a0e..46d691c2ad9 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6612,6 +6612,56 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { require_Len(t, len(ru.updateConsumers), 1) } +func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { + tests := []struct { + title string + encodeAddConsumerAssignment func(ca *consumerAssignment) []byte + }{ + {title: "simple", encodeAddConsumerAssignment: encodeAddConsumerAssignment}, + {title: "compressed", encodeAddConsumerAssignment: encodeAddConsumerAssignmentCompressed}, + } + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + js := c.leader().getJetStream() + + ca := &consumerAssignment{Stream: "TEST", Name: "consumer"} + createConsumer := []*Entry{{EntryNormal, test.encodeAddConsumerAssignment(ca)}} + deleteConsumer := []*Entry{{EntryNormal, encodeDeleteConsumerAssignment(ca)}} + + // Need to be recovering so that we accumulate recoveryUpdates. + js.setMetaRecovering() + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // Creating the consumer should append to update consumers list. + _, _, _, err := js.applyMetaEntries(createConsumer, ru) + require_NoError(t, err) + require_Len(t, len(ru.updateConsumers[":TEST"]), 1) + require_Len(t, len(ru.removeConsumers), 0) + + // Deleting the consumer should append to remove consumers list and remove from update list. + _, _, _, err = js.applyMetaEntries(deleteConsumer, ru) + require_NoError(t, err) + require_Len(t, len(ru.removeConsumers[":TEST"]), 1) + require_Len(t, len(ru.updateConsumers[":TEST"]), 0) + + // When re-creating the consumer, add to update list and remove from remove list. + _, _, _, err = js.applyMetaEntries(createConsumer, ru) + require_NoError(t, err) + require_Len(t, len(ru.updateConsumers[":TEST"]), 1) + require_Len(t, len(ru.removeConsumers[":TEST"]), 0) + }) + } +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. 
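The other half of the fix above is the `metaSnapshot` filter: assignments still marked `pending` are left out of the snapshot because their proposal may yet fail. A rough sketch of that filter, with simplified stand-in types rather than the server's own assignment structs:

    package main

    import "fmt"

    // Simplified stand-ins; the real assignments carry configs, groups, etc.
    type consumerAssignment struct {
        Name    string
        pending bool // proposal issued, not yet applied
    }

    type streamAssignment struct {
        consumers map[string]*consumerAssignment
    }

    // snapshotConsumers keeps only assignments that are safe to persist.
    // Including a pending one could recreate a ghost consumer if the original
    // proposal later fails.
    func snapshotConsumers(sa *streamAssignment) []*consumerAssignment {
        out := make([]*consumerAssignment, 0, len(sa.consumers))
        for _, ca := range sa.consumers {
            if ca.pending {
                continue
            }
            out = append(out, ca)
        }
        return out
    }

    func main() {
        sa := &streamAssignment{consumers: map[string]*consumerAssignment{
            "durable":          {Name: "durable"},
            "pending-consumer": {Name: "pending-consumer", pending: true},
        }}
        fmt.Println(len(snapshotConsumers(sa))) // 1: the pending assignment is dropped
    }

`TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers` further down covers this by injecting a pending assignment and asserting it never reappears after applying the snapshot.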
diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 2935a232095..9a3d8abbb3b 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2077,7 +2077,9 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { mjs := metaLeader.getJetStream() sa := mjs.streamAssignment(globalAccountName, "MAXCC") require_NotNil(t, sa) - require_True(t, sa.pendingConsumers == nil) + for _, ca := range sa.consumers { + require_False(t, ca.pending) + } } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 25fcab9fc05..a1089bd182a 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2376,6 +2376,7 @@ func TestJetStreamClusterLostConsumers(t *testing.T) { Stream: "TEST", Config: ConsumerConfig{ AckPolicy: AckExplicit, + Replicas: 1, }, } req, err := json.Marshal(cc) @@ -2383,11 +2384,11 @@ func TestJetStreamClusterLostConsumers(t *testing.T) { reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST") - // Now create 50 consumers. We do not wait for the answer. + // Now create 50 consumers. Ensure they are successfully created, so they're included in our snapshot. for i := 0; i < 50; i++ { - nc.Publish(reqSubj, req) + _, err = nc.Request(reqSubj, req, time.Second) + require_NoError(t, err) } - nc.Flush() // Grab the meta leader. ml := c.leader() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 99d49dd1092..ce17ddf51b2 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3939,3 +3939,54 @@ func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *te require_NoError(t, err) require_True(t, len(files) > 0) } + +func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}) + require_NoError(t, err) + + // We're creating an R3 consumer, just so we can copy its state and turn it into pending below. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Name: "consumer", Replicas: 3}) + require_NoError(t, err) + nc.Close() + + // Bypass normal API so we can simulate having a consumer pending to be created. + // A snapshot should never create pending consumers, as that would result + // in ghost consumers if the meta proposal failed. + ml := c.leader() + mjs := ml.getJetStream() + cc := mjs.cluster + consumers := cc.streams[globalAccountName]["TEST"].consumers + sampleCa := *consumers["consumer"] + sampleCa.Name, sampleCa.pending = "pending-consumer", true + consumers[sampleCa.Name] = &sampleCa + + // Create snapshot, this should not contain pending consumers. 
+ snap := mjs.metaSnapshot() + + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + err = mjs.applyMetaSnapshot(snap, ru, true) + require_NoError(t, err) + require_Len(t, len(ru.updateStreams), 1) + for _, sa := range ru.updateStreams { + for _, ca := range sa.consumers { + require_NotEqual(t, ca.Name, "pending-consumer") + } + } + for _, cas := range ru.updateConsumers { + for _, ca := range cas { + require_NotEqual(t, ca.Name, "pending-consumer") + } + } +} diff --git a/server/norace_test.go b/server/norace_test.go index 9914c57abaf..4ee14748a02 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -6670,22 +6670,43 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) { time.Sleep(5 * time.Second) cancel() - getMissing := func() []string { - m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10) - require_NoError(t, err) - + // Check we don't report missing consumers. + subj := fmt.Sprintf(JSApiConsumerListT, "TEST") + checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + // Request will take at most 4 seconds if some consumers can't be found. + m, err := nc.Request(subj, nil, 5*time.Second) + if err != nil { + return err + } var resp JSApiConsumerListResponse - err = json.Unmarshal(m.Data, &resp) - require_NoError(t, err) - return resp.Missing - } - - checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { - missing := getMissing() - if len(missing) == 0 { + require_NoError(t, json.Unmarshal(m.Data, &resp)) + if len(resp.Missing) == 0 { return nil } - return fmt.Errorf("Still have missing: %+v", missing) + return fmt.Errorf("Still have missing: %+v", resp.Missing) + }) + + // Also check all servers agree on the available consumer assignments. + // It could be the above check passes, i.e. our meta leader thinks all is okay, but other servers actually drifted. 
+ checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var previousConsumers []string + for _, s := range c.servers { + sjs := s.getJetStream() + sjs.mu.Lock() + cc := sjs.cluster + sa := cc.streams[globalAccountName]["TEST"] + var consumers []string + for cName := range sa.consumers { + consumers = append(consumers, cName) + } + sjs.mu.Unlock() + slices.Sort(consumers) + if previousConsumers != nil && !slices.Equal(previousConsumers, consumers) { + return fmt.Errorf("Consumer mismatch:\n- previous: %v\n- actual : %v\n", previousConsumers, consumers) + } + previousConsumers = consumers + } + return nil }) } From 239e13b92c76362ea15731b64c62dd3a5da389c4 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sun, 10 Nov 2024 17:02:36 +0100 Subject: [PATCH 17/26] De-flake TestJetStreamClusterInterestLeakOnDisableJetStream, RAFT groups close asynchronously Signed-off-by: Maurice van Veen --- server/jetstream_cluster_3_test.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index a1089bd182a..5499881c091 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -3338,20 +3338,23 @@ func TestJetStreamClusterInterestLeakOnDisableJetStream(t *testing.T) { server.DisableJetStream() - var sublist []*subscription - account.sl.localSubs(&sublist, false) - - var danglingJSC, danglingRaft int - for _, sub := range sublist { - if strings.HasPrefix(string(sub.subject), "$JSC.") { - danglingJSC++ - } else if strings.HasPrefix(string(sub.subject), "$NRG.") { - danglingRaft++ + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + var sublist []*subscription + account.sl.localSubs(&sublist, false) + + var danglingJSC, danglingRaft int + for _, sub := range sublist { + if strings.HasPrefix(string(sub.subject), "$JSC.") { + danglingJSC++ + } else if strings.HasPrefix(string(sub.subject), "$NRG.") { + danglingRaft++ + } } - } - if danglingJSC > 0 || danglingRaft > 0 { - t.Fatalf("unexpected dangling interests for JetStream assets after shutdown (%d $JSC, %d $NRG)", danglingJSC, danglingRaft) - } + if danglingJSC > 0 || danglingRaft > 0 { + return fmt.Errorf("unexpected dangling interests for JetStream assets after shutdown (%d $JSC, %d $NRG)", danglingJSC, danglingRaft) + } + return nil + }) } func TestJetStreamClusterNoLeadersDuringLameDuck(t *testing.T) { From 2e4fbade503d37542ea2ea342adeca4cf74d8c29 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sun, 10 Nov 2024 22:05:46 +0100 Subject: [PATCH 18/26] De-flake TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ Signed-off-by: Maurice van Veen --- server/norace_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/norace_test.go b/server/norace_test.go index 4ee14748a02..5932a207073 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11063,7 +11063,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { elapsed := time.Since(start) // This is actually ~300 microseconds but due to travis and race flags etc. // Was > 30 ms before fix for comparison, M2 macbook air. - require_True(t, elapsed < 5*time.Millisecond) + require_LessThan(t, elapsed, 5*time.Millisecond) // Make sure we set the chkflr correctly. checkFloor := func(o *consumer) uint64 { @@ -11083,7 +11083,8 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { // This checks the chkflr state. 
start = time.Now() mset.checkInterestState() - require_True(t, time.Since(start) < elapsed) + elapsed = time.Since(start) + require_LessThan(t, elapsed, 5*time.Millisecond) } func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.T) { From 1910f0511e3d6741ed521bc4ee058882f5bb3745 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sun, 10 Nov 2024 21:30:03 +0100 Subject: [PATCH 19/26] De-flake TestJetStreamSuperClusterMovingStreamAndMoveBack Signed-off-by: Maurice van Veen --- server/jetstream_super_cluster_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 0e095bd30d4..80fd87a6202 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -2122,12 +2122,16 @@ func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) { checkMove("C2") - _, err = js.UpdateStream(&nats.StreamConfig{ - Name: "TEST", - Replicas: test.replicas, - Placement: &nats.Placement{Tags: []string{"cloud:aws"}}, + // The move could be completed when looking at the stream info, but the meta leader could still + // deny move updates for a short time while state is cleaned up. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: test.replicas, + Placement: &nats.Placement{Tags: []string{"cloud:aws"}}, + }) + return err }) - require_NoError(t, err) checkMove("C1") }) From 1012e7a98cfff0e12502e25e9301f7bf333603ae Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 11 Nov 2024 15:20:29 +0000 Subject: [PATCH 20/26] Update dependencies Signed-off-by: Neil Twigg --- go.mod | 6 +++--- go.sum | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 2887522b904..a9fa41d8be1 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 go.uber.org/automaxprocs v1.6.0 - golang.org/x/crypto v0.28.0 - golang.org/x/sys v0.26.0 - golang.org/x/time v0.7.0 + golang.org/x/crypto v0.29.0 + golang.org/x/sys v0.27.0 + golang.org/x/time v0.8.0 ) diff --git a/go.sum b/go.sum index 20db5711c1d..afb9d291c01 100644 --- a/go.sum +++ b/go.sum @@ -20,12 +20,12 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 
+golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From f4444ac37917e35a166668a91bc7ac1f83cd33ad Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 11 Nov 2024 13:47:51 +0100 Subject: [PATCH 21/26] [FIXED] Backoff not respected with multiple inflight redeliveries Signed-off-by: Maurice van Veen --- server/consumer.go | 76 ++++++++++++++++++------------- server/jetstream_consumer_test.go | 70 ++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 31 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 1d8b9fb6889..ac15804332c 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -345,6 +345,7 @@ type consumer struct { outq *jsOutQ pending map[uint64]*Pending ptmr *time.Timer + ptmrEnd time.Time rdq []uint64 rdqi avl.SequenceSet rdc map[uint64]uint64 @@ -1349,7 +1350,7 @@ func (o *consumer) setLeader(isLeader bool) { stopAndClearTimer(&o.dtmr) // Make sure to clear out any re-deliver queues - stopAndClearTimer(&o.ptmr) + o.stopAndClearPtmr() o.rdq = nil o.rdqi.Empty() o.pending = nil @@ -1739,7 +1740,7 @@ func (o *consumer) forceExpirePending() { p.Timestamp += off } } - o.ptmr.Reset(o.ackWait(0)) + o.resetPtmr(o.ackWait(0)) } o.signalNewMessages() } @@ -1882,7 +1883,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { // AckWait if cfg.AckWait != o.cfg.AckWait { if o.ptmr != nil { - o.ptmr.Reset(100 * time.Millisecond) + o.resetPtmr(100 * time.Millisecond) } } // Rate Limit @@ -2413,7 +2414,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) { if o.ptmr != nil { // Want checkPending to run and figure out the next timer ttl. // TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire. - o.ptmr.Reset(10 * time.Millisecond) + o.resetPtmr(10 * time.Millisecond) } } // Nothing else for use to do now so return. @@ -2547,11 +2548,7 @@ func (o *consumer) applyState(state *ConsumerState) { if o.cfg.AckWait < delay { delay = o.ackWait(0) } - if o.ptmr == nil { - o.ptmr = time.AfterFunc(delay, o.checkPending) - } else { - o.ptmr.Reset(delay) - } + o.resetPtmr(delay) } } @@ -4456,9 +4453,24 @@ func (o *consumer) trackPending(sseq, dseq uint64) { if o.pending == nil { o.pending = make(map[uint64]*Pending) } - if o.ptmr == nil { - o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending) + + // We could have a backoff that set a timer higher than what we need for this message. + // In that case, reset to lowest backoff required for a message redelivery. + minDelay := o.ackWait(0) + if l := len(o.cfg.BackOff); l > 0 { + bi := int(o.rdc[sseq]) + if bi < 0 { + bi = 0 + } else if bi >= l { + bi = l - 1 + } + minDelay = o.ackWait(o.cfg.BackOff[bi]) } + minDeadline := time.Now().Add(minDelay) + if o.ptmr == nil || o.ptmrEnd.After(minDeadline) { + o.resetPtmr(minDelay) + } + if p, ok := o.pending[sseq]; ok { // Update timestamp but keep original consumer delivery sequence. // So do not update p.Sequence. @@ -4581,24 +4593,21 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool { // Checks the pending messages. func (o *consumer) checkPending() { - o.mu.RLock() + o.mu.Lock() + defer o.mu.Unlock() + mset := o.mset // On stop, mset and timer will be nil. 
if o.closed || mset == nil || o.ptmr == nil { - stopAndClearTimer(&o.ptmr) - o.mu.RUnlock() + o.stopAndClearPtmr() return } - o.mu.RUnlock() var shouldUpdateState bool var state StreamState mset.store.FastState(&state) fseq := state.FirstSeq - o.mu.Lock() - defer o.mu.Unlock() - now := time.Now().UnixNano() ttl := int64(o.cfg.AckWait) next := int64(o.ackWait(0)) @@ -4614,11 +4623,7 @@ func (o *consumer) checkPending() { check := len(o.pending) > 1024 for seq, p := range o.pending { if check && atomic.LoadInt64(&o.awl) > 0 { - if o.ptmr == nil { - o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending) - } else { - o.ptmr.Reset(100 * time.Millisecond) - } + o.resetPtmr(100 * time.Millisecond) return } // Check if these are no longer valid. @@ -4685,15 +4690,10 @@ func (o *consumer) checkPending() { } if len(o.pending) > 0 { - delay := time.Duration(next) - if o.ptmr == nil { - o.ptmr = time.AfterFunc(delay, o.checkPending) - } else { - o.ptmr.Reset(o.ackWait(delay)) - } + o.resetPtmr(time.Duration(next)) } else { // Make sure to stop timer and clear out any re delivery queues - stopAndClearTimer(&o.ptmr) + o.stopAndClearPtmr() o.rdq = nil o.rdqi.Empty() o.pending = nil @@ -5179,7 +5179,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.client = nil sysc := o.sysc o.sysc = nil - stopAndClearTimer(&o.ptmr) + o.stopAndClearPtmr() stopAndClearTimer(&o.dtmr) stopAndClearTimer(&o.gwdtmr) delivery := o.cfg.DeliverSubject @@ -5601,3 +5601,17 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } return nil } + +func (o *consumer) resetPtmr(delay time.Duration) { + if o.ptmr == nil { + o.ptmr = time.AfterFunc(delay, o.checkPending) + } else { + o.ptmr.Reset(delay) + } + o.ptmrEnd = time.Now().Add(delay) +} + +func (o *consumer) stopAndClearPtmr() { + stopAndClearTimer(&o.ptmr) + o.ptmrEnd = time.Time{} +} diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 23cd38c1d47..7ecdc454999 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1399,3 +1399,73 @@ func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) { }) } } + +// https://github.com/nats-io/nats-server/issues/6085 +func TestJetStreamConsumerBackoffNotRespectedWithMultipleInflightRedeliveries(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"events.>"}, + }) + require_NoError(t, err) + + maxDeliver := 3 + backoff := []time.Duration{2 * time.Second, 4 * time.Second} + sub, err := js.SubscribeSync( + "events.>", + nats.MaxDeliver(maxDeliver), + nats.BackOff(backoff), + nats.AckExplicit(), + ) + require_NoError(t, err) + + calculateExpectedBackoff := func(numDelivered int) time.Duration { + expectedBackoff := 500 * time.Millisecond + for i := 0; i < numDelivered-1 && i < len(backoff); i++ { + expectedBackoff += backoff[i] + } + return expectedBackoff + } + + // We get one message to be redelivered using the final backoff duration. 
+ firstMsgSent := time.Now() + sendStreamMsg(t, nc, "events.first", "msg-1") + _, err = sub.NextMsg(time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(1)) + _, err = sub.NextMsg(5 * time.Second) + require_NoError(t, err) + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(2)) + // This message will be redelivered with the final/highest backoff below. + + // If we now send a new message, the pending timer should be reset to the first backoff. + // Otherwise, if it remains at the final backoff duration we'll get this message redelivered too late. + sendStreamMsg(t, nc, "events.second", "msg-2") + + for { + msg, err := sub.NextMsg(5 * time.Second) + require_NoError(t, err) + if msg.Subject == "events.first" { + require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3)) + continue + } + + // We expect the second message to be redelivered using the specified backoff strategy. + // Before, the first redelivery of the second message would be sent after the highest backoff duration. + metadata, err := msg.Metadata() + require_NoError(t, err) + numDelivered := int(metadata.NumDelivered) + expectedBackoff := calculateExpectedBackoff(numDelivered) + require_LessThan(t, time.Since(metadata.Timestamp), expectedBackoff) + + // We've received all message, test passed. + if numDelivered >= maxDeliver { + break + } + } +} From 4a77bebb5170b30be51b11d651d93505c28162ff Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 11 Nov 2024 15:18:35 +0000 Subject: [PATCH 22/26] Add `node10` to stree Although we probably don't want too many different node sizes here, the `node10` case is particularly interesting because it perfectly fits the full 0-9 numeric range without wasting bytes. In fact it saves 96 bytes (208 bytes instead of 304) compared to using `node16` for the same purpose. This means memory savings for tracking subjects which are either mostly numerical throughout, or have tokens that are primarily numerical. For a subject space that is mostly numerical, this can be several GBs less at the half-billion subjects mark and the saving can grow above that. Signed-off-by: Neil Twigg --- server/stree/dump.go | 1 + server/stree/node10.go | 106 +++++++++++++++++++++++++++++++++++++ server/stree/node16.go | 4 +- server/stree/node4.go | 2 +- server/stree/stree_test.go | 34 ++++++++++-- 5 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 server/stree/node10.go diff --git a/server/stree/dump.go b/server/stree/dump.go index 60f03e4aad1..12c62f3beff 100644 --- a/server/stree/dump.go +++ b/server/stree/dump.go @@ -50,6 +50,7 @@ func (t *SubjectTree[T]) dump(w io.Writer, n node, depth int) { // For individual node/leaf dumps. func (n *leaf[T]) kind() string { return "LEAF" } func (n *node4) kind() string { return "NODE4" } +func (n *node10) kind() string { return "NODE10" } func (n *node16) kind() string { return "NODE16" } func (n *node48) kind() string { return "NODE48" } func (n *node256) kind() string { return "NODE256" } diff --git a/server/stree/node10.go b/server/stree/node10.go new file mode 100644 index 00000000000..37cd2cc946a --- /dev/null +++ b/server/stree/node10.go @@ -0,0 +1,106 @@ +// Copyright 2023-2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stree + +// Node with 10 children +// This node size is for the particular case that a part of the subject is numeric +// in nature, i.e. it only needs to satisfy the range 0-9 without wasting bytes +// Order of struct fields for best memory alignment (as per govet/fieldalignment) +type node10 struct { + child [10]node + meta + key [10]byte +} + +func newNode10(prefix []byte) *node10 { + nn := &node10{} + nn.setPrefix(prefix) + return nn +} + +// Currently we do not keep node10 sorted or use bitfields for traversal so just add to the end. +// TODO(dlc) - We should revisit here with more detailed benchmarks. +func (n *node10) addChild(c byte, nn node) { + if n.size >= 10 { + panic("node10 full!") + } + n.key[n.size] = c + n.child[n.size] = nn + n.size++ +} + +func (n *node10) findChild(c byte) *node { + for i := uint16(0); i < n.size; i++ { + if n.key[i] == c { + return &n.child[i] + } + } + return nil +} + +func (n *node10) isFull() bool { return n.size >= 10 } + +func (n *node10) grow() node { + nn := newNode16(n.prefix) + for i := 0; i < 10; i++ { + nn.addChild(n.key[i], n.child[i]) + } + return nn +} + +// Deletes a child from the node. +func (n *node10) deleteChild(c byte) { + for i, last := uint16(0), n.size-1; i < n.size; i++ { + if n.key[i] == c { + // Unsorted so just swap in last one here, else nil if last. + if i < last { + n.key[i] = n.key[last] + n.child[i] = n.child[last] + n.key[last] = 0 + n.child[last] = nil + } else { + n.key[i] = 0 + n.child[i] = nil + } + n.size-- + return + } + } +} + +// Shrink if needed and return new node, otherwise return nil. +func (n *node10) shrink() node { + if n.size > 4 { + return nil + } + nn := newNode4(nil) + for i := uint16(0); i < n.size; i++ { + nn.addChild(n.key[i], n.child[i]) + } + return nn +} + +// Iterate over all children calling func f. +func (n *node10) iter(f func(node) bool) { + for i := uint16(0); i < n.size; i++ { + if !f(n.child[i]) { + return + } + } +} + +// Return our children as a slice. +func (n *node10) children() []node { + return n.child[:n.size] +} diff --git a/server/stree/node16.go b/server/stree/node16.go index c0c12aafd57..e2dc97908df 100644 --- a/server/stree/node16.go +++ b/server/stree/node16.go @@ -79,10 +79,10 @@ func (n *node16) deleteChild(c byte) { // Shrink if needed and return new node, otherwise return nil. 
func (n *node16) shrink() node { - if n.size > 4 { + if n.size > 10 { return nil } - nn := newNode4(nil) + nn := newNode10(nil) for i := uint16(0); i < n.size; i++ { nn.addChild(n.key[i], n.child[i]) } diff --git a/server/stree/node4.go b/server/stree/node4.go index 6aeb024abff..4eddf11b83a 100644 --- a/server/stree/node4.go +++ b/server/stree/node4.go @@ -49,7 +49,7 @@ func (n *node4) findChild(c byte) *node { func (n *node4) isFull() bool { return n.size >= 4 } func (n *node4) grow() node { - nn := newNode16(n.prefix) + nn := newNode10(n.prefix) for i := 0; i < 4; i++ { nn.addChild(n.key[i], n.child[i]) } diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go index cf6d08512b9..8bf24181d2a 100644 --- a/server/stree/stree_test.go +++ b/server/stree/stree_test.go @@ -79,9 +79,22 @@ func TestSubjectTreeNodeGrow(t *testing.T) { old, updated := st.Insert(b("foo.bar.E"), 22) require_True(t, old == nil) require_False(t, updated) + _, ok = st.root.(*node10) + require_True(t, ok) + for i := 5; i < 10; i++ { + subj := b(fmt.Sprintf("foo.bar.%c", 'A'+i)) + old, updated := st.Insert(subj, 22) + require_True(t, old == nil) + require_False(t, updated) + } + // This one will trigger us to grow. + old, updated = st.Insert(b("foo.bar.K"), 22) + require_True(t, old == nil) + require_False(t, updated) + // We have filled a node10. _, ok = st.root.(*node16) require_True(t, ok) - for i := 5; i < 16; i++ { + for i := 11; i < 16; i++ { subj := b(fmt.Sprintf("foo.bar.%c", 'A'+i)) old, updated := st.Insert(subj, 22) require_True(t, old == nil) @@ -164,18 +177,33 @@ func TestSubjectTreeNodeDelete(t *testing.T) { require_True(t, found) require_Equal(t, *v, 11) require_Equal(t, st.root, nil) - // Now pop up to a node16 and make sure we can shrink back down. + // Now pop up to a node10 and make sure we can shrink back down. for i := 0; i < 5; i++ { subj := fmt.Sprintf("foo.bar.%c", 'A'+i) st.Insert(b(subj), 22) } - _, ok := st.root.(*node16) + _, ok := st.root.(*node10) require_True(t, ok) v, found = st.Delete(b("foo.bar.A")) require_True(t, found) require_Equal(t, *v, 22) _, ok = st.root.(*node4) require_True(t, ok) + // Now pop up to node16 + for i := 0; i < 11; i++ { + subj := fmt.Sprintf("foo.bar.%c", 'A'+i) + st.Insert(b(subj), 22) + } + _, ok = st.root.(*node16) + require_True(t, ok) + v, found = st.Delete(b("foo.bar.A")) + require_True(t, found) + require_Equal(t, *v, 22) + _, ok = st.root.(*node10) + require_True(t, ok) + v, found = st.Find(b("foo.bar.B")) + require_True(t, found) + require_Equal(t, *v, 22) // Now pop up to node48 st = NewSubjectTree[int]() for i := 0; i < 17; i++ { From 0eca790a0f41b6de3c37aec847c72853b9d5cb31 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 11 Nov 2024 11:57:33 -0800 Subject: [PATCH 23/26] Detect if we receive an ack past our last stream sequence. We also no longer register pre-acks when we detect this from a consumer snapshot since we properly handle this now and this could lead to excessive memory usage. 
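In outline, the guard sits in `processAckMsg`: before moving the delivery pointer forward it consults the stream store's fast state, and an ack whose stream sequence is beyond the last stored sequence is logged and ignored. A minimal sketch with stand-in types (not the server's consumer or store interfaces):

    package main

    import (
        "fmt"
        "log"
    )

    // Simplified stand-ins for the stream state and consumer involved.
    type streamState struct{ LastSeq uint64 }

    type consumer struct {
        sseq uint64 // next stream sequence to deliver
    }

    // applyAck advances the delivery pointer for an ack at stream sequence sseq,
    // but refuses anything past the stream's last known sequence instead of
    // blindly jumping ahead.
    func (o *consumer) applyAck(sseq uint64, ss streamState) bool {
        if sseq >= o.sseq {
            if sseq > ss.LastSeq {
                log.Printf("ack sequence %d past last stream sequence %d, ignoring", sseq, ss.LastSeq)
                return false
            }
            o.sseq = sseq + 1
        }
        return true
    }

    func main() {
        o := &consumer{sseq: 2}
        fmt.Println(o.applyAck(10_000_000_000, streamState{LastSeq: 10})) // false, pointer untouched
        fmt.Println(o.applyAck(5, streamState{LastSeq: 10}))              // true, next to deliver is 6
    }

The `TestJetStreamConsumerAckOutOfBounds` test below drives this with an ack far past the stream's last sequence and checks that the consumer state stays put across a leader change.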
Signed-off-by: Derek Collison --- server/consumer.go | 24 +++++++++---- server/jetstream_cluster.go | 16 +++------ server/jetstream_cluster_1_test.go | 57 ++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 17 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ac15804332c..25e98abfd40 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2783,18 +2783,30 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b return false } - // Check if this ack is above the current pointer to our next to deliver. - // This could happen on a cooperative takeover with high speed deliveries. - if sseq >= o.sseq { - o.sseq = sseq + 1 - } - mset := o.mset if mset == nil || mset.closed.Load() { o.mu.Unlock() return false } + // Check if this ack is above the current pointer to our next to deliver. + // This could happen on a cooperative takeover with high speed deliveries. + if sseq >= o.sseq { + // Let's make sure this is valid. + // This is only received on the consumer leader, so should never be higher + // than the last stream sequence. + var ss StreamState + mset.store.FastState(&ss) + if sseq > ss.LastSeq { + o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d", + o.acc.Name, o.stream, o.name, sseq, ss.LastSeq) + // FIXME(dlc) - For 2.11 onwards should we return an error here to the caller? + o.mu.Unlock() + return false + } + o.sseq = sseq + 1 + } + // Let the owning stream know if we are interest or workqueue retention based. // If this consumer is clustered (o.node != nil) this will be handled by // processReplicatedAck after the ack has propagated. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index e8357f91d78..829533e62b6 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -5058,6 +5058,7 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } panic(err.Error()) } + if err = o.store.Update(state); err != nil { o.mu.RLock() s, acc, mset, name := o.srv, o.acc, o.mset, o.name @@ -5070,17 +5071,10 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea if mset := o.getStream(); mset != nil { var ss StreamState mset.store.FastState(&ss) - if err := o.checkStateForInterestStream(&ss); err == errAckFloorHigherThanLastSeq { - // Register pre-acks unless no state at all for the stream and we would create alot of pre-acks. - mset.mu.Lock() - // Only register if we have a valid FirstSeq. - if ss.FirstSeq > 0 { - for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ { - mset.registerPreAck(o, seq) - } - } - mset.mu.Unlock() - } + // We used to register preacks here if our ack floor was higher than the last sequence. + // Now when streams catch up they properly call checkInterestState() and periodically run this as well. + // If our states drift this could have allocated lots of pre-acks. + o.checkStateForInterestStream(&ss) } } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 46d691c2ad9..2c8c1ca5566 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6662,6 +6662,63 @@ func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) { } } +// Make sure if we received acks that are out of bounds, meaning past our +// last sequence or before our first that they are ignored and errored if applicable. 
+func TestJetStreamConsumerAckOutOfBounds(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + _, err = js.Publish("foo.bar", []byte("OK")) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe("foo.*", "C") + require_NoError(t, err) + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + msgs[0].AckSync() + + // Now ack way past the last sequence. + _, err = nc.Request("$JS.ACK.TEST.C.1.10000000000.0.0.0", nil, 250*time.Millisecond) + require_Error(t, err, nats.ErrTimeout) + + // Make sure that now changes happened to our state. + ci, err := js.ConsumerInfo("TEST", "C") + require_NoError(t, err) + require_Equal(t, ci.Delivered.Consumer, 1) + require_Equal(t, ci.Delivered.Stream, 1) + require_Equal(t, ci.AckFloor.Consumer, 1) + require_Equal(t, ci.AckFloor.Stream, 1) + + s := c.consumerLeader("$G", "TEST", "C") + s.Shutdown() + s.WaitForShutdown() + c.restartServer(s) + c.waitOnConsumerLeader(globalAccountName, "TEST", "C") + + // Confirm new leader has same state for delivered and ack floor. + ci, err = js.ConsumerInfo("TEST", "C") + require_NoError(t, err) + require_Equal(t, ci.Delivered.Consumer, 1) + require_Equal(t, ci.Delivered.Stream, 1) + require_Equal(t, ci.AckFloor.Consumer, 1) + require_Equal(t, ci.AckFloor.Stream, 1) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. From 59faa57d3adbe383d4c0497fef1aae935d5ee6d1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 11 Nov 2024 11:51:24 -0800 Subject: [PATCH 24/26] Use stream's cfg lock to avoid contention on stream list, etc Signed-off-by: Derek Collison --- server/stream.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/stream.go b/server/stream.go index 7aaf4e6a6fc..eb800ecb900 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1580,8 +1580,8 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi // Config returns the stream's configuration. func (mset *stream) config() StreamConfig { - mset.mu.RLock() - defer mset.mu.RUnlock() + mset.cfgMu.RLock() + defer mset.cfgMu.RUnlock() return mset.cfg } @@ -3536,7 +3536,6 @@ func (mset *stream) resetSourceInfo() { } } -// Lock should be held. // This will do a reverse scan on startup or leader election // searching for the starting sequence number. // This can be slow in degenerative cases. 
From 3e8b016a77bc2c4ffff0d4b5370cde92aa70d1d4 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 12 Nov 2024 11:10:32 +0100 Subject: [PATCH 25/26] [FIXED] DeleteRange deletes one message too much Signed-off-by: Maurice van Veen --- server/store.go | 8 ++++++-- server/store_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/server/store.go b/server/store.go index 2bd1fb945a2..72e039816e9 100644 --- a/server/store.go +++ b/server/store.go @@ -292,12 +292,16 @@ type DeleteRange struct { } func (dr *DeleteRange) State() (first, last, num uint64) { - return dr.First, dr.First + dr.Num, dr.Num + deletesAfterFirst := dr.Num + if deletesAfterFirst > 0 { + deletesAfterFirst-- + } + return dr.First, dr.First + deletesAfterFirst, dr.Num } // Range will range over all the deleted sequences represented by this block. func (dr *DeleteRange) Range(f func(uint64) bool) { - for seq := dr.First; seq <= dr.First+dr.Num; seq++ { + for seq := dr.First; seq < dr.First+dr.Num; seq++ { if !f(seq) { return } diff --git a/server/store_test.go b/server/store_test.go index fdf4690a993..f7832974b5b 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -109,3 +109,35 @@ func TestStoreMsgLoadNextMsgMulti(t *testing.T) { }, ) } + +func TestStoreDeleteSlice(t *testing.T) { + ds := DeleteSlice{2} + var deletes []uint64 + ds.Range(func(seq uint64) bool { + deletes = append(deletes, seq) + return true + }) + require_Len(t, len(deletes), 1) + require_Equal(t, deletes[0], 2) + + first, last, num := ds.State() + require_Equal(t, first, 2) + require_Equal(t, last, 2) + require_Equal(t, num, 1) +} + +func TestStoreDeleteRange(t *testing.T) { + dr := DeleteRange{First: 2, Num: 1} + var deletes []uint64 + dr.Range(func(seq uint64) bool { + deletes = append(deletes, seq) + return true + }) + require_Len(t, len(deletes), 1) + require_Equal(t, deletes[0], 2) + + first, last, num := dr.State() + require_Equal(t, first, 2) + require_Equal(t, last, 2) + require_Equal(t, num, 1) +} From d942fc6771459e38f2f3ea340c51150efc78eda8 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 12 Nov 2024 12:35:45 +0000 Subject: [PATCH 26/26] Use interest-based intersection for `NumPendingMulti` This optimises #6089 by ensuring that we don't over-walk either the sublist or the subject tree, instead only matching the subject tree on subjects for which the sublist expresses interest. Signed-off-by: Neil Twigg --- server/filestore.go | 14 ++--- server/memstore.go | 6 +- server/sublist.go | 43 ++++++++++++++ server/sublist_test.go | 123 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+), 14 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 789a7e38dca..62f94dc0134 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3219,11 +3219,11 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo var t uint64 var havePartial bool - mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { + IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) { subj := bytesToString(bsubj) - if havePartial || !sl.HasInterest(subj) { + if havePartial { // If we already found a partial then don't do anything else. - return !havePartial + return } if ss.firstNeedsUpdate { mb.recalculateFirstForSubj(subj, ss.First, ss) @@ -3234,7 +3234,6 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo // We matched but its a partial. 
havePartial = true } - return !havePartial }) // See if we need to scan msgs here. @@ -3319,11 +3318,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo } // Mark fss activity. mb.lsts = time.Now().UnixNano() - mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool { - if sl.HasInterest(bytesToString(bsubj)) { - adjust += ss.Msgs - } - return true + IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) { + adjust += ss.Msgs }) } } else { diff --git a/server/memstore.go b/server/memstore.go index 64966f4455d..5baf42870be 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -670,10 +670,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo var havePartial bool var totalSkipped uint64 // We will track start and end sequences as we go. - ms.fss.Iter(func(subj []byte, fss *SimpleState) bool { - if !sl.HasInterest(bytesToString(subj)) { - return true - } + IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) { if fss.firstNeedsUpdate { ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss) } @@ -687,7 +684,6 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo } else { totalSkipped += fss.Msgs } - return true }) // If we did not encounter any partials we can return here. diff --git a/server/sublist.go b/server/sublist.go index 5c1325cc681..b7650ede6f2 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -20,6 +20,8 @@ import ( "sync" "sync/atomic" "unicode/utf8" + + "github.com/nats-io/nats-server/v2/server/stree" ) // Sublist is a routing mechanism to handle subject distribution and @@ -1731,3 +1733,44 @@ func getAllNodes(l *level, results *SublistResult) { getAllNodes(n.next, results) } } + +// IntersectStree will match all items in the given subject tree that +// have interest expressed in the given sublist. The callback will only be called +// once for each subject, regardless of overlapping subscriptions in the sublist. +func IntersectStree[T any](st *stree.SubjectTree[T], sl *Sublist, cb func(subj []byte, entry *T)) { + var _subj [255]byte + intersectStree(st, sl.root, _subj[:0], cb) +} + +func intersectStree[T any](st *stree.SubjectTree[T], r *level, subj []byte, cb func(subj []byte, entry *T)) { + if r.numNodes() == 0 { + st.Match(subj, cb) + return + } + nsubj := subj + if len(nsubj) > 0 { + nsubj = append(subj, '.') + } + switch { + case r.fwc != nil: + // We've reached a full wildcard, do a FWC match on the stree at this point + // and don't keep iterating downward. + nsubj := append(nsubj, '>') + st.Match(nsubj, cb) + case r.pwc != nil: + // We've found a partial wildcard. We'll keep iterating downwards, but first + // check whether there's interest at this level (without triggering dupes) and + // match if so. + nsubj := append(nsubj, '*') + if len(r.pwc.psubs)+len(r.pwc.qsubs) > 0 && r.pwc.next != nil && r.pwc.next.numNodes() > 0 { + st.Match(nsubj, cb) + } + intersectStree(st, r.pwc.next, nsubj, cb) + case r.numNodes() > 0: + // Normal node with subject literals, keep iterating. + for t, n := range r.nodes { + nsubj := append(nsubj, t...) 
+ intersectStree(st, n.next, nsubj, cb) + } + } +} diff --git a/server/sublist_test.go b/server/sublist_test.go index e0642903de3..9b782960840 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/nats-io/nats-server/v2/server/stree" "github.com/nats-io/nuid" ) @@ -1982,6 +1983,128 @@ func TestSublistNumInterest(t *testing.T) { sl.Remove(qsub) } +func TestSublistInterestBasedIntersection(t *testing.T) { + st := stree.NewSubjectTree[struct{}]() + st.Insert([]byte("one.two.three.four"), struct{}{}) + st.Insert([]byte("one.two.three.five"), struct{}{}) + st.Insert([]byte("one.two.six"), struct{}{}) + st.Insert([]byte("one.two.seven"), struct{}{}) + st.Insert([]byte("eight.nine"), struct{}{}) + + require_NoDuplicates := func(t *testing.T, got map[string]int) { + for _, c := range got { + require_Equal(t, c, 1) + } + } + + t.Run("Literals", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one.two.six")) + sl.Insert(newSub("eight.nine")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWC", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one.two.*.*")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCOverlapping", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one.two.*.four")) + sl.Insert(newSub("one.two.*.*")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 2) + require_NoDuplicates(t, got) + }) + + t.Run("PWCAll", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("*.*")) + sl.Insert(newSub("*.*.*")) + sl.Insert(newSub("*.*.*.*")) + require_True(t, sl.HasInterest("foo.bar")) + require_True(t, sl.HasInterest("foo.bar.baz")) + require_True(t, sl.HasInterest("foo.bar.baz.qux")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 5) + require_NoDuplicates(t, got) + }) + + t.Run("FWC", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one.>")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 4) + require_NoDuplicates(t, got) + }) + + t.Run("FWCOverlapping", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one.two.three.four")) + sl.Insert(newSub("one.>")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 4) + require_NoDuplicates(t, got) + }) + + t.Run("FWCAll", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub(">")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 5) + require_NoDuplicates(t, got) + }) + + t.Run("NoMatch", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + sl.Insert(newSub("one")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 0) + }) + + t.Run("NoMatches", func(t *testing.T) { + got := map[string]int{} + sl := NewSublistNoCache() + 
sl.Insert(newSub("one")) + sl.Insert(newSub("eight")) + sl.Insert(newSub("ten")) + IntersectStree(st, sl, func(subj []byte, entry *struct{}) { + got[string(subj)]++ + }) + require_Len(t, len(got), 0) + }) +} + func subsInit(pre string, toks []string) { var sub string for _, t := range toks {