diff --git a/go.mod b/go.mod index f9a0b5ed0be..dd56502c6e9 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,14 @@ module github.com/nats-io/nats-server/v2 go 1.21.0 require ( - github.com/klauspost/compress v1.17.9 + github.com/klauspost/compress v1.17.10 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.5.8 github.com/nats-io/nats.go v1.36.0 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 - go.uber.org/automaxprocs v1.5.3 - golang.org/x/crypto v0.27.0 - golang.org/x/sys v0.25.0 - golang.org/x/time v0.6.0 + go.uber.org/automaxprocs v1.6.0 + golang.org/x/crypto v0.28.0 + golang.org/x/sys v0.26.0 + golang.org/x/time v0.7.0 ) diff --git a/go.sum b/go.sum index fe5935a31ee..ef85af00ba5 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= +github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= @@ -18,14 +18,14 @@ github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4 github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.uber.org/automaxprocs v1.5.3 
h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= -go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= -golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= +golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 767719fc4c2..657e202025d 100644 --- a/main.go +++ b/main.go @@ -38,7 +38,7 @@ Server Options: -c, --config <file> Configuration file -t Test configuration and exit -sl,--signal <signal>[=<pid>] Send signal to nats-server process (ldm, stop, quit, term, reopen, reload) - pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. /var/run/nats-server.pid) + <pid> can be either a PID (e.g. 1) or the path to a PID file (e.g. 
/var/run/nats-server.pid) --client_advertise <string> Client URL to advertise to other servers --ports_file_dir <dir> Creates a ports file in the specified directory (<executable_name>_<pid>.ports). diff --git a/server/consumer.go b/server/consumer.go index 38af7da959a..35cad6848d1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -320,6 +320,7 @@ type consumer struct { dseq uint64 // delivered consumer sequence adflr uint64 // ack delivery floor asflr uint64 // ack store floor + chkflr uint64 // our check floor, interest streams only. npc int64 // Num Pending Count npf uint64 // Num Pending Floor Sequence dsubj string @@ -2918,28 +2919,6 @@ func (o *consumer) isFiltered() bool { return false } -// Check if we would have matched and needed an ack for this store seq. -// This is called for interest based retention streams to remove messages. -func (o *consumer) matchAck(sseq uint64) bool { - o.mu.RLock() - defer o.mu.RUnlock() - - // Check if we are filtered, and if so check if this is even applicable to us. - if o.isFiltered() { - if o.mset == nil { - return false - } - var svp StoreMsg - if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { - return false - } - if !o.isFilteredMatch(svp.subj) { - return false - } - } - return true -} - // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -5512,16 +5491,24 @@ func (o *consumer) isMonitorRunning() bool { var errAckFloorHigherThanLastSeq = errors.New("consumer ack floor is higher than streams last sequence") // If we are a consumer of an interest or workqueue policy stream, process that state and make sure consistent. -func (o *consumer) checkStateForInterestStream() error { +func (o *consumer) checkStateForInterestStream(ss *StreamState) error { o.mu.RLock() // See if we need to process this update if our parent stream is not a limits policy stream. 
mset := o.mset shouldProcessState := mset != nil && o.retention != LimitsPolicy - if o.closed || !shouldProcessState || o.store == nil { + if o.closed || !shouldProcessState || o.store == nil || ss == nil { o.mu.RUnlock() return nil } + store := mset.store state, err := o.store.State() + + filters, subjf, filter := o.filters, o.subjf, _EMPTY_ + var wc bool + if filters == nil && len(subjf) > 0 { + filter, wc = subjf[0].subject, subjf[0].hasWildcard + } + chkfloor := o.chkflr o.mu.RUnlock() if err != nil { @@ -5534,26 +5521,46 @@ func (o *consumer) checkStateForInterestStream() error { return nil } - // We should make sure to update the acks. - var ss StreamState - mset.store.FastState(&ss) - // Check if the underlying stream's last sequence is less than our floor. // This can happen if the stream has been reset and has not caught up yet. if asflr > ss.LastSeq { return errAckFloorHigherThanLastSeq } - for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - if o.matchAck(seq) { + var smv StoreMsg + var seq, nseq uint64 + // Start at first stream seq or a previous check floor, whichever is higher. + // Note this will really help for interest retention, with WQ the loadNextMsg + // gets us a long way already since it will skip deleted msgs not for our filter. + fseq := ss.FirstSeq + if chkfloor > fseq { + fseq = chkfloor + } + + for seq = fseq; asflr > 0 && seq <= asflr; seq++ { + if filters != nil { + _, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv) + } else { + _, nseq, err = store.LoadNextMsg(filter, wc, seq, &smv) + } + // if we advanced sequence update our seq. This can be on no error and EOF. + if nseq > seq { + seq = nseq + } + // Only ack though if no error and seq <= ack floor. + if err == nil && seq <= asflr { mset.ackMsg(o, seq) } } - o.mu.RLock() + o.mu.Lock() + // Update our check floor. + if seq > o.chkflr { + o.chkflr = seq + } // See if we need to process this update if our parent stream is not a limits policy stream. 
state, _ = o.store.State() - o.mu.RUnlock() + o.mu.Unlock() // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 { diff --git a/server/filestore.go b/server/filestore.go index c8f8b0271d6..45e7ac17c82 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -195,6 +195,7 @@ type fileStore struct { closed bool fip bool receivedAny bool + firstMoved bool } // Represents a message store block and its data. @@ -456,8 +457,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Check if our prior state remembers a last sequence past where we can see. if fs.ld != nil && prior.LastSeq > fs.state.LastSeq { fs.state.LastSeq, fs.state.LastTime = prior.LastSeq, prior.LastTime - if lmb, err := fs.newMsgBlockForWrite(); err == nil { - lmb.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()) + if _, err := fs.newMsgBlockForWrite(); err == nil { + if err = fs.writeTombstone(prior.LastSeq, prior.LastTime.UnixNano()); err != nil { + return nil, err + } } else { return nil, err } @@ -1651,7 +1654,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { subj := buf[bi : bi+lsubj] // We had a bug that could cause memory corruption in the PSIM that could have gotten stored to disk. // Only would affect subjects, so do quick check. - if !isValidSubject(string(subj), true) { + if !isValidSubject(bytesToString(subj), true) { os.Remove(fn) fs.warn("Stream state corrupt subject detected") return errCorruptState @@ -2122,7 +2125,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // Check if we have no messages and blocks left. if fs.lmb == nil && last.seq != 0 { if lmb, _ := fs.newMsgBlockForWrite(); lmb != nil { - lmb.writeTombstone(last.seq, last.ts) + fs.writeTombstone(last.seq, last.ts) } // Clear any global subject state. 
fs.psim, fs.tsl = fs.psim.Empty(), 0 @@ -2586,7 +2589,7 @@ func (fs *fileStore) numFilteredPendingNoLast(filter string, ss *SimpleState) { // Optimized way for getting all num pending matching a filter subject. // Optionally look up last sequence. Sometimes do not need last and this avoids cost. -// Lock should be held. +// Read lock should be held. func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *SimpleState) { isAll := filter == _EMPTY_ || filter == fwcs @@ -2653,17 +2656,25 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si } } // Update fblk since fblk was outdated. - if !wc { - if info, ok := fs.psim.Find(stringToBytes(filter)); ok { - info.fblk = i - } - } else { - fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { - if i > psi.fblk { - psi.fblk = i + // We only require read lock here as that is desirable, + // so we need to do this in a go routine to acquire write lock. + go func() { + fs.mu.Lock() + defer fs.mu.Unlock() + if !wc { + if info, ok := fs.psim.Find(stringToBytes(filter)); ok { + if i > info.fblk { + info.fblk = i + } } - }) - } + } else { + fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) { + if i > psi.fblk { + psi.fblk = i + } + }) + } + }() } // Now gather last sequence if asked to do so. if last { @@ -3965,11 +3976,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // This is for user initiated removes or to hold the first seq // when the last block is empty. - // If not via limits and not empty and last (empty writes tombstone above if last) write tombstone. - if !viaLimits && !(isEmpty && isLastBlock) { - if lmb := fs.lmb; sm != nil && lmb != nil { - lmb.writeTombstone(sm.seq, sm.ts) - } + // If not via limits and not empty (empty writes tombstone above if last) write tombstone. 
+ if !viaLimits && !isEmpty && sm != nil { + fs.writeTombstone(sm.seq, sm.ts) } if cb := fs.scb; cb != nil { @@ -4009,11 +4018,18 @@ func (mb *msgBlock) shouldCompactSync() bool { return mb.bytes*2 < mb.rbytes && !mb.noCompact } +// This will compact and rewrite this block. This version will not process any tombstone cleanup. +// Write lock needs to be held. +func (mb *msgBlock) compact() { + mb.compactWithFloor(0) +} + // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. // This should not be called on the lmb since we will prune tail deleted messages which could cause issues with // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. +// If floor > 0 we will also drop stale tombstones, i.e. those for sequences below floor. // Write lock needs to be held. -func (mb *msgBlock) compact() { +func (mb *msgBlock) compactWithFloor(floor uint64) { wasLoaded := mb.cacheAlreadyLoaded() if !wasLoaded { if err := mb.loadMsgsWithLock(); err != nil { @@ -4055,7 +4071,9 @@ func (mb *msgBlock) compact() { if seq&tbit != 0 { seq = seq &^ tbit // If this entry is for a lower seq than ours then keep around. - if seq < fseq { + // We also check that it is at least our floor. Floor is zero on normal + // calls to compact. + if seq < fseq && seq >= floor { nbuf = append(nbuf, buf[index:index+rl]...) } } else { @@ -4072,7 +4090,7 @@ func (mb *msgBlock) compact() { } // Handle compression - if mb.cmp != NoCompression { + if mb.cmp != NoCompression && len(nbuf) > 0 { cbuf, err := mb.cmp.Compress(nbuf) if err != nil { return @@ -4515,6 +4533,8 @@ func (fs *fileStore) selectNextFirst() { fs.state.FirstSeq = fs.state.LastSeq + 1 fs.state.FirstTime = time.Time{} } + // Mark first as moved. Plays into tombstone cleanup for syncBlocks. + fs.firstMoved = true } // Lock should be held. 
@@ -5059,6 +5079,26 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg return rl, err } +// For writing tombstones to our lmb. This version will enforce maximum block sizes. +// Lock should be held. +func (fs *fileStore) writeTombstone(seq uint64, ts int64) error { + // Grab our current last message block. + lmb := fs.lmb + var err error + + if lmb == nil || lmb.blkSize()+emptyRecordLen > fs.fcfg.BlockSize { + if lmb != nil && fs.fcfg.Compression != NoCompression { + // We've now reached the end of this message block, if we want + // to compress blocks then now's the time to do it. + go lmb.recompressOnDiskIfNeeded() + } + if lmb, err = fs.newMsgBlockForWrite(); err != nil { + return err + } + } + return lmb.writeTombstone(seq, ts) +} + func (mb *msgBlock) recompressOnDiskIfNeeded() error { alg := mb.fs.fcfg.Compression mb.mu.Lock() @@ -5072,7 +5112,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // 1. The block will be compressed already and have a valid metadata // header, in which case we do nothing. // 2. The block will be uncompressed, in which case we will compress it - // and then write it back out to disk, reencrypting if necessary. + // and then write it back out to disk, re-encrypting if necessary. <-dios origBuf, err := os.ReadFile(origFN) dios <- struct{}{} @@ -5185,6 +5225,10 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error { // compression algorithm is up-to-date, since this will be needed when // compacting or truncating. mb.cmp = alg + + // Also update rbytes + mb.rbytes = uint64(len(cmpBuf)) + return nil } @@ -5228,15 +5272,17 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { // Sync msg and index files as needed. This is called from a timer. func (fs *fileStore) syncBlocks() { - fs.mu.RLock() + fs.mu.Lock() // If closed or a snapshot is in progress bail. if fs.closed || fs.sips > 0 { - fs.mu.RUnlock() + fs.mu.Unlock() return } blks := append([]*msgBlock(nil), fs.blks...) 
- lmb := fs.lmb - fs.mu.RUnlock() + lmb, firstMoved, firstSeq := fs.lmb, fs.firstMoved, fs.state.FirstSeq + // Clear first moved. + fs.firstMoved = false + fs.mu.Unlock() var markDirty bool for _, mb := range blks { @@ -5251,6 +5297,11 @@ func (fs *fileStore) syncBlocks() { mb.dirtyCloseWithRemove(false) } + // If our first has moved and we are set to noCompact (which is from tombstones), + // clear so that we might cleanup tombstones. + if firstMoved && mb.noCompact { + mb.noCompact = false + } // Check if we should compact here as well. // Do not compact last mb. var needsCompact bool @@ -5268,13 +5319,26 @@ func (fs *fileStore) syncBlocks() { mb.mu.Unlock() // Check if we should compact here. - // Need to hold fs lock in case we reference psim when loading in the mb. + // Need to hold fs lock in case we reference psim when loading in the mb and we may remove this block if truly empty. if needsCompact { fs.mu.RLock() mb.mu.Lock() - mb.compact() + mb.compactWithFloor(firstSeq) + // If this compact removed all raw bytes due to tombstone cleanup, schedule to remove. + shouldRemove := mb.rbytes == 0 mb.mu.Unlock() fs.mu.RUnlock() + + // Check if we should remove. This will not be common, so we will re-take fs write lock here vs changing + // it above which we would prefer to be a readlock such that other lookups can occur while compacting this block. + if shouldRemove { + fs.mu.Lock() + mb.mu.Lock() + fs.removeMsgBlock(mb) + mb.mu.Unlock() + fs.mu.Unlock() + needSync = false + } } // Check if we need to sync this block. @@ -6440,8 +6504,11 @@ func (fs *fileStore) State() StreamState { state.Deleted = append(state.Deleted, seq) } } - cur = atomic.LoadUint64(&mb.last.seq) + 1 // Expected next first. - + // Only advance cur if we are increasing. We could have marker blocks with just tombstones. + if last := atomic.LoadUint64(&mb.last.seq); last >= cur { + cur = last + 1 // Expected next first. + } + // Add in deleted. 
mb.dmap.Range(func(seq uint64) bool { if seq < fseq { mb.dmap.Delete(seq) @@ -6816,7 +6883,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint // Write any tombstones as needed. for _, tomb := range tombs { if tomb.seq > fseq { - fs.lmb.writeTombstone(tomb.seq, tomb.ts) + fs.writeTombstone(tomb.seq, tomb.ts) } } @@ -6913,7 +6980,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { if lseq := atomic.LoadUint64(&lmb.last.seq); lseq > 1 { // Leave a tombstone so we can remember our starting sequence in case // full state becomes corrupted. - lmb.writeTombstone(lseq, lmb.last.ts) + fs.writeTombstone(lseq, lmb.last.ts) } cb := fs.scb @@ -7311,7 +7378,7 @@ func (fs *fileStore) Truncate(seq uint64) error { // Write any tombstones as needed. for _, tomb := range tombs { if tomb.seq <= lsm.seq { - fs.lmb.writeTombstone(tomb.seq, tomb.ts) + fs.writeTombstone(tomb.seq, tomb.ts) } } @@ -7386,7 +7453,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) { mb.mu.Unlock() // Write the tombstone to remember since this was last block. if lmb, _ := fs.newMsgBlockForWrite(); lmb != nil { - lmb.writeTombstone(lseq, lts) + fs.writeTombstone(lseq, lts) } mb.mu.Lock() } diff --git a/server/filestore_test.go b/server/filestore_test.go index 6fbffe6bd0e..1f7a4b403e0 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -5581,7 +5581,15 @@ func TestFileStoreFullStateTestUserRemoveWAL(t *testing.T) { require_NoError(t, err) defer fs.Stop() - if newState := fs.State(); !reflect.DeepEqual(state, newState) { + newState := fs.State() + // We will properly detect lost data for sequence #2 here. + require_True(t, newState.Lost != nil) + require_Equal(t, len(newState.Lost.Msgs), 1) + require_Equal(t, newState.Lost.Msgs[0], 2) + // Clear for deep equal compare below. 
+ newState.Lost = nil + + if !reflect.DeepEqual(state, newState) { t.Fatalf("Restore state does not match:\n%+v\n%+v", state, newState) } @@ -7042,14 +7050,21 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { require_Equal(t, ss.First, 1002) require_Equal(t, ss.Last, 1003) - // Check psi was updated. - fs.mu.RLock() - psi, ok = fs.psim.Find([]byte("foo.baz")) - fs.mu.RUnlock() - require_True(t, ok) - require_Equal(t, psi.total, 2) - require_Equal(t, psi.fblk, 84) - require_Equal(t, psi.lblk, 84) + // Check psi was updated. This is done in separate go routine to acquire + // the write lock now. + checkFor(t, time.Second, 100*time.Millisecond, func() error { + fs.mu.RLock() + psi, ok = fs.psim.Find([]byte("foo.baz")) + total, fblk, lblk := psi.total, psi.fblk, psi.lblk + fs.mu.RUnlock() + require_True(t, ok) + require_Equal(t, total, 2) + require_Equal(t, lblk, 84) + if fblk != 84 { + return fmt.Errorf("fblk should be 84, still %d", fblk) + } + return nil + }) } func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { @@ -7086,19 +7101,21 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { require_Equal(t, fs.numMsgBlocks(), 92) fs.mu.RLock() psi, ok := fs.psim.Find([]byte("foo.22.baz")) + total, fblk, lblk := psi.total, psi.fblk, psi.lblk fs.mu.RUnlock() require_True(t, ok) - require_Equal(t, psi.total, 2) - require_Equal(t, psi.fblk, 1) - require_Equal(t, psi.lblk, 92) + require_Equal(t, total, 2) + require_Equal(t, fblk, 1) + require_Equal(t, lblk, 92) fs.mu.RLock() psi, ok = fs.psim.Find([]byte("foo.22.bar")) + total, fblk, lblk = psi.total, psi.fblk, psi.lblk fs.mu.RUnlock() require_True(t, ok) - require_Equal(t, psi.total, 2) - require_Equal(t, psi.fblk, 1) - require_Equal(t, psi.lblk, 92) + require_Equal(t, total, 2) + require_Equal(t, fblk, 1) + require_Equal(t, lblk, 92) // No make sure that a call to numFilterPending which will initially walk all blocks if starting from seq 1 updates 
psi. var ss SimpleState @@ -7110,21 +7127,33 @@ func TestFileStoreWildcardFilteredPendingPSIMFirstBlockUpdate(t *testing.T) { require_Equal(t, ss.Last, 1006) // Check both psi were updated. - fs.mu.RLock() - psi, ok = fs.psim.Find([]byte("foo.22.baz")) - fs.mu.RUnlock() - require_True(t, ok) - require_Equal(t, psi.total, 2) - require_Equal(t, psi.fblk, 92) - require_Equal(t, psi.lblk, 92) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + fs.mu.RLock() + psi, ok = fs.psim.Find([]byte("foo.22.baz")) + total, fblk, lblk = psi.total, psi.fblk, psi.lblk + fs.mu.RUnlock() + require_True(t, ok) + require_Equal(t, total, 2) + require_Equal(t, lblk, 92) + if fblk != 92 { + return fmt.Errorf("fblk should be 92, still %d", fblk) + } + return nil + }) - fs.mu.RLock() - psi, ok = fs.psim.Find([]byte("foo.22.bar")) - fs.mu.RUnlock() - require_True(t, ok) - require_Equal(t, psi.total, 2) - require_Equal(t, psi.fblk, 92) - require_Equal(t, psi.lblk, 92) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + fs.mu.RLock() + psi, ok = fs.psim.Find([]byte("foo.22.bar")) + total, fblk, lblk = psi.total, psi.fblk, psi.lblk + fs.mu.RUnlock() + require_True(t, ok) + require_Equal(t, total, 2) + require_Equal(t, lblk, 92) + if fblk != 92 { + return fmt.Errorf("fblk should be 92, still %d", fblk) + } + return nil + }) } // Make sure if we only miss by one for fblk that we still update it. @@ -7147,10 +7176,14 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) { fetch := func(subj string) *psi { t.Helper() fs.mu.RLock() + var info psi psi, ok := fs.psim.Find([]byte(subj)) + if ok && psi != nil { + info = *psi + } fs.mu.RUnlock() require_True(t, ok) - return psi + return &info } psi := fetch("foo.22.bar") @@ -7173,17 +7206,22 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) { require_Equal(t, ss.Last, 7) // Now make sure that we properly updated the psim entry. 
- psi = fetch("foo.22.bar") - require_Equal(t, psi.total, 3) - require_Equal(t, psi.fblk, 2) - require_Equal(t, psi.lblk, 4) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + psi = fetch("foo.22.bar") + require_Equal(t, psi.total, 3) + require_Equal(t, psi.lblk, 4) + if psi.fblk != 2 { + return fmt.Errorf("fblk should be 2, still %d", psi.fblk) + } + return nil + }) // Now make sure wildcard calls into also update blks. // First remove first "foo.22.baz" which will remove first block. removed, err = fs.RemoveMsg(2) require_NoError(t, err) require_True(t, removed) - // Make sure 3 blks left + // Make sure 3 blks left. require_Equal(t, fs.numMsgBlocks(), 3) psi = fetch("foo.22.baz") @@ -7199,10 +7237,15 @@ func TestFileStoreFilteredPendingPSIMFirstBlockUpdateNextBlock(t *testing.T) { require_Equal(t, ss.First, 4) require_Equal(t, ss.Last, 8) - psi = fetch("foo.22.baz") - require_Equal(t, psi.total, 3) - require_Equal(t, psi.fblk, 2) - require_Equal(t, psi.lblk, 4) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + psi = fetch("foo.22.baz") + require_Equal(t, psi.total, 3) + require_Equal(t, psi.lblk, 4) + if psi.fblk != 2 { + return fmt.Errorf("fblk should be 2, still %d", psi.fblk) + } + return nil + }) } func TestFileStoreLargeSparseMsgsDoNotLoadAfterLast(t *testing.T) { @@ -7448,11 +7491,11 @@ func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) { _, err = fs.RemoveMsg(seq) require_NoError(t, err) } - // Now make sure we add 4th block so syncBlocks will try to compress. + // Now make sure we add 4/5th block so syncBlocks will try to compact. for i := 0; i < 6; i++ { fs.StoreMsg("foo.BB", nil, msg) } - require_Equal(t, fs.numMsgBlocks(), 4) + require_Equal(t, fs.numMsgBlocks(), 5) // All should have compact set. 
fs.mu.Lock() @@ -7606,6 +7649,90 @@ func TestFileStoreRestoreIndexWithMatchButLeftOverBlocks(t *testing.T) { require_Equal(t, state.LastSeq, 18) } +func TestFileStoreRestoreDeleteTombstonesExceedingMaxBlkSize(t *testing.T) { + testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { + fcfg.BlockSize = 256 + fs, err := newFileStoreWithCreated( + fcfg, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}, + time.Now(), + prf(&fcfg), + nil, + ) + require_NoError(t, err) + defer fs.Stop() + + n, err := fs.PurgeEx(_EMPTY_, 1_000_000_000, 0) + require_NoError(t, err) + require_Equal(t, n, 0) + + msg := []byte("hello") + // 6 msgs per block with blk size 256. + for i := 1; i <= 10_000; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + // Now delete msgs which will write tombstones. + for seq := uint64(1_000_000_001); seq < 1_000_000_101; seq++ { + removed, err := fs.RemoveMsg(seq) + require_NoError(t, err) + require_True(t, removed) + } + + // Check last block and make sure the tombstones did not exceed blk size maximum. + // Check to make sure no blocks exceed blk size. + fs.mu.RLock() + blks := append([]*msgBlock(nil), fs.blks...) + lmb := fs.lmb + fs.mu.RUnlock() + + var emptyBlks []*msgBlock + for _, mb := range blks { + mb.mu.RLock() + bytes, rbytes := mb.bytes, mb.rbytes + mb.mu.RUnlock() + require_True(t, bytes < 256) + require_True(t, rbytes < 256) + if bytes == 0 && mb != lmb { + emptyBlks = append(emptyBlks, mb) + } + } + // Check each block such that it signals it can be compacted but if we attempt compact here nothing should change. + for _, mb := range emptyBlks { + mb.mu.Lock() + mb.ensureRawBytesLoaded() + bytes, rbytes, shouldCompact := mb.bytes, mb.rbytes, mb.shouldCompactSync() + // Do the compact and make sure nothing changed. 
+ mb.compact() + nbytes, nrbytes := mb.bytes, mb.rbytes + mb.mu.Unlock() + require_True(t, shouldCompact) + require_Equal(t, bytes, nbytes) + require_Equal(t, rbytes, nrbytes) + } + + // Now remove first msg which will invalidate the tombstones since they will be < first sequence. + removed, err := fs.RemoveMsg(1_000_000_000) + require_NoError(t, err) + require_True(t, removed) + + // Now simulate a syncBlocks call and make sure it cleans up the tombstones that are no longer relevant. + fs.syncBlocks() + for _, mb := range emptyBlks { + mb.mu.Lock() + mb.ensureRawBytesLoaded() + index, bytes, rbytes := mb.index, mb.bytes, mb.rbytes + mb.mu.Unlock() + require_Equal(t, bytes, 0) + require_Equal(t, rbytes, 0) + // Also make sure we removed these blks all together. + fs.mu.RLock() + imb := fs.bim[index] + fs.mu.RUnlock() + require_True(t, imb == nil) + } + }) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/jetstream.go b/server/jetstream.go index 4c56b79775f..e3f073fa95f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -214,6 +214,7 @@ func (s *Server) EnableJetStream(config *JetStreamConfig) error { cfg := *config if cfg.StoreDir == _EMPTY_ { cfg.StoreDir = filepath.Join(os.TempDir(), JetStreamStoreDir) + s.Warnf("Temporary storage directory used, data could be lost on system reboot") } // We will consistently place the 'jetstream' directory under the storedir that was handed to us. Prior to 2.2.3 though @@ -2471,6 +2472,7 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je } else { // Create one in tmp directory, but make it consistent for restarts. 
jsc.StoreDir = filepath.Join(os.TempDir(), "nats", JetStreamStoreDir) + s.Warnf("Temporary storage directory used, data could be lost on system reboot") } opts := s.getOpts() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4c9e0931aeb..5a44e83c21c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2397,14 +2397,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps var cist *time.Ticker var cistc <-chan time.Time + // 2 minutes plus up to 30s jitter. + checkInterestInterval := 2*time.Minute + time.Duration(rand.Intn(30))*time.Second + if mset != nil && mset.isInterestRetention() { // Wait on our consumers to be assigned and running before proceeding. // This can become important when a server has lots of assets // since we process streams first then consumers as an asset class. mset.waitOnConsumerAssignments() - // Setup a periodic check here. - // We will fire in 5s the first time then back off to 30s - cist = time.NewTicker(5 * time.Second) + // Setup our periodic check here. We will check once we have restored right away. + cist = time.NewTicker(checkInterestInterval) cistc = cist.C } @@ -2438,7 +2440,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps isRecovering = false // If we are interest based make sure to check consumers if interest retention policy. // This is to make sure we process any outstanding acks from all consumers. - mset.checkInterestState() + if mset != nil && mset.isInterestRetention() { + fire := time.Duration(rand.Intn(5)+5) * time.Second + time.AfterFunc(fire, mset.checkInterestState) + } // If we became leader during this time and we need to send a snapshot to our // followers, i.e. as a result of a scale-up from R1, do it now. 
if sendSnapshot && isLeader && mset != nil && n != nil { @@ -2537,7 +2542,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } case <-cistc: - cist.Reset(30 * time.Second) + cist.Reset(checkInterestInterval) // We may be adjusting some things with consumers so do this in its own go routine. go mset.checkInterestState() @@ -2704,7 +2709,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps mqch = mset.monitorQuitC() // Setup a periodic check here if we are interest based as well. if mset.isInterestRetention() { - cist = time.NewTicker(30 * time.Second) + cist = time.NewTicker(checkInterestInterval) cistc = cist.C } } @@ -2845,9 +2850,14 @@ func (mset *stream) resetClusteredState(err error) bool { return false } - // We delete our raft state. Will recreate. if node != nil { - node.Delete() + if err == errCatchupTooManyRetries { + // Don't delete all state, could've just been temporarily unable to reach the leader. + node.Stop() + } else { + // We delete our raft state. Will recreate. + node.Delete() + } } // Preserve our current state and messages unless we have a first sequence mismatch. @@ -4830,7 +4840,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { // Process the change. if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { // Check our state if we are under an interest based stream. - o.checkStateForInterestStream() + if mset := o.getStream(); mset != nil { + var ss StreamState + mset.store.FastState(&ss) + o.checkStateForInterestStream(&ss) + } // Do a snapshot. doSnapshot(true) // Synchronize followers to our state. Only send out if we have state and nothing pending. @@ -4957,21 +4971,20 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea } } // Check our interest state if applicable. 
- if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq { - o.mu.RLock() - mset := o.mset - o.mu.RUnlock() - // Register pre-acks unless no state at all for the stream and we would create alot of pre-acks. - mset.mu.Lock() + if mset := o.getStream(); mset != nil { var ss StreamState mset.store.FastState(&ss) - // Only register if we have a valid FirstSeq. - if ss.FirstSeq > 0 { - for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ { - mset.registerPreAck(o, seq) + if err := o.checkStateForInterestStream(&ss); err == errAckFloorHigherThanLastSeq { + // Register pre-acks unless no state at all for the stream and we would create alot of pre-acks. + mset.mu.Lock() + // Only register if we have a valid FirstSeq. + if ss.FirstSeq > 0 { + for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ { + mset.registerPreAck(o, seq) + } } + mset.mu.Unlock() } - mset.mu.Unlock() } } @@ -8290,7 +8303,7 @@ RETRY: } numRetries++ - if numRetries >= maxRetries { + if numRetries > maxRetries { // Force a hard reset here. return errCatchupTooManyRetries } diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 0388b0817f4..a78c35dc7f3 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7408,6 +7408,189 @@ func TestJetStreamClusterR1ConsumerAdvisory(t *testing.T) { checkSubsPending(t, sub, 2) } +func TestJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.WorkQueuePolicy, + }) + require_NoError(t, err) + + // Load up a bunch of messages for three different subjects. 
+ msg := bytes.Repeat([]byte("Z"), 4096) + for i := 0; i < 100_000; i++ { + js.PublishAsync("foo.foo", msg) + } + for i := 0; i < 5_000; i++ { + js.PublishAsync("foo.bar", msg) + js.PublishAsync("foo.baz", msg) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // We will not process this one and leave it as "offline". + _, err = js.PullSubscribe("foo.foo", "A") + require_NoError(t, err) + subB, err := js.PullSubscribe("foo.bar", "B") + require_NoError(t, err) + subC, err := js.PullSubscribe("foo.baz", "C") + require_NoError(t, err) + + // Now catch up both B and C but let A simulate being offline of very behind. + for i := 0; i < 5; i++ { + for _, sub := range []*nats.Subscription{subB, subC} { + msgs, err := sub.Fetch(1000) + require_NoError(t, err) + require_Equal(t, len(msgs), 1000) + for _, m := range msgs { + m.Ack() + } + } + } + // Let acks process. + nc.Flush() + time.Sleep(200 * time.Millisecond) + + // Now test the check checkInterestState() on the stream. + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + expireAllBlks := func() { + mset.mu.RLock() + fs := mset.store.(*fileStore) + mset.mu.RUnlock() + fs.mu.RLock() + for _, mb := range fs.blks { + mb.tryForceExpireCache() + } + fs.mu.RUnlock() + } + + // First expire all the blocks. + expireAllBlks() + + start := time.Now() + mset.checkInterestState() + elapsed := time.Since(start) + // This is actually ~300 microseconds but due to travis and race flags etc. + // Was > 30 ms before fix for comparison, M2 macbook air. + require_True(t, elapsed < 5*time.Millisecond) + + // Make sure we set the chkflr correctly. 
+ checkFloor := func(o *consumer) uint64 { + require_True(t, o != nil) + o.mu.RLock() + defer o.mu.RUnlock() + return o.chkflr + } + + require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001) + require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001) + + // Expire all the blocks again. + expireAllBlks() + + // This checks the chkflr state. + start = time.Now() + mset.checkInterestState() + require_True(t, time.Since(start) < elapsed) +} + +func TestJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.*"}, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + // We will not process this one and leave it as "offline". + _, err = js.PullSubscribe("foo.foo", "A") + require_NoError(t, err) + _, err = js.PullSubscribe("foo.*", "B") + require_NoError(t, err) + // Make subC multi-subject. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "C", + FilterSubjects: []string{"foo.foo", "foo.bar", "foo.baz"}, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Load up a bunch of messages for three different subjects. + msg := bytes.Repeat([]byte("Z"), 4096) + for i := 0; i < 90_000; i++ { + js.PublishAsync("foo.foo", msg) + } + for i := 0; i < 5_000; i++ { + js.PublishAsync("foo.bar", msg) + js.PublishAsync("foo.baz", msg) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Now catch up both B and C but let A simulate being offline of very behind. + // Will do this manually here to speed up tests. 
+ sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + + for _, cname := range []string{"B", "C"} { + o := mset.lookupConsumer(cname) + o.mu.Lock() + o.setStoreState(&ConsumerState{ + Delivered: SequencePair{100_000, 100_000}, + AckFloor: SequencePair{100_000, 100_000}, + }) + o.mu.Unlock() + } + + // Now test the check checkInterestState() on the stream. + start := time.Now() + mset.checkInterestState() + elapsed := time.Since(start) + + // Make sure we set the chkflr correctly. + checkFloor := func(o *consumer) uint64 { + require_True(t, o != nil) + o.mu.RLock() + defer o.mu.RUnlock() + return o.chkflr + } + + require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001) + require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001) + + // This checks the chkflr state. For this test this should be much faster, + // two orders of magnitude then the first time. + start = time.Now() + mset.checkInterestState() + require_True(t, time.Since(start) < elapsed/100) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. 
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 501e2322a62..ec483a0c7d2 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3700,3 +3700,76 @@ func TestJetStreamConsumerReplicasAfterScale(t *testing.T) { require_Equal(t, ci.Config.Replicas, 3) require_Equal(t, len(ci.Cluster.Replicas), 2) } + +func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() + + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader + }) + + // Publish 10 messages. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+1)) + } + + outdatedServerName := servers[0] + clusterResetServerName := servers[1] + + outdatedServer := c.serverByName(outdatedServerName) + outdatedServer.Shutdown() + outdatedServer.WaitForShutdown() + + // Publish 10 more messages, one server will be behind. + for i := 0; i < 10; i++ { + pubAck, err := js.Publish("foo", []byte("ok")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, uint64(i+11)) + } + + // We will not need the client anymore. + nc.Close() + + // Shutdown stream leader so one server remains. 
+ streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() + + clusterResetServer := c.serverByName(clusterResetServerName) + acc, err := clusterResetServer.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + + // Too many retries while processing snapshot is considered a cluster reset. + // If a leader is temporarily unavailable we shouldn't blow away our state. + require_True(t, isClusterResetErr(errCatchupTooManyRetries)) + mset.resetClusteredState(errCatchupTooManyRetries) + + // Stream leader stays offline, we only start the server with missing stream data. + // We expect that the reset server must not allow the outdated server to become leader, as that would result in desync. + c.restartServer(outdatedServer) + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Outdated server must NOT become the leader. + newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST") + require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName) +} diff --git a/server/raft.go b/server/raft.go index 165cbbe91fb..1d75ec3a383 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3457,13 +3457,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } + // Make a copy of these values, as the AppendEntry might be cached and returned to the pool in applyCommit. + aeCommit := ae.commit + aeReply := ae.reply + // Apply anything we need here. - if ae.commit > n.commit { + if aeCommit > n.commit { if n.paused { - n.hcommit = ae.commit - n.debug("Paused, not applying %d", ae.commit) + n.hcommit = aeCommit + n.debug("Paused, not applying %d", aeCommit) } else { - for index := n.commit + 1; index <= ae.commit; index++ { + for index := n.commit + 1; index <= aeCommit; index++ { if err := n.applyCommit(index); err != nil { break } @@ -3479,7 +3483,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Success. Send our response. 
if ar != nil { - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) + n.sendRPC(aeReply, _EMPTY_, ar.encode(arbuf)) arPool.Put(ar) } } diff --git a/server/stream.go b/server/stream.go index 428891cf9d8..bfc75b3c1c0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1970,6 +1970,8 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) for _, c := range mset.consumers { toUpdate = append(toUpdate, c) } + var ss StreamState + mset.store.FastState(&ss) mset.mu.Unlock() for _, c := range toUpdate { c.mu.Lock() @@ -1978,7 +1980,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) if c.retention == InterestPolicy { // If we're switching to interest, force a check of the // interest of existing stream messages. - c.checkStateForInterestStream() + c.checkStateForInterestStream(&ss) } } mset.mu.Lock() @@ -5303,8 +5305,11 @@ func (mset *stream) checkInterestState() { return } + var ss StreamState + mset.store.FastState(&ss) + for _, o := range mset.getConsumers() { - o.checkStateForInterestStream() + o.checkStateForInterestStream(&ss) } } @@ -5901,7 +5906,7 @@ func (mset *stream) checkForOrphanMsgs() { mset.mu.RUnlock() for _, o := range consumers { - if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq { + if err := o.checkStateForInterestStream(&ss); err == errAckFloorHigherThanLastSeq { o.mu.RLock() s, consumer := o.srv, o.name state, _ := o.store.State() diff --git a/server/stree/stree.go b/server/stree/stree.go index d9167d94f7c..a289a629742 100644 --- a/server/stree/stree.go +++ b/server/stree/stree.go @@ -55,6 +55,11 @@ func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) { return nil, false } + // Make sure we never insert anything with a noPivot byte. 
+ if bytes.IndexByte(subject, noPivot) >= 0 { + return nil, false + } + old, updated := t.insert(&t.root, subject, value, 0) if !updated { t.size++ @@ -151,7 +156,7 @@ func (t *SubjectTree[T]) insert(np *node, subject []byte, value T, si int) (*T, ln.setSuffix(ln.suffix[cpi:]) si += cpi // Make sure we have different pivot, normally this will be the case unless we have overflowing prefixes. - if p := pivot(ln.suffix, 0); si < len(subject) && p == subject[si] { + if p := pivot(ln.suffix, 0); cpi > 0 && si < len(subject) && p == subject[si] { // We need to split the original leaf. Recursively call into insert. t.insert(np, subject, value, si) // Now add the update version of *np as a child to the new node4. diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go index 7207e7e6910..2a84d3c7598 100644 --- a/server/stree/stree_test.go +++ b/server/stree/stree_test.go @@ -246,7 +246,7 @@ func TestSubjectTreeConstruction(t *testing.T) { st.Insert(b("foo.bar"), 42) checkNode := func(an *node, kind string, pors string, numChildren uint16) { - // t.Helper() + t.Helper() require_True(t, an != nil) n := *an require_True(t, n != nil) @@ -258,7 +258,7 @@ func TestSubjectTreeConstruction(t *testing.T) { checkNode(&st.root, "NODE4", "foo.ba", 2) nn := st.root.findChild('r') checkNode(nn, "NODE4", "r", 2) - checkNode((*nn).findChild(0), "LEAF", "", 0) + checkNode((*nn).findChild(noPivot), "LEAF", "", 0) rnn := (*nn).findChild('.') checkNode(rnn, "NODE4", ".", 3) checkNode((*rnn).findChild('A'), "LEAF", "A", 0) @@ -804,3 +804,41 @@ func TestSubjectTreeNilNoPanic(t *testing.T) { _, found = st.Insert([]byte("foo"), 22) require_False(t, found) } + +// This bug requires the trailing suffix contain repeating nulls \x00 +// and the second subject be longer with more nulls. +func TestSubjectTreeInsertLongerLeafSuffixWithTrailingNulls(t *testing.T) { + st := NewSubjectTree[int]() + subj := []byte("foo.bar.baz_") + // add in 10 nulls. 
+ for i := 0; i < 10; i++ { + subj = append(subj, 0) + } + + st.Insert(subj, 1) + // add in 10 more nulls. + subj2 := subj + for i := 0; i < 10; i++ { + subj2 = append(subj, 0) + } + st.Insert(subj2, 2) + + // Make sure we can look them up. + v, found := st.Find(subj) + require_True(t, found) + require_Equal(t, *v, 1) + v, found = st.Find(subj2) + require_True(t, found) + require_Equal(t, *v, 2) +} + +// Make sure the system does not insert any subject with the noPivot (DEL) in it. +func TestSubjectTreeInsertWithNoPivot(t *testing.T) { + st := NewSubjectTree[int]() + subj := []byte("foo.bar.baz.") + subj = append(subj, noPivot) + old, updated := st.Insert(subj, 22) + require_True(t, old == nil) + require_False(t, updated) + require_Equal(t, st.Size(), 0) +} diff --git a/server/stree/util.go b/server/stree/util.go index 585800aba0a..108f78fda92 100644 --- a/server/stree/util.go +++ b/server/stree/util.go @@ -44,10 +44,14 @@ func copyBytes(src []byte) []byte { type position interface{ int | uint16 } -// Can return 0 if we have all the subject as prefixes. +// No pivot available. +const noPivot = byte(127) + +// Can return 127 (DEL) if we have all the subject as prefixes. +// We used to use 0, but when that was in the subject would cause infinite recursion in some situations. func pivot[N position](subject []byte, pos N) byte { if int(pos) >= len(subject) { - return 0 + return noPivot } return subject[pos] } diff --git a/server/sublist.go b/server/sublist.go index 67eb88ae079..7171eef27ad 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -1181,6 +1181,10 @@ func isValidSubject(subject string, checkRunes bool) bool { return false } if checkRunes { + // Check if we have embedded nulls. + if bytes.IndexByte(stringToBytes(subject), 0) >= 0 { + return false + } // Since casting to a string will always produce valid UTF-8, we need to look for replacement runes. // This signals something is off or corrupt. 
for _, r := range subject { diff --git a/server/sublist_test.go b/server/sublist_test.go index beab4334129..c5718f8b0d9 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -634,6 +634,11 @@ func TestSublistValidSubjects(t *testing.T) { checkBool(IsValidSubject("foo.>bar"), true, t) checkBool(IsValidSubject("foo>.bar"), true, t) checkBool(IsValidSubject(">bar"), true, t) + + // Check for embedded nulls. + subj := []byte("foo.bar.baz.") + subj = append(subj, 0) + checkBool(isValidSubject(string(subj), true), false, t) } func TestSublistMatchLiterals(t *testing.T) { @@ -697,7 +702,6 @@ func TestValidateDestinationSubject(t *testing.T) { checkError(ValidateMappingDestination("foo.{{unknown(1)}}"), ErrInvalidMappingDestination, t) checkError(ValidateMappingDestination("foo..}"), ErrInvalidMappingDestination, t) checkError(ValidateMappingDestination("foo. bar}"), ErrInvalidMappingDestinationSubject, t) - } func TestSubjectToken(t *testing.T) {