From de8920731f2b518559ed867514f677e30475d0bf Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Wed, 8 Nov 2023 19:27:37 -0800
Subject: [PATCH 1/2] General stability and consistency improvements for
 clustered streams with failure offsets and server restarts.

When a message is rejected due to a constraint, such as a CAS operation
on a KeyValue bucket, we track the offset between the NRG (Raft) layer
and the stream. Under heavy usage, with many messages being rejected for
constraint violations and servers constantly restarting, we saw some
inconsistencies.

Signed-off-by: Derek Collison
---
 server/jetstream_cluster.go | 56 +++++++++++--------------
 server/norace_test.go       | 20 +++++++++
 server/raft.go              |  7 +++-
 server/stream.go            | 84 +++++++++++++++++++++----------------
 4 files changed, 99 insertions(+), 68 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 2db50d08e6d..d55b3e0bcdc 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2143,7 +2143,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// from underneath the one that is running since it will be the same raft node.
 	defer n.Stop()
 
-	qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
+	qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
 
 	s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
 	defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -2249,7 +2249,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
 	startDirectAccessMonitoring := func() {
 		if dat == nil {
-			dat = time.NewTicker(1 * time.Second)
+			dat = time.NewTicker(2 * time.Second)
 			datc = dat.C
 		}
 	}
@@ -2301,6 +2301,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		select {
 		case <-s.quitCh:
 			return
+		case <-mqch:
+			return
 		case <-qch:
 			return
 		case <-aq.ch:
@@ -2322,6 +2324,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 					ne, nb = n.Applied(ce.Index)
 					ce.ReturnToPool()
 				} else {
+					// Our stream was closed out from underneath of us, simply return here.
+					if err == errStreamClosed {
+						return
+					}
 					s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
 					if isClusterResetErr(err) {
 						if mset.isMirror() && mset.IsLeader() {
@@ -2352,7 +2358,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			if mset != nil && n != nil && sendSnapshot {
 				n.SendSnapshot(mset.stateSnapshot())
 				sendSnapshot = false
-
 			}
 			if isRestore {
 				acc, _ := s.LookupAccount(sa.Client.serviceAccount())
@@ -2385,17 +2390,22 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// Here we are checking if we are not the leader but we have been asked to allow
 			// direct access. We now allow non-leaders to participate in the queue group.
 			if !isLeader && mset != nil {
-				startDirectAccessMonitoring()
+				mset.mu.RLock()
+				ad, md := mset.cfg.AllowDirect, mset.cfg.MirrorDirect
+				mset.mu.RUnlock()
+				if ad || md {
+					startDirectAccessMonitoring()
+				}
 			}
 
 		case <-datc:
 			if mset == nil || isRecovering {
-				return
+				continue
 			}
 			// If we are leader we can stop, we know this is setup now.
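The dedicated monitor-quit channel (mqch) added above is the structural heart of this hunk: the monitor goroutine must be stoppable by the stream itself, independently of the Raft node's quit channel, since the same Raft node can be reused across a stream restart. A minimal, self-contained sketch of the pattern follows; the types and names here (monitoredStream, the bare channels) are illustrative stand-ins, not the server's actual structs.

package main

import (
	"fmt"
	"sync"
)

// monitoredStream is a toy stand-in for the server's stream type.
type monitoredStream struct {
	mu   sync.RWMutex
	mqch chan struct{} // monitor quit channel, like the patch's mset.mqch
}

// monitorQuitC mirrors the accessor the patch adds in stream.go.
func (s *monitoredStream) monitorQuitC() <-chan struct{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.mqch
}

// stop closes mqch exactly once and nils it, as stop() now does before any
// other teardown, so the monitor exits before the stream is dismantled.
func (s *monitoredStream) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.mqch != nil {
		close(s.mqch)
		s.mqch = nil
	}
}

// monitor selects on both the Raft quit channel and the stream's own
// monitor quit channel, like the select loop in monitorStream.
func (s *monitoredStream) monitor(qch, mqch <-chan struct{}) {
	select {
	case <-qch:
		fmt.Println("raft node quit")
	case <-mqch:
		fmt.Println("stream closed, monitor exiting")
	}
}

func main() {
	s := &monitoredStream{mqch: make(chan struct{})}
	qch := make(chan struct{})  // stand-in for n.QuitC()
	mqch := s.monitorQuitC()    // grab before stop() nils it
	done := make(chan struct{})
	go func() { s.monitor(qch, mqch); close(done) }()
	s.stop() // ends the monitor without touching the Raft node
	<-done
}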
 			if isLeader {
 				stopDirectMonitoring()
-				return
+				continue
 			}
 
 			mset.mu.Lock()
@@ -2547,6 +2557,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 					mset.setStreamAssignment(sa)
 					// Make sure to update our updateC which would have been nil.
 					uch = mset.updateC()
+					// Also update our mqch
+					mqch = mset.monitorQuitC()
 				}
 			}
 			if err != nil {
@@ -2779,6 +2791,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			// Grab last sequence and CLFS.
 			last, clfs := mset.lastSeqAndCLFS()
+
 			// We can skip if we know this is less than what we already have.
 			if lseq-clfs < last {
 				s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
@@ -2809,13 +2822,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			// Process the actual message here.
 			if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
-				// Only return in place if we are going to reset stream or we are out of space.
-				if isClusterResetErr(err) || isOutOfSpaceErr(err) {
+				// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
+				if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
 					return err
 				}
 				s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v",
 					mset.account(), mset.name(), err)
 			}
+
 		case deleteMsgOp:
 			md, err := decodeMsgDelete(buf[1:])
 			if err != nil {
@@ -7388,7 +7402,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
 		Bytes:    state.Bytes,
 		FirstSeq: state.FirstSeq,
 		LastSeq:  state.LastSeq,
-		Failed:   mset.clfs,
+		Failed:   mset.getCLFS(),
 		Deleted:  state.Deleted,
 	}
 	b, _ := json.Marshal(snap)
@@ -7425,7 +7439,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 
 	mset.mu.RLock()
 	canRespond := !mset.cfg.NoAck && len(reply) > 0
-	name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
+	name, stype := mset.cfg.Name, mset.cfg.Storage
 	s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
 	maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
 	isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
@@ -7525,26 +7539,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 
 	// Some header checks can be checked pre proposal. Most can not.
 	if len(hdr) > 0 {
-		// For CAS operations, e.g. ExpectedLastSeqPerSubject, we can also check here and not have to go through.
-		// Can only precheck for seq != 0.
-		if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 {
-			var smv StoreMsg
-			var fseq uint64
-			sm, err := store.LoadLastMsg(subject, &smv)
-			if sm != nil {
-				fseq = sm.seq
-			}
-			if err != nil || fseq != seq {
-				if canRespond {
-					var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
-					resp.PubAck = &PubAck{Stream: name}
-					resp.Error = NewJSStreamWrongLastSequenceError(fseq)
-					b, _ := json.Marshal(resp)
-					outq.sendMsg(reply, b)
-				}
-				return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
-			}
-		}
 		// Expected stream name can also be pre-checked.
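The lseq-clfs arithmetic above is the offset tracking the commit message describes: clfs (consensus-layer failed sequences) counts proposals that went through Raft but were rejected by the stream, so the store's sequence runs behind the Raft layer's by exactly that amount. A rough model of the bookkeeping, under the simplifying assumption that each committed entry carries one message proposal; the clfsTracker type and apply helper are mine, not the server's:

package main

import "fmt"

// clfsTracker models the offset between the Raft (NRG) layer and the
// stream store: clfs counts proposals that were rejected (for example a
// failed expected-last-sequence CAS check) and never became messages.
type clfsTracker struct {
	last uint64 // last stream sequence actually stored
	clfs uint64 // cumulative failed/skipped proposals
}

// apply mirrors the checks in applyStreamEntries and processJetStreamMsg:
// an entry proposed at raft position lseq maps to stream sequence
// lseq+1-clfs, and anything at or below last is a replay to skip.
func (t *clfsTracker) apply(lseq uint64, accepted bool) {
	if lseq-t.clfs < t.last {
		fmt.Printf("lseq %d: already applied, skipping\n", lseq)
		return
	}
	if !accepted {
		t.clfs++ // rejected: widen the offset, the store does not advance
		fmt.Printf("lseq %d: rejected, clfs now %d\n", lseq, t.clfs)
		return
	}
	t.last = lseq + 1 - t.clfs // seq = lseq + 1 - clfs
	fmt.Printf("lseq %d: stored as stream seq %d\n", lseq, t.last)
}

func main() {
	var t clfsTracker
	t.apply(0, true)  // stored as stream seq 1
	t.apply(1, false) // CAS-style rejection, clfs becomes 1
	t.apply(2, true)  // stored as stream seq 2, not 3
	t.apply(2, true)  // replay after a restart: skipped
}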
 		if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
 			if canRespond {
@@ -7752,8 +7746,8 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
 
 	mset.mu.Lock()
 	var state StreamState
-	mset.clfs = snap.Failed
 	mset.store.FastState(&state)
+	mset.setCLFS(snap.Failed)
 	sreq := mset.calculateSyncRequest(&state, snap)
 
 	s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
diff --git a/server/norace_test.go b/server/norace_test.go
index 4b308baf1a2..6e8db5b7647 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -9064,12 +9064,28 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
 		return &fullState{state, mset.lseq, mset.clfs}
 	}
 
+	grabStore := func(mset *stream) map[string][]uint64 {
+		mset.mu.RLock()
+		store := mset.store
+		mset.mu.RUnlock()
+		var state StreamState
+		store.FastState(&state)
+		storeMap := make(map[string][]uint64)
+		for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
+			if sm, err := store.LoadMsg(seq, nil); err == nil {
+				storeMap[sm.subj] = append(storeMap[sm.subj], sm.seq)
+			}
+		}
+		return storeMap
+	}
+
 	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
 		// Current stream leader.
 		sl := c.streamLeader(globalAccountName, "KV_TEST")
 		mset, err := sl.GlobalAccount().lookupStream("KV_TEST")
 		require_NoError(t, err)
 		lstate := grabState(mset)
+		golden := grabStore(mset)
 
 		// Report messages per server.
 		for _, s := range c.servers {
@@ -9082,6 +9098,10 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
 			if !reflect.DeepEqual(state, lstate) {
 				return fmt.Errorf("Expected follower state\n%+v\nto match leader's\n %+v", state, lstate)
 			}
+			sm := grabStore(mset)
+			if !reflect.DeepEqual(sm, golden) {
+				t.Fatalf("Expected follower store for %v\n%+v\nto match leader's %v\n %+v", s, sm, sl, golden)
+			}
 		}
 		return nil
 	})
diff --git a/server/raft.go b/server/raft.go
index a169011623b..ce0084dc956 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -2160,12 +2160,15 @@ func (n *raft) runAsLeader() {
 
 	// For forwarded proposals, both normal and remove peer proposals.
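The raft.go change that follows tightens this spot: previously a failed subscription was only logged at debug level and the node carried on as a leader that could not hear forwarded proposals. A small sketch of the resulting subscribe-or-stepdown shape; subscribe, unsubscribe, stepDown, and the subjects are invented stand-ins for the server's real NATS subscription plumbing:

package main

import (
	"errors"
	"fmt"
)

type subscription struct{ subject string }

// leader is a toy stand-in for the raft node inside runAsLeader.
type leader struct{ failSubs bool }

func (l *leader) subscribe(subject string) (*subscription, error) {
	if l.failSubs {
		return nil, errors.New("account missing")
	}
	return &subscription{subject}, nil
}

func (l *leader) unsubscribe(sub *subscription) { fmt.Println("unsub", sub.subject) }
func (l *leader) stepDown()                     { fmt.Println("stepping down to follower") }

// runAsLeader mirrors the fixed shape: if either proposal subscription
// fails, warn, undo any partial setup, and relinquish leadership rather
// than silently serving as a leader that cannot receive proposals.
func (l *leader) runAsLeader() {
	fsub, err := l.subscribe("proposals")
	if err != nil {
		fmt.Println("warn: subscribing to forwarded proposals:", err)
		l.stepDown()
		return
	}
	if _, err := l.subscribe("remove-peer-proposals"); err != nil {
		fmt.Println("warn: subscribing to forwarded remove peer proposals:", err)
		l.unsubscribe(fsub) // roll back the first subscription
		l.stepDown()
		return
	}
	fmt.Println("leader serving proposals")
}

func main() {
	(&leader{}).runAsLeader()
	(&leader{failSubs: true}).runAsLeader()
}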
 	fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
 	if err != nil {
-		n.debug("Error subscribing to forwarded proposals: %v", err)
+		n.warn("Error subscribing to forwarded proposals: %v", err)
+		n.stepdown.push(noLeader)
 		return
 	}
 	rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
 	if err != nil {
-		n.debug("Error subscribing to forwarded proposals: %v", err)
+		n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
+		n.unsubscribe(fsub)
+		n.stepdown.push(noLeader)
 		return
 	}
diff --git a/server/stream.go b/server/stream.go
index bdc395bba13..3d93744b96f 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -239,6 +239,7 @@ type stream struct {
 	ddindex   int
 	ddtmr     *time.Timer
 	qch       chan struct{}
+	mqch      chan struct{}
 	active    bool
 	ddloaded  bool
 	closed    bool
@@ -558,6 +559,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 		msgs:      newIPQueue[*inMsg](s, qpfx+"messages"),
 		gets:      newIPQueue[*directGetReq](s, qpfx+"direct gets"),
 		qch:       make(chan struct{}),
+		mqch:      make(chan struct{}),
 		uch:       make(chan struct{}, 4),
 		sch:       make(chan struct{}, 1),
 	}
@@ -785,6 +787,15 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
 	}
 }
 
+func (mset *stream) monitorQuitC() <-chan struct{} {
+	if mset == nil {
+		return nil
+	}
+	mset.mu.RLock()
+	defer mset.mu.RUnlock()
+	return mset.mqch
+}
+
 func (mset *stream) updateC() <-chan struct{} {
 	if mset == nil {
 		return nil
@@ -4069,6 +4080,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac
 var (
 	errLastSeqMismatch = errors.New("last sequence mismatch")
 	errMsgIdDuplicate  = errors.New("msgid is duplicate")
+	errStreamClosed    = errors.New("stream closed")
 )
 
 // processJetStreamMsg is where we try to actually process the stream msg.
@@ -4077,7 +4089,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	c, s, store := mset.client, mset.srv, mset.store
 	if mset.closed || c == nil {
 		mset.mu.Unlock()
-		return nil
+		return errStreamClosed
 	}
 
 	// Apply the input subject transform if any
@@ -4407,7 +4419,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	// Make sure to take into account any message assignments that we had to skip (clfs).
 	seq = lseq + 1 - clfs
 	// Check for preAcks and the need to skip vs store.
-
 	if mset.hasAllPreAcks(seq, subject) {
 		mset.clearAllPreAcks(seq)
 		store.SkipMsg()
@@ -4899,9 +4910,28 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	accName := jsa.account.Name
 	jsa.mu.Unlock()
 
-	// Clean up consumers.
+	// Mark as closed, kick monitor and collect consumers first.
 	mset.mu.Lock()
 	mset.closed = true
+
+	// Signal to the monitor loop.
+	// Can't use qch here.
+	if mset.mqch != nil {
+		close(mset.mqch)
+		mset.mqch = nil
+	}
+
+	// Stop responding to sync requests.
+	mset.stopClusterSubs()
+	// Unsubscribe from direct stream.
+	mset.unsubscribeToStream(true)
+
+	// Our info sub if we spun it up.
+	if mset.infoSub != nil {
+		mset.srv.sysUnsubscribe(mset.infoSub)
+		mset.infoSub = nil
+	}
+
+	// Clean up consumers.
 	var obs []*consumer
 	for _, o := range mset.consumers {
 		obs = append(obs, o)
@@ -4922,21 +4952,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 			mset.cancelSourceConsumer(si.iname)
 		}
 	}
-
-	// Cluster cleanup
-	var sa *streamAssignment
-	if n := mset.node; n != nil {
-		if deleteFlag {
-			n.Delete()
-			sa = mset.sa
-		} else {
-			if n.NeedSnapshot() {
-				// Attempt snapshot on clean exit.
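The errStreamClosed sentinel introduced above is worth a note: returning nil from processJetStreamMsg on a closed stream made a shutdown race indistinguishable from a successful store. A sketch of the sentinel-error pattern as the patch applies it, using toy types; the real callers are applyStreamEntries and the monitor loop:

package main

import (
	"errors"
	"fmt"
)

// Sentinel mirroring the patch's errStreamClosed. Before the change the
// closed path returned nil, so callers could not tell "message stored"
// apart from "stream went away underneath us".
var errStreamClosed = errors.New("stream closed")

type stream struct{ closed bool }

func (s *stream) processJetStreamMsg() error {
	if s.closed {
		return errStreamClosed
	}
	// ... store the message ...
	return nil
}

// applyEntry shows the caller-side handling the patch adds: a closed
// stream is a quiet exit, anything else is a warning-worthy failure.
func applyEntry(s *stream) {
	if err := s.processJetStreamMsg(); err != nil {
		if errors.Is(err, errStreamClosed) {
			fmt.Println("stream closed out from underneath us, returning")
			return
		}
		fmt.Println("warn: error applying entry:", err)
	}
}

func main() {
	applyEntry(&stream{})             // success path
	applyEntry(&stream{closed: true}) // clean shutdown path
}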
-				n.InstallSnapshot(mset.stateSnapshotLocked())
-			}
-			n.Stop()
-		}
-	}
 	mset.mu.Unlock()
 
 	isShuttingDown := js.isShuttingDown()
@@ -4953,17 +4968,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	}
 
 	mset.mu.Lock()
-	// Stop responding to sync requests.
-	mset.stopClusterSubs()
-	// Unsubscribe from direct stream.
-	mset.unsubscribeToStream(true)
-
-	// Our info sub if we spun it up.
-	if mset.infoSub != nil {
-		mset.srv.sysUnsubscribe(mset.infoSub)
-		mset.infoSub = nil
-	}
-
 	// Send stream delete advisory after the consumers.
 	if deleteFlag && advisory {
 		mset.sendDeleteAdvisoryLocked()
@@ -4975,11 +4979,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.qch = nil
 	}
 
-	c := mset.client
-	mset.client = nil
-	if c == nil {
-		mset.mu.Unlock()
-		return nil
+	// Cluster cleanup
+	var sa *streamAssignment
+	if n := mset.node; n != nil {
+		if deleteFlag {
+			n.Delete()
+			sa = mset.sa
+		} else {
+			// Always attempt snapshot on clean exit.
+			n.InstallSnapshot(mset.stateSnapshotLocked())
+			n.Stop()
+		}
 	}
 
 	// Cleanup duplicate timer if running.
@@ -5005,6 +5015,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 
 	// Snapshot store.
 	store := mset.store
+	c := mset.client
+	mset.client = nil
 
 	// Clustered cleanup.
 	mset.mu.Unlock()
@@ -5019,7 +5031,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		js.mu.Unlock()
 	}
 
-	c.closeConnection(ClientClosed)
+	if c != nil {
+		c.closeConnection(ClientClosed)
+	}
 
 	if sysc != nil {
 		sysc.closeConnection(ClientClosed)

From 08412af3b3504fe8c1917c5b07f6d500c8eaa527 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Wed, 8 Nov 2023 19:42:19 -0800
Subject: [PATCH 2/2] Wait longer for direct subs since we now set ticker to 2s

Signed-off-by: Derek Collison
---
 server/jetstream_cluster_2_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go
index 4fc95340d16..ae6e01aa2f7 100644
--- a/server/jetstream_cluster_2_test.go
+++ b/server/jetstream_cluster_2_test.go
@@ -6968,7 +6968,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
 	// Make sure we get all direct subs.
 	checkForDirectSubs := func() {
 		t.Helper()
-		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+		checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
 			for _, s := range c.servers {
 				mset, err := s.GlobalAccount().lookupStream("TEST")
 				if err != nil {
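The strengthened norace test earlier in the series compares not just aggregate state but the full subject-to-sequences layout of every follower's store against the leader's. A condensed version of that idea, using plain slices in place of the server's message store; storedMsg and the sample data are invented for illustration:

package main

import (
	"fmt"
	"reflect"
)

type storedMsg struct {
	subj string
	seq  uint64
}

// grabStore condenses the test helper of the same name: walk every stored
// message and build a subject -> sequences map, so replicas must agree on
// exactly which sequences exist per subject, not merely on totals.
func grabStore(msgs []storedMsg) map[string][]uint64 {
	m := make(map[string][]uint64)
	for _, sm := range msgs {
		m[sm.subj] = append(m[sm.subj], sm.seq)
	}
	return m
}

func main() {
	leader := grabStore([]storedMsg{{"kv.a", 1}, {"kv.b", 3}})
	follower := grabStore([]storedMsg{{"kv.a", 1}, {"kv.b", 4}})
	if !reflect.DeepEqual(leader, follower) {
		fmt.Printf("follower store\n%v\ndiverged from leader's\n%v\n", follower, leader)
	}
}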
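The second patch is purely about timing: with the direct-access ticker moved from 1s to 2s, a 5s polling window leaves only a couple of ticks before the test deadline. A generic version of the checkFor-style polling these tests rely on; pollFor is my stand-in name (the real helper takes a *testing.T and fails on timeout), and the 3-second readiness condition is a made-up example:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollFor retries cond every interval until total elapses, returning the
// last error if the condition never became true within the window.
func pollFor(total, interval time.Duration, cond func() error) error {
	deadline := time.Now().Add(total)
	var err error
	for time.Now().Before(deadline) {
		if err = cond(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	// With a 2s server-side ticker, a 10s window allows several ticks;
	// the old 5s window risked flaking, hence the bump in the test.
	start := time.Now()
	err := pollFor(10*time.Second, 250*time.Millisecond, func() error {
		if time.Since(start) < 3*time.Second { // becomes true after ~3s
			return errors.New("direct subs not ready yet")
		}
		return nil
	})
	fmt.Println("poll result:", err)
}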