From 6f05a82bfcbafd4a90ef013289496b0c3a2f3306 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 10 Jun 2024 09:50:42 -0700 Subject: [PATCH] [IMPROVED] Memory based streams and NRG behavior during server restarts (#5506) Improvements to catchups and health checks. Improvements to handling snapshots for memory-based WALs. With memory-based WALs we cannot use snapshots on restarts, but we do use them while the server is running. However, if a server becomes a leader with no snapshot, it will be forced to step down when asked to catch up a follower, so we now inherit the leader's snapshot. Also, when we truncate on a mismatch, we need to truncate to the previous index, not the current one. When truncation fails because the previous entry has been compacted away, we used to reset entirely; we now reset the WAL to the prior index and use the truncate term and index. Lastly, if we receive a heartbeat with the correct index but a newer term, we simply inherit the newer term. For stream health checks on replicated streams, make sure the monitor routine is running. When waiting on consumer assignments at the beginning of the stream monitor, make sure the consumer monitor is running as well if the consumer is replicated. On a consumer snapshot, register pre-acks as needed. On stream checkInterestState, reset an empty stream to the low ack floor across all consumers. Finally, fix a consistency bug in memstore when skipping msgs on an empty stream to ensure first == last + 1. Signed-off-by: Derek Collison --------- Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 54 ++++++++--- server/jetstream_cluster_4_test.go | 2 +- server/memstore.go | 2 +- server/memstore_test.go | 25 +++++ server/monitor.go | 17 ++++ server/norace_test.go | 149 +++++++++++++++++++++++++++++ server/raft.go | 58 +++++++---- server/stream.go | 9 +- 8 files changed, 284 insertions(+), 32 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 88b8160c2ef..406c249b41c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -534,12 +534,18 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { return false } - // If we are catching up return false. - if mset.isCatchingUp() { + // If R1 we are good. + if node == nil { + return true + } + + // Here we are a replicated stream. + // First make sure our monitor routine is running. + if !mset.isMonitorRunning() { return false } - if node == nil || node.Healthy() { + if node.Healthy() { // Check if we are processing a snapshot and are catching up. if !mset.isCatchingUp() { return true @@ -553,7 +559,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool { js.restartStream(acc, sa) } } - return false } @@ -863,6 +868,8 @@ func (js *jetStream) setupMetaGroup() error { atomic.StoreInt32(&js.clustered, 1) c.registerWithAccount(sacc) + // Set to true before we start. + js.metaRecovering = true js.srv.startGoRoutine( js.monitorCluster, pprofLabels{ @@ -2164,7 +2171,7 @@ func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPee // Should only be called from monitorStream.
func (mset *stream) waitOnConsumerAssignments() { mset.mu.RLock() - s, js, acc, sa, name := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name + s, js, acc, sa, name, replicas := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name, mset.cfg.Replicas mset.mu.RUnlock() if s == nil || js == nil || acc == nil || sa == nil { @@ -2186,6 +2193,9 @@ func (mset *stream) waitOnConsumerAssignments() { for _, o := range mset.getConsumers() { // Make sure we are registered with our consumer assignment. if ca := o.consumerAssignment(); ca != nil { + if replicas > 1 && !o.isMonitorRunning() { + break + } numReady++ } else { break @@ -2373,7 +2383,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // since we process streams first then consumers as an asset class. mset.waitOnConsumerAssignments() // Setup a periodic check here. - cist = time.NewTicker(30 * time.Second) + // We will fire in 5s the first time, then back off to 30s. + cist = time.NewTicker(5 * time.Second) cistc = cist.C } @@ -2496,6 +2507,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } case <-cistc: + cist.Reset(30 * time.Second) // We may be adjusting some things with consumers so do this in its own go routine. go mset.checkInterestState() @@ -4924,7 +4936,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } } // Check our interest state if applicable. - o.checkStateForInterestStream() + if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq { + o.mu.RLock() + mset := o.mset + o.mu.RUnlock() + // Register pre-acks unless there is no state at all for the stream, in which case we would create a lot of pre-acks. + mset.mu.Lock() + var ss StreamState + mset.store.FastState(&ss) + // Only register if we have a valid FirstSeq. + if ss.FirstSeq > 0 { + for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ { + mset.registerPreAck(o, seq) + } + } + mset.mu.Unlock() + } } } else if e.Type == EntryRemovePeer { @@ -8165,8 +8192,11 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) { var sub *subscription var err error - const activityInterval = 30 * time.Second - notActive := time.NewTimer(activityInterval) + const ( + startInterval = 5 * time.Second + activityInterval = 30 * time.Second + ) + notActive := time.NewTimer(startInterval) defer notActive.Stop() defer func() { @@ -8249,7 +8279,7 @@ RETRY: default: } } - notActive.Reset(activityInterval) + notActive.Reset(startInterval) // Grab sync request again on failures. if sreq == nil { @@ -8294,8 +8324,10 @@ RETRY: // Send our sync request. b, _ := json.Marshal(sreq) s.sendInternalMsgLocked(subject, reply, nil, b) + // Remember when we sent this out to avoid loop spins on errors below. reqSendTime := time.Now() + // Clear our sync request. sreq = nil @@ -8844,7 +8876,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait if !done { // Wait for a small bit. - time.Sleep(50 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } else { // GC friendly.
mw.Stop() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 9f93d863477..b5c9495d900 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2273,7 +2273,7 @@ func TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe(t *testing.T) { checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { mset.store.FastState(&state) if state.LastSeq != 222 { - return fmt.Errorf("%v LAST SEQ WRONG %d for %q - STATE %+v", s, state.LastSeq, stream, state) + return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state) } return nil }) diff --git a/server/memstore.go b/server/memstore.go index 4d9b184bc73..a7405789ddd 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -261,7 +261,7 @@ func (ms *memStore) SkipMsg() uint64 { ms.state.LastSeq = seq ms.state.LastTime = now if ms.state.Msgs == 0 { - ms.state.FirstSeq = seq + ms.state.FirstSeq = seq + 1 ms.state.FirstTime = now } else { ms.dmap.Insert(seq) diff --git a/server/memstore_test.go b/server/memstore_test.go index e74976c4cf2..867249a5549 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1052,6 +1052,31 @@ func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) { require_Equal(t, state.Msgs, 1) } +// When all messages are deleted we should have a state of first = last + 1. +func TestMemStoreDeleteAllFirstSequenceCheck(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"foo"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + msg := []byte("abc") + for i := 1; i <= 10; i++ { + ms.StoreMsg("foo", nil, msg) + } + for seq := uint64(1); seq <= 10; seq++ { + ms.RemoveMsg(seq) + } + var state StreamState + ms.FastState(&state) + require_Equal(t, state.FirstSeq, 11) + require_Equal(t, state.LastSeq, 10) + require_Equal(t, state.Msgs, 0) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/monitor.go b/server/monitor.go index e4023cba4bc..0a2815b943b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3490,6 +3490,23 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { return health } + // Are we still recovering meta layer? + if js.isMetaRecovering() { + if !details { + health.Status = na + health.Error = "JetStream is still recovering meta layer" + + } else { + health.Errors = []HealthzError{ + { + Type: HealthzErrorJetStream, + Error: "JetStream is still recovering meta layer", + }, + } + } + return health + } + // Range across all accounts, the streams assigned to them, and the consumers. // If they are assigned to this server check their status. 
ourID := meta.ID() diff --git a/server/norace_test.go b/server/norace_test.go index 87d864314c2..35fbd8722cd 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10569,3 +10569,152 @@ func TestNoRaceLargeNumDeletesStreamCatchups(t *testing.T) { return nil }) } + +func TestNoRaceJetStreamClusterMemoryStreamLastSequenceResetAfterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + numStreams := 250 + var wg sync.WaitGroup + wg.Add(numStreams) + + for i := 1; i <= numStreams; i++ { + go func(n int) { + defer wg.Done() + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("TEST:%d", n), + Storage: nats.MemoryStorage, + Subjects: []string{fmt.Sprintf("foo.%d.*", n)}, + Replicas: 3, + }, nats.MaxWait(30*time.Second)) + require_NoError(t, err) + subj := fmt.Sprintf("foo.%d.bar", n) + for i := 0; i < 222; i++ { + js.Publish(subj, nil) + } + }(i) + } + wg.Wait() + + // Make sure all streams have a snapshot in place to stress the snapshot logic for memory based streams. + for _, s := range c.servers { + for i := 1; i <= numStreams; i++ { + stream := fmt.Sprintf("TEST:%d", i) + mset, err := s.GlobalAccount().lookupStream(stream) + require_NoError(t, err) + node := mset.raftNode() + require_NotNil(t, node) + node.InstallSnapshot(mset.stateSnapshot()) + } + } + + // Do 5 rolling restarts waiting on healthz in between. + for i := 0; i < 5; i++ { + // Walk the servers and shut each down, and wipe the storage directory. + for _, s := range c.servers { + s.Shutdown() + s.WaitForShutdown() + s = c.restartServer(s) + checkFor(t, 30*time.Second, time.Second, func() error { + hs := s.healthz(nil) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + // Make sure all streams are current after healthz returns ok. + for i := 1; i <= numStreams; i++ { + stream := fmt.Sprintf("TEST:%d", i) + mset, err := s.GlobalAccount().lookupStream(stream) + require_NoError(t, err) + var state StreamState + checkFor(t, 30*time.Second, time.Second, func() error { + mset.store.FastState(&state) + if state.LastSeq != 222 { + return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state) + } + return nil + }) + } + } + } +} + +func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + numStreams := 50 + var wg sync.WaitGroup + wg.Add(numStreams) + + for i := 1; i <= numStreams; i++ { + go func(n int) { + defer wg.Done() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("TEST:%d", n), + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Subjects: []string{fmt.Sprintf("foo.%d.*", n)}, + Replicas: 3, + }, nats.MaxWait(30*time.Second)) + require_NoError(t, err) + subj := fmt.Sprintf("foo.%d.bar", n) + for i := 0; i < 22; i++ { + js.Publish(subj, nil) + } + // Now consume them all as well. + sub, err := js.PullSubscribe(subj, "wq") + require_NoError(t, err) + msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), 22) + for _, m := range msgs { + err := m.AckSync() + require_NoError(t, err) + } + }(i) + } + wg.Wait() + + // Do 2 rolling restarts waiting on healthz in between.
+ for i := 0; i < 2; i++ { + // Walk the servers and shut each down, and wipe the storage directory. + for _, s := range c.servers { + s.Shutdown() + s.WaitForShutdown() + s = c.restartServer(s) + checkFor(t, 30*time.Second, time.Second, func() error { + hs := s.healthz(nil) + if hs.Error != _EMPTY_ { + return errors.New(hs.Error) + } + return nil + }) + // Make sure all streams are current after healthz returns ok. + for i := 1; i <= numStreams; i++ { + stream := fmt.Sprintf("TEST:%d", i) + mset, err := s.GlobalAccount().lookupStream(stream) + require_NoError(t, err) + var state StreamState + checkFor(t, 20*time.Second, time.Second, func() error { + mset.store.FastState(&state) + if state.LastSeq != 22 { + return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state) + } + if state.FirstSeq != 23 { + return fmt.Errorf("%v Wrong first sequence %d for %q - State %+v", s, state.FirstSeq, stream, state) + } + return nil + }) + } + } + } +} diff --git a/server/raft.go b/server/raft.go index 8ac89166578..63aea67cdb5 100644 --- a/server/raft.go +++ b/server/raft.go @@ -85,6 +85,7 @@ type WAL interface { RemoveMsg(index uint64) (bool, error) Compact(index uint64) (uint64, error) Purge() (uint64, error) + PurgeEx(subject string, seq, keep uint64) (uint64, error) Truncate(seq uint64) error State() StreamState FastState(*StreamState) @@ -414,7 +415,8 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return nil, fmt.Errorf("could not create snapshots directory - %v", err) } - // Can't recover snapshots if memory based. + // Can't recover snapshots if memory based since wal will be reset. + // We will inherit from the current leader. if _, ok := n.wal.(*memStore); ok { os.Remove(filepath.Join(n.sd, snapshotsDir, "*")) } else { @@ -1012,17 +1014,16 @@ func (n *raft) InstallSnapshot(data []byte) error { } n.Lock() + defer n.Unlock() // If a write error has occurred already then stop here. if werr := n.werr; werr != nil { - n.Unlock() return werr } // Check that a catchup isn't already taking place. If it is then we won't // allow installing snapshots until it is done. if len(n.progress) > 0 { - n.Unlock() return errCatchupsRunning } @@ -1030,7 +1031,6 @@ func (n *raft) InstallSnapshot(data []byte) error { n.wal.FastState(&state) if n.applied == 0 { - n.Unlock() return errNoSnapAvailable } @@ -1055,6 +1055,12 @@ func (n *raft) InstallSnapshot(data []byte) error { data: data, } + return n.installSnapshot(snap) +} + +// Install the snapshot. +// Lock should be held. +func (n *raft) installSnapshot(snap *snapshot) error { snapDir := filepath.Join(n.sd, snapshotsDir) sn := fmt.Sprintf(snapFileT, snap.lastTerm, snap.lastIndex) sfile := filepath.Join(snapDir, sn) @@ -1064,7 +1070,6 @@ func (n *raft) InstallSnapshot(data []byte) error { dios <- struct{}{} if err != nil { - n.Unlock() // We could set write err here, but if this is a temporary situation, too many open files etc. // we want to retry and snapshots are not fatal. return err @@ -1074,19 +1079,19 @@ func (n *raft) InstallSnapshot(data []byte) error { n.snapfile = sfile if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) - n.Unlock() return err } - n.Unlock() - - psnaps, _ := os.ReadDir(snapDir) // Remove any old snapshots. - for _, fi := range psnaps { - pn := fi.Name() - if pn != sn { - os.Remove(filepath.Join(snapDir, pn)) + // Do this in a go routine. 
+ go func() { + psnaps, _ := os.ReadDir(snapDir) + for _, fi := range psnaps { + pn := fi.Name() + if pn != sn { + os.Remove(filepath.Join(snapDir, pn)) + } } - } + }() return nil } @@ -3082,17 +3087,20 @@ func (n *raft) truncateWAL(term, index uint64) { if err := n.wal.Truncate(index); err != nil { // If we get an invalid sequence, reset our wal all together. + // We will not have holes, so this means we do not have this message stored anymore. if err == ErrInvalidSequence { n.debug("Resetting WAL") n.wal.Truncate(0) - index, n.term, n.pterm, n.pindex = 0, 0, 0, 0 + // If our index is non-zero use PurgeEx to set us to the correct next index. + if index > 0 { + n.wal.PurgeEx(fwcs, index+1, 0) + } } else { n.warn("Error truncating WAL: %v", err) n.setWriteErrLocked(err) + return } - return } - // Set after we know we have truncated properly. n.term, n.pterm, n.pindex = term, term, index } @@ -3266,7 +3274,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If terms mismatched, or we got an error loading, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. - n.truncateWAL(ae.pterm, ae.pindex) + n.truncateWAL(eae.pterm, eae.pindex) } // Cancel regardless. n.cancelCatchup() @@ -3313,6 +3321,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } + // Inherit state from appendEntry with the leader's snapshot. n.pindex = ae.pindex n.pterm = ae.pterm n.commit = ae.pindex @@ -3323,6 +3332,19 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } + snap := &snapshot{ + lastTerm: n.pterm, + lastIndex: n.pindex, + peerstate: encodePeerState(&peerState{n.peerNames(), n.csz, n.extSt}), + data: ae.entries[0].Data, + } + // Install the leader's snapshot as our own. + if err := n.installSnapshot(snap); err != nil { + n.setWriteErrLocked(err) + n.Unlock() + return + } + // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) n.Unlock() diff --git a/server/stream.go b/server/stream.go index 34d75ac9a15..3e7af8f08cf 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5574,7 +5574,7 @@ func (mset *stream) checkInterestState() { defer mset.mu.Unlock() // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq { + if lowAckFloor <= state.LastSeq || state.Msgs == 0 { // Purge the stream to lowest ack floor + 1 mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) } else { @@ -6235,3 +6235,10 @@ func (mset *stream) clearMonitorRunning() { defer mset.mu.Unlock() mset.inMonitor = false } + +// Check if our monitor is running. +func (mset *stream) isMonitorRunning() bool { + mset.mu.RLock() + defer mset.mu.RUnlock() + return mset.inMonitor +}
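
The memstore change above is easiest to reason about as an invariant: an empty store must always report FirstSeq == LastSeq + 1, whether its messages were removed or skipped. Below is a minimal standalone Go sketch of that bookkeeping. It does not use the NATS code base; the toyStore type and its methods are hypothetical and exist only to illustrate the invariant that the SkipMsg fix and the new TestMemStoreDeleteAllFirstSequenceCheck enforce.

package main

import "fmt"

// toyStore models only the first/last sequence bookkeeping, not the real memStore.
type toyStore struct {
	firstSeq, lastSeq uint64
	msgs              map[uint64][]byte
}

func newToyStore() *toyStore {
	// An empty store starts at first=1, last=0, i.e. first == last+1.
	return &toyStore{firstSeq: 1, lastSeq: 0, msgs: make(map[uint64][]byte)}
}

func (s *toyStore) StoreMsg(msg []byte) uint64 {
	s.lastSeq++
	s.msgs[s.lastSeq] = msg
	return s.lastSeq
}

// SkipMsg burns a sequence without storing a message. This mirrors the fix:
// on an empty store, first must become seq+1, not seq, otherwise the store
// would claim to hold a message it never stored.
func (s *toyStore) SkipMsg() uint64 {
	s.lastSeq++
	if len(s.msgs) == 0 {
		s.firstSeq = s.lastSeq + 1
	}
	return s.lastSeq
}

func (s *toyStore) RemoveMsg(seq uint64) {
	delete(s.msgs, seq)
	// Advance firstSeq past any leading gap; once the store is empty this
	// lands at lastSeq+1, the shape the new memstore test asserts.
	for s.firstSeq <= s.lastSeq {
		if _, ok := s.msgs[s.firstSeq]; ok {
			break
		}
		s.firstSeq++
	}
}

func main() {
	s := newToyStore()
	for i := 0; i < 10; i++ {
		s.StoreMsg([]byte("abc"))
	}
	for seq := uint64(1); seq <= 10; seq++ {
		s.RemoveMsg(seq)
	}
	fmt.Println(s.firstSeq, s.lastSeq, len(s.msgs)) // 11 10 0
	s.SkipMsg()
	fmt.Println(s.firstSeq, s.lastSeq, len(s.msgs)) // 12 11 0
}

Under this invariant a fully consumed work-queue stream reports first == last + 1 after a restart, which is what the added TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart asserts (FirstSeq == 23, LastSeq == 22).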