diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 2db50d08e6d..d55b3e0bcdc 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2143,7 +2143,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// from underneath the one that is running since it will be the same raft node.
 	defer n.Stop()
 
-	qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
+	qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
 
 	s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
 	defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -2249,7 +2249,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 
 	startDirectAccessMonitoring := func() {
 		if dat == nil {
-			dat = time.NewTicker(1 * time.Second)
+			dat = time.NewTicker(2 * time.Second)
 			datc = dat.C
 		}
 	}
@@ -2301,6 +2301,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 		select {
 		case <-s.quitCh:
 			return
+		case <-mqch:
+			return
 		case <-qch:
 			return
 		case <-aq.ch:
@@ -2322,6 +2324,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				ne, nb = n.Applied(ce.Index)
 				ce.ReturnToPool()
 			} else {
+				// Our stream was closed out from underneath us, simply return here.
+				if err == errStreamClosed {
+					return
+				}
 				s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
 				if isClusterResetErr(err) {
 					if mset.isMirror() && mset.IsLeader() {
@@ -2352,7 +2358,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			if mset != nil && n != nil && sendSnapshot {
 				n.SendSnapshot(mset.stateSnapshot())
 				sendSnapshot = false
-
 			}
 			if isRestore {
 				acc, _ := s.LookupAccount(sa.Client.serviceAccount())
@@ -2385,17 +2390,22 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// Here we are checking if we are not the leader but we have been asked to allow
 			// direct access. We now allow non-leaders to participate in the queue group.
 			if !isLeader && mset != nil {
-				startDirectAccessMonitoring()
+				mset.mu.RLock()
+				ad, md := mset.cfg.AllowDirect, mset.cfg.MirrorDirect
+				mset.mu.RUnlock()
+				if ad || md {
+					startDirectAccessMonitoring()
+				}
 			}
 
 		case <-datc:
 			if mset == nil || isRecovering {
-				return
+				continue
 			}
 			// If we are leader we can stop, we know this is setup now.
 			if isLeader {
 				stopDirectMonitoring()
-				return
+				continue
 			}
 
 			mset.mu.Lock()
@@ -2547,6 +2557,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 					mset.setStreamAssignment(sa)
 					// Make sure to update our updateC which would have been nil.
 					uch = mset.updateC()
+					// Also update our mqch.
+					mqch = mset.monitorQuitC()
 				}
 			}
 			if err != nil {
@@ -2779,6 +2791,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 			// Grab last sequence and CLFS.
 			last, clfs := mset.lastSeqAndCLFS()
+			// We can skip if we know this is less than what we already have.
 			if lseq-clfs < last {
 				s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
 					mset.account(), mset.name(), lseq+1-clfs, last)
@@ -2809,13 +2822,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 
 				// Process the actual message here.
 				if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
-					// Only return in place if we are going to reset stream or we are out of space.
-					if isClusterResetErr(err) || isOutOfSpaceErr(err) {
+					// Only return in place if we are going to reset our stream, we are out of space, or the stream is closed.
+					if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
 						return err
 					}
 					s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v",
 						mset.account(), mset.name(), err)
 				}
+
 			case deleteMsgOp:
 				md, err := decodeMsgDelete(buf[1:])
 				if err != nil {
@@ -7388,7 +7402,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
 		Bytes:    state.Bytes,
 		FirstSeq: state.FirstSeq,
 		LastSeq:  state.LastSeq,
-		Failed:   mset.clfs,
+		Failed:   mset.getCLFS(),
 		Deleted:  state.Deleted,
 	}
 	b, _ := json.Marshal(snap)
@@ -7425,7 +7439,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	mset.mu.RLock()
 	canRespond := !mset.cfg.NoAck && len(reply) > 0
-	name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
+	name, stype := mset.cfg.Name, mset.cfg.Storage
 	s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
 	maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
 	isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
 	mset.mu.RUnlock()
@@ -7525,26 +7539,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 
 	// Some header checks can be checked pre proposal. Most can not.
 	if len(hdr) > 0 {
-		// For CAS operations, e.g. ExpectedLastSeqPerSubject, we can also check here and not have to go through.
-		// Can only precheck for seq != 0.
-		if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 {
-			var smv StoreMsg
-			var fseq uint64
-			sm, err := store.LoadLastMsg(subject, &smv)
-			if sm != nil {
-				fseq = sm.seq
-			}
-			if err != nil || fseq != seq {
-				if canRespond {
-					var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
-					resp.PubAck = &PubAck{Stream: name}
-					resp.Error = NewJSStreamWrongLastSequenceError(fseq)
-					b, _ := json.Marshal(resp)
-					outq.sendMsg(reply, b)
-				}
-				return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
-			}
-		}
 		// Expected stream name can also be pre-checked.
 		if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
 			if canRespond {
@@ -7752,8 +7746,8 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
 
 	mset.mu.Lock()
 	var state StreamState
-	mset.clfs = snap.Failed
 	mset.store.FastState(&state)
+	mset.setCLFS(snap.Failed)
 	sreq := mset.calculateSyncRequest(&state, snap)
 
 	s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go
index 4fc95340d16..ae6e01aa2f7 100644
--- a/server/jetstream_cluster_2_test.go
+++ b/server/jetstream_cluster_2_test.go
@@ -6968,7 +6968,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
 	// Make sure we get all direct subs.
 	checkForDirectSubs := func() {
 		t.Helper()
-		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+		checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
 			for _, s := range c.servers {
 				mset, err := s.GlobalAccount().lookupStream("TEST")
 				if err != nil {
diff --git a/server/norace_test.go b/server/norace_test.go
index 4b308baf1a2..6e8db5b7647 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -9064,12 +9064,28 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
 		return &fullState{state, mset.lseq, mset.clfs}
 	}
 
+	grabStore := func(mset *stream) map[string][]uint64 {
+		mset.mu.RLock()
+		store := mset.store
+		mset.mu.RUnlock()
+		var state StreamState
+		store.FastState(&state)
+		storeMap := make(map[string][]uint64)
+		for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
+			if sm, err := store.LoadMsg(seq, nil); err == nil {
+				storeMap[sm.subj] = append(storeMap[sm.subj], sm.seq)
+			}
+		}
+		return storeMap
+	}
+
 	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
 		// Current stream leader.
 		sl := c.streamLeader(globalAccountName, "KV_TEST")
 		mset, err := sl.GlobalAccount().lookupStream("KV_TEST")
 		require_NoError(t, err)
 		lstate := grabState(mset)
+		golden := grabStore(mset)
 
 		// Report messages per server.
 		for _, s := range c.servers {
@@ -9082,6 +9098,10 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
 			if !reflect.DeepEqual(state, lstate) {
 				return fmt.Errorf("Expected follower state\n%+v\nto match leader's\n %+v", state, lstate)
 			}
+			sm := grabStore(mset)
+			if !reflect.DeepEqual(sm, golden) {
+				t.Fatalf("Expected follower store for %v\n%+v\nto match leader's %v\n %+v", s, sm, sl, golden)
+			}
 		}
 		return nil
 	})
diff --git a/server/raft.go b/server/raft.go
index a169011623b..ce0084dc956 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -2160,12 +2160,15 @@ func (n *raft) runAsLeader() {
 
 	// For forwarded proposals, both normal and remove peer proposals.
 	fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
 	if err != nil {
-		n.debug("Error subscribing to forwarded proposals: %v", err)
+		n.warn("Error subscribing to forwarded proposals: %v", err)
+		n.stepdown.push(noLeader)
 		return
 	}
 	rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
 	if err != nil {
-		n.debug("Error subscribing to forwarded proposals: %v", err)
+		n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
+		n.unsubscribe(fsub)
+		n.stepdown.push(noLeader)
 		return
 	}
diff --git a/server/stream.go b/server/stream.go
index bdc395bba13..3d93744b96f 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -239,6 +239,7 @@ type stream struct {
 	ddindex   int
 	ddtmr     *time.Timer
 	qch       chan struct{}
+	mqch      chan struct{}
 	active    bool
 	ddloaded  bool
 	closed    bool
@@ -558,6 +559,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
 		msgs:      newIPQueue[*inMsg](s, qpfx+"messages"),
 		gets:      newIPQueue[*directGetReq](s, qpfx+"direct gets"),
 		qch:       make(chan struct{}),
+		mqch:      make(chan struct{}),
 		uch:       make(chan struct{}, 4),
 		sch:       make(chan struct{}, 1),
 	}
@@ -785,6 +787,15 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
 	}
 }
 
+func (mset *stream) monitorQuitC() <-chan struct{} {
+	if mset == nil {
+		return nil
+	}
+	mset.mu.RLock()
+	defer mset.mu.RUnlock()
+	return mset.mqch
+}
+
 func (mset *stream) updateC() <-chan struct{} {
 	if mset == nil {
 		return nil
@@ -4069,6 +4080,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac
 var (
 	errLastSeqMismatch = errors.New("last sequence mismatch")
 	errMsgIdDuplicate  = errors.New("msgid is duplicate")
+	errStreamClosed    = errors.New("stream closed")
 )
 
 // processJetStreamMsg is where we try to actually process the stream msg.
@@ -4077,7 +4089,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	c, s, store := mset.client, mset.srv, mset.store
 	if mset.closed || c == nil {
 		mset.mu.Unlock()
-		return nil
+		return errStreamClosed
 	}
 
 	// Apply the input subject transform if any
@@ -4407,7 +4419,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	// Make sure to take into account any message assignments that we had to skip (clfs).
 	seq = lseq + 1 - clfs
 	// Check for preAcks and the need to skip vs store.
-
 	if mset.hasAllPreAcks(seq, subject) {
 		mset.clearAllPreAcks(seq)
 		store.SkipMsg()
@@ -4899,9 +4910,28 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	accName := jsa.account.Name
 	jsa.mu.Unlock()
 
-	// Clean up consumers.
+	// Mark as closed, kick monitor and collect consumers first.
 	mset.mu.Lock()
 	mset.closed = true
+	// Signal to the monitor loop.
+	// Can't use qch here.
+	if mset.mqch != nil {
+		close(mset.mqch)
+		mset.mqch = nil
+	}
+
+	// Stop responding to sync requests.
+	mset.stopClusterSubs()
+	// Unsubscribe from direct stream.
+	mset.unsubscribeToStream(true)
+
+	// Our info sub if we spun it up.
+	if mset.infoSub != nil {
+		mset.srv.sysUnsubscribe(mset.infoSub)
+		mset.infoSub = nil
+	}
+
+	// Clean up consumers.
 	var obs []*consumer
 	for _, o := range mset.consumers {
 		obs = append(obs, o)
@@ -4922,21 +4952,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 			mset.cancelSourceConsumer(si.iname)
 		}
 	}
-
-	// Cluster cleanup
-	var sa *streamAssignment
-	if n := mset.node; n != nil {
-		if deleteFlag {
-			n.Delete()
-			sa = mset.sa
-		} else {
-			if n.NeedSnapshot() {
-				// Attempt snapshot on clean exit.
-				n.InstallSnapshot(mset.stateSnapshotLocked())
-			}
-			n.Stop()
-		}
-	}
 	mset.mu.Unlock()
 
 	isShuttingDown := js.isShuttingDown()
@@ -4953,17 +4968,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 	}
 
 	mset.mu.Lock()
-	// Stop responding to sync requests.
-	mset.stopClusterSubs()
-	// Unsubscribe from direct stream.
-	mset.unsubscribeToStream(true)
-
-	// Our info sub if we spun it up.
-	if mset.infoSub != nil {
-		mset.srv.sysUnsubscribe(mset.infoSub)
-		mset.infoSub = nil
-	}
-
 	// Send stream delete advisory after the consumers.
 	if deleteFlag && advisory {
 		mset.sendDeleteAdvisoryLocked()
@@ -4975,11 +4979,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.qch = nil
 	}
 
-	c := mset.client
-	mset.client = nil
-	if c == nil {
-		mset.mu.Unlock()
-		return nil
+	// Cluster cleanup
+	var sa *streamAssignment
+	if n := mset.node; n != nil {
+		if deleteFlag {
+			n.Delete()
+			sa = mset.sa
+		} else {
+			// Always attempt snapshot on clean exit.
+			n.InstallSnapshot(mset.stateSnapshotLocked())
+			n.Stop()
+		}
 	}
 
 	// Cleanup duplicate timer if running.
@@ -5005,6 +5015,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 
 	// Snapshot store.
 	store := mset.store
+	c := mset.client
+	mset.client = nil
 
 	// Clustered cleanup.
 	mset.mu.Unlock()
@@ -5019,7 +5031,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		js.mu.Unlock()
 	}
 
-	c.closeConnection(ClientClosed)
+	if c != nil {
+		c.closeConnection(ClientClosed)
+	}
 
 	if sysc != nil {
 		sysc.closeConnection(ClientClosed)
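
For readers following the mqch changes above, here is a minimal standalone sketch (not part of the patch) of the shutdown pattern it introduces: stop() closes a dedicated monitor-quit channel so the monitor loop can exit even though the raft node, and hence its qch, may be reused by a replacement stream. The toyStream type and helper names below are illustrative stand-ins, not the server's actual types; the real code uses *stream, monitorQuitC(), and monitorStream() as shown in the diff.

// monitor_quit_sketch.go — illustrative only, assumes nothing beyond the
// standard library. Mirrors the mqch pattern from server/stream.go.
package main

import (
	"fmt"
	"sync"
	"time"
)

type toyStream struct {
	mu     sync.RWMutex
	closed bool
	mqch   chan struct{}
}

func newToyStream() *toyStream {
	return &toyStream{mqch: make(chan struct{})}
}

// monitorQuitC mirrors (*stream).monitorQuitC: nil-receiver safe and
// reads the channel under the read lock.
func (s *toyStream) monitorQuitC() <-chan struct{} {
	if s == nil {
		return nil
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.mqch
}

// stop mirrors the relevant part of (*stream).stop: mark closed and close
// mqch exactly once so every monitor selecting on it wakes up and returns.
func (s *toyStream) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	s.closed = true
	if s.mqch != nil {
		close(s.mqch)
		s.mqch = nil
	}
}

// monitor mirrors the select loop in monitorStream. It must capture mqch
// up front (and re-capture if the stream is recreated), because stop()
// nils out the field after closing it and a select on a nil channel
// blocks forever.
func (s *toyStream) monitor(done chan<- struct{}) {
	mqch := s.monitorQuitC()
	defer close(done)
	for {
		select {
		case <-mqch:
			return // stream stopped out from underneath us
		case <-time.After(10 * time.Millisecond):
			// placeholder for apply-queue / lead-change work
		}
	}
}

func main() {
	st := newToyStream()
	done := make(chan struct{})
	go st.monitor(done)
	time.Sleep(25 * time.Millisecond)
	st.stop()
	<-done
	fmt.Println("monitor exited cleanly")
}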