[IMPROVED] General stability and consistency improvements for clustered streams with failure offsets during server restarts. (#4777)

When a message is rejected due to a constraint, such as a CAS operation on
KeyValue, we track the offset between the NRG (Raft) layer and the stream.
Under heavy usage, with many messages being rejected for constraint
violations while servers were constantly restarting, we observed some
inconsistencies.

Signed-off-by: Derek Collison <[email protected]>
derekcollison authored Nov 9, 2023
2 parents 41944b5 + 08412af commit 5feedd3
Showing 5 changed files with 100 additions and 69 deletions.
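The offset referred to in the description is the stream's count of failed (constraint-rejected) proposals, which the diff below calls clfs. The following minimal sketch (hypothetical names such as replStream, apply, snapshot, and restore; not the nats-server implementation) shows why that count has to travel with snapshots: every committed Raft entry advances the replicated sequence, rejected entries only advance the failure count, and the stored stream sequence is the difference between the two. A restarted replica that loses the failure count drifts away from the leader's stored sequences.

package main

import "fmt"

// streamSnapshot carries both the replicated sequence and the failure
// count, mirroring the Failed field kept in stream snapshots.
type streamSnapshot struct {
	LastSeq uint64 // last replicated (proposed) sequence applied
	Failed  uint64 // constraint-rejected entries (the clfs offset)
}

type replStream struct {
	lseq   uint64   // last sequence seen from the Raft/NRG layer
	failed uint64   // rejected proposals; offset between Raft layer and store
	stored []string // messages actually written to the stream store
}

// apply processes one committed entry; ok=false models a constraint
// rejection such as a KeyValue CAS (expected last sequence) mismatch.
func (s *replStream) apply(msg string, ok bool) {
	s.lseq++
	if !ok {
		s.failed++ // count the rejection instead of storing the message
		return
	}
	s.stored = append(s.stored, msg) // stored sequence == lseq - failed
}

func (s *replStream) snapshot() streamSnapshot {
	return streamSnapshot{LastSeq: s.lseq, Failed: s.failed}
}

// restore must bring Failed back as well; dropping it across a restart
// would skew the stored-sequence calculation and desync the replica.
func (s *replStream) restore(sn streamSnapshot) {
	s.lseq, s.failed = sn.LastSeq, sn.Failed
}

func main() {
	var s replStream
	s.apply("a", true)
	s.apply("b", false) // CAS mismatch: rejected, only the offset advances
	s.apply("c", true)
	fmt.Println(s.lseq, s.failed, len(s.stored)) // 3 1 2

	// A replica restored from the snapshot keeps the same offset.
	var r replStream
	r.restore(s.snapshot())
	fmt.Println(r.lseq, r.failed) // 3 1
}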
56 changes: 25 additions & 31 deletions server/jetstream_cluster.go
@@ -2143,7 +2143,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()

qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()

s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -2249,7 +2249,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps

startDirectAccessMonitoring := func() {
if dat == nil {
dat = time.NewTicker(1 * time.Second)
dat = time.NewTicker(2 * time.Second)
datc = dat.C
}
}
@@ -2301,6 +2301,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
select {
case <-s.quitCh:
return
case <-mqch:
return
case <-qch:
return
case <-aq.ch:
@@ -2322,6 +2324,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ne, nb = n.Applied(ce.Index)
ce.ReturnToPool()
} else {
// Our stream was closed out from underneath of us, simply return here.
if err == errStreamClosed {
return
}
s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
if isClusterResetErr(err) {
if mset.isMirror() && mset.IsLeader() {
@@ -2352,7 +2358,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if mset != nil && n != nil && sendSnapshot {
n.SendSnapshot(mset.stateSnapshot())
sendSnapshot = false

}
if isRestore {
acc, _ := s.LookupAccount(sa.Client.serviceAccount())
@@ -2385,17 +2390,22 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Here we are checking if we are not the leader but we have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
startDirectAccessMonitoring()
mset.mu.RLock()
ad, md := mset.cfg.AllowDirect, mset.cfg.MirrorDirect
mset.mu.RUnlock()
if ad || md {
startDirectAccessMonitoring()
}
}

case <-datc:
if mset == nil || isRecovering {
return
continue
}
// If we are leader we can stop, we know this is setup now.
if isLeader {
stopDirectMonitoring()
return
continue
}

mset.mu.Lock()
@@ -2547,6 +2557,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.setStreamAssignment(sa)
// Make sure to update our updateC which would have been nil.
uch = mset.updateC()
// Also update our mqch
mqch = mset.monitorQuitC()
}
}
if err != nil {
@@ -2779,6 +2791,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// Grab last sequence and CLFS.
last, clfs := mset.lastSeqAndCLFS()

// We can skip if we know this is less than what we already have.
if lseq-clfs < last {
s.Debugf("Apply stream entries for '%s > %s' skipping message with sequence %d with last of %d",
@@ -2809,13 +2822,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
// Only return in place if we are going to reset stream or we are out of space.
if isClusterResetErr(err) || isOutOfSpaceErr(err) {
// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
if isClusterResetErr(err) || isOutOfSpaceErr(err) || err == errStreamClosed {
return err
}
s.Debugf("Apply stream entries for '%s > %s' got error processing message: %v",
mset.account(), mset.name(), err)
}

case deleteMsgOp:
md, err := decodeMsgDelete(buf[1:])
if err != nil {
@@ -7388,7 +7402,7 @@ func (mset *stream) stateSnapshotLocked() []byte {
Bytes: state.Bytes,
FirstSeq: state.FirstSeq,
LastSeq: state.LastSeq,
Failed: mset.clfs,
Failed: mset.getCLFS(),
Deleted: state.Deleted,
}
b, _ := json.Marshal(snap)
@@ -7425,7 +7439,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [

mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
name, stype := mset.cfg.Name, mset.cfg.Storage
s, js, jsa, st, rf, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
@@ -7525,26 +7539,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [

// Some header checks can be checked pre proposal. Most can not.
if len(hdr) > 0 {
// For CAS operations, e.g. ExpectedLastSeqPerSubject, we can also check here and not have to go through.
// Can only precheck for seq != 0.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 {
var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(subject, &smv)
if sm != nil {
fseq = sm.seq
}
if err != nil || fseq != seq {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
}
}
// Expected stream name can also be pre-checked.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
if canRespond {
@@ -7752,8 +7746,8 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {

mset.mu.Lock()
var state StreamState
mset.clfs = snap.Failed
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)

s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
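Note that the snapshot paths above now read and write the failure count through getCLFS and setCLFS rather than touching mset.clfs directly; the accessor bodies are not part of this diff. A minimal sketch of the shape such lock-guarded accessors could take (the sketch package, streamCounters type, and mu field are assumptions for illustration, not the actual stream struct):

package sketch

import "sync"

// streamCounters is an illustrative stand-in for the fields that live on
// the stream struct in server/stream.go.
type streamCounters struct {
	mu   sync.Mutex
	clfs uint64 // count of constraint-rejected (failed) proposals
}

// getCLFS returns the failure count under the lock.
func (sc *streamCounters) getCLFS() uint64 {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	return sc.clfs
}

// setCLFS replaces the failure count, e.g. when applying a snapshot's
// Failed value during processSnapshot.
func (sc *streamCounters) setCLFS(val uint64) {
	sc.mu.Lock()
	sc.clfs = val
	sc.mu.Unlock()
}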
2 changes: 1 addition & 1 deletion server/jetstream_cluster_2_test.go
@@ -6968,7 +6968,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) {
// Make sure we get all direct subs.
checkForDirectSubs := func() {
t.Helper()
checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
for _, s := range c.servers {
mset, err := s.GlobalAccount().lookupStream("TEST")
if err != nil {
20 changes: 20 additions & 0 deletions server/norace_test.go
@@ -9064,12 +9064,28 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
return &fullState{state, mset.lseq, mset.clfs}
}

grabStore := func(mset *stream) map[string][]uint64 {
mset.mu.RLock()
store := mset.store
mset.mu.RUnlock()
var state StreamState
store.FastState(&state)
storeMap := make(map[string][]uint64)
for seq := state.FirstSeq; seq <= state.LastSeq; seq++ {
if sm, err := store.LoadMsg(seq, nil); err == nil {
storeMap[sm.subj] = append(storeMap[sm.subj], sm.seq)
}
}
return storeMap
}

checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
// Current stream leader.
sl := c.streamLeader(globalAccountName, "KV_TEST")
mset, err := sl.GlobalAccount().lookupStream("KV_TEST")
require_NoError(t, err)
lstate := grabState(mset)
golden := grabStore(mset)

// Report messages per server.
for _, s := range c.servers {
@@ -9082,6 +9098,10 @@ func TestNoRaceJetStreamClusterKVWithServerKill(t *testing.T) {
if !reflect.DeepEqual(state, lstate) {
return fmt.Errorf("Expected follower state\n%+v\nto match leader's\n %+v", state, lstate)
}
sm := grabStore(mset)
if !reflect.DeepEqual(sm, golden) {
t.Fatalf("Expected follower store for %v\n%+v\nto match leader's %v\n %+v", s, sm, sl, golden)
}
}
return nil
})
7 changes: 5 additions & 2 deletions server/raft.go
@@ -2160,12 +2160,15 @@ func (n *raft) runAsLeader() {
// For forwarded proposals, both normal and remove peer proposals.
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.debug("Error subscribing to forwarded proposals: %v", err)
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.debug("Error subscribing to forwarded proposals: %v", err)
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
return
}

84 changes: 49 additions & 35 deletions server/stream.go
@@ -239,6 +239,7 @@ type stream struct {
ddindex int
ddtmr *time.Timer
qch chan struct{}
mqch chan struct{}
active bool
ddloaded bool
closed bool
@@ -558,6 +559,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
msgs: newIPQueue[*inMsg](s, qpfx+"messages"),
gets: newIPQueue[*directGetReq](s, qpfx+"direct gets"),
qch: make(chan struct{}),
mqch: make(chan struct{}),
uch: make(chan struct{}, 4),
sch: make(chan struct{}, 1),
}
@@ -785,6 +787,15 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
}
}

func (mset *stream) monitorQuitC() <-chan struct{} {
if mset == nil {
return nil
}
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.mqch
}

func (mset *stream) updateC() <-chan struct{} {
if mset == nil {
return nil
@@ -4069,6 +4080,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac
var (
errLastSeqMismatch = errors.New("last sequence mismatch")
errMsgIdDuplicate = errors.New("msgid is duplicate")
errStreamClosed = errors.New("stream closed")
)

// processJetStreamMsg is where we try to actually process the stream msg.
Expand All @@ -4077,7 +4089,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
c, s, store := mset.client, mset.srv, mset.store
if mset.closed || c == nil {
mset.mu.Unlock()
return nil
return errStreamClosed
}

// Apply the input subject transform if any
@@ -4407,7 +4419,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Make sure to take into account any message assignments that we had to skip (clfs).
seq = lseq + 1 - clfs
// Check for preAcks and the need to skip vs store.

if mset.hasAllPreAcks(seq, subject) {
mset.clearAllPreAcks(seq)
store.SkipMsg()
@@ -4899,9 +4910,28 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
accName := jsa.account.Name
jsa.mu.Unlock()

// Clean up consumers.
// Mark as closed, kick monitor and collect consumers first.
mset.mu.Lock()
mset.closed = true
// Signal to the monitor loop.
// Can't use qch here.
if mset.mqch != nil {
close(mset.mqch)
mset.mqch = nil
}

// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream(true)

// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}

// Clean up consumers.
var obs []*consumer
for _, o := range mset.consumers {
obs = append(obs, o)
@@ -4922,21 +4952,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.cancelSourceConsumer(si.iname)
}
}

// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
if n.NeedSnapshot() {
// Attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
}
n.Stop()
}
}
mset.mu.Unlock()

isShuttingDown := js.isShuttingDown()
@@ -4953,17 +4968,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}

mset.mu.Lock()
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
mset.unsubscribeToStream(true)

// Our info sub if we spun it up.
if mset.infoSub != nil {
mset.srv.sysUnsubscribe(mset.infoSub)
mset.infoSub = nil
}

// Send stream delete advisory after the consumers.
if deleteFlag && advisory {
mset.sendDeleteAdvisoryLocked()
@@ -4975,11 +4979,17 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
mset.qch = nil
}

c := mset.client
mset.client = nil
if c == nil {
mset.mu.Unlock()
return nil
// Cluster cleanup
var sa *streamAssignment
if n := mset.node; n != nil {
if deleteFlag {
n.Delete()
sa = mset.sa
} else {
// Always attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
n.Stop()
}
}

// Cleanup duplicate timer if running.
@@ -5005,6 +5015,8 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {

// Snapshot store.
store := mset.store
c := mset.client
mset.client = nil

// Clustered cleanup.
mset.mu.Unlock()
@@ -5019,7 +5031,9 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
js.mu.Unlock()
}

c.closeConnection(ClientClosed)
if c != nil {
c.closeConnection(ClientClosed)
}

if sysc != nil {
sysc.closeConnection(ClientClosed)
