diff --git a/server/filestore.go b/server/filestore.go index 606c6143ae5..a0a77b5f713 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -436,6 +436,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Attempt to recover our state. err = fs.recoverFullState() if err != nil { + if !os.IsNotExist(err) { + fs.warn("Recovering stream state from index errored: %v", err) + } // Hold onto state prior := fs.state // Reset anything that could have been set from above. @@ -469,7 +472,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim go fs.cleanupOldMeta() }() - // Lock while do enforcements and removals. + // Lock while we do enforcements and removals. fs.mu.Lock() // Check if we have any left over tombstones to process. @@ -975,7 +978,6 @@ func (mb *msgBlock) ensureLastChecksumLoaded() { // Lock held on entry func (fs *fileStore) recoverMsgBlock(index uint32) (*msgBlock, error) { mb := fs.initMsgBlock(index) - // Open up the message file, but we will try to recover from the index file. // We will check that the last checksums match. file, err := mb.openBlock() @@ -1357,6 +1359,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { minTombstoneTs int64 ) + // To detect gaps from compaction. + var last uint64 + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { if index+msgHdrSize > lbuf { truncate(index) @@ -1444,8 +1449,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { mb.bytes += uint64(rl) } + // Check for any gaps from compaction, meaning no ebit entry. + if last > 0 && seq != last+1 { + for dseq := last + 1; dseq < seq; dseq++ { + addToDmap(dseq) + } + } + // Always set last - atomic.StoreUint64(&mb.last.seq, seq) + last = seq + atomic.StoreUint64(&mb.last.seq, last) mb.last.ts = ts // Advance to next record. @@ -1665,7 +1678,8 @@ func (fs *fileStore) recoverFullState() (rerr error) { for i := 0; i < int(numBlocks); i++ { index, nbytes, fseq, fts, lseq, lts, numDeleted := uint32(readU64()), readU64(), readU64(), readI64(), readU64(), readI64(), readU64() if bi < 0 { - break + os.Remove(fn) + return errCorruptState } mb := fs.initMsgBlock(index) atomic.StoreUint64(&mb.first.seq, fseq) @@ -1734,6 +1748,12 @@ func (fs *fileStore) recoverFullState() (rerr error) { return errPriorState } if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched { + // If we are tracking max msgs per subject and we are not up to date we should rebuild. + if fs.cfg.MaxMsgsPer > 0 { + fs.warn("Stream state block state outdated, will rebuild") + return errPriorState + } + // Remove the last message block since recover will add in the new one. fs.removeMsgBlockFromList(mb) // Reverse update of tracking state for this mb, will add new state in below. @@ -1776,11 +1796,19 @@ func (fs *fileStore) recoverFullState() (rerr error) { // Update top level accounting if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq { fs.state.FirstSeq = fseq - fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() + if nmb.first.ts == 0 { + fs.state.FirstTime = time.Time{} + } else { + fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC() + } } if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq { fs.state.LastSeq = lseq - fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() + if mb.last.ts == 0 { + fs.state.LastTime = time.Time{} + } else { + fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC() + } } fs.state.Msgs += nmb.msgs fs.state.Bytes += nmb.bytes @@ -5415,7 +5443,8 @@ func (mb *msgBlock) ensureRawBytesLoaded() error { // Sync msg and index files as needed. This is called from a timer. func (fs *fileStore) syncBlocks() { fs.mu.RLock() - if fs.closed { + // If closed or a snapshot is in progress bail. + if fs.closed || fs.sips > 0 { fs.mu.RUnlock() return } @@ -6899,6 +6928,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } var smv StoreMsg + var tombs []msgId fs.mu.Lock() // We may remove blocks as we purge, so don't range directly on fs.blks @@ -6952,9 +6982,11 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint purged++ bytes += rl } - // FSS updates. + // PSIM and FSS updates. mb.removeSeqPerSubject(sm.subj, seq) fs.removePerSubject(sm.subj) + // Track tombstones we need to write. + tombs = append(tombs, msgId{sm.seq, sm.ts}) // Check for first message. if seq == atomic.LoadUint64(&mb.first.seq) { @@ -6993,7 +7025,16 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint if firstSeqNeedsUpdate { fs.selectNextFirst() } + fseq := fs.state.FirstSeq + + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq > fseq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() @@ -7036,7 +7077,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { fs.bim = make(map[uint32]*msgBlock) // Clear any per subject tracking. fs.psim, fs.tsl = fs.psim.Empty(), 0 - // Mark dirty + // Mark dirty. fs.dirty++ // Move the msgs directory out of the way, will delete out of band. @@ -7092,6 +7133,11 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) { cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -rbytes, 0, _EMPTY_) } @@ -7286,11 +7332,19 @@ SKIP: } fs.state.Bytes -= bytes + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil && purged > 0 { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -7351,6 +7405,40 @@ func (fs *fileStore) reset() error { return nil } +// Return all active tombstones in this msgBlock. +// Write lock should be held. +func (mb *msgBlock) tombs() []msgId { + var tombs []msgId + + if !mb.cacheAlreadyLoaded() { + if err := mb.loadMsgsWithLock(); err != nil { + return nil + } + } + + var le = binary.LittleEndian + buf := mb.cache.buf + + for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; { + if index+msgHdrSize > lbuf { + return tombs + } + hdr := buf[index : index+msgHdrSize] + rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:]) + // Clear any headers bit that could be set. + rl &^= hbit + // Check for tombstones. + if seq&tbit != 0 { + ts := int64(le.Uint64(hdr[12:])) + tombs = append(tombs, msgId{seq &^ tbit, ts}) + } + // Advance to next record. + index += rl + } + + return tombs +} + // Truncate will truncate a stream store up to seq. Sequence needs to be valid. func (fs *fileStore) Truncate(seq uint64) error { // Check for request to reset. @@ -7386,6 +7474,10 @@ func (fs *fileStore) Truncate(seq uint64) error { fs.mu.Unlock() return err } + // Collect all tombstones, we want to put these back so we can survive + // a restore without index.db properly. + var tombs []msgId + tombs = append(tombs, nlmb.tombs()...) var purged, bytes uint64 @@ -7403,6 +7495,8 @@ func (fs *fileStore) Truncate(seq uint64) error { getLastMsgBlock := func() *msgBlock { return fs.blks[len(fs.blks)-1] } for mb := getLastMsgBlock(); mb != nlmb; mb = getLastMsgBlock() { mb.mu.Lock() + // We do this to load tombs. + tombs = append(tombs, mb.tombs()...) purged += mb.msgs bytes += mb.bytes fs.removeMsgBlock(mb) @@ -7425,11 +7519,29 @@ func (fs *fileStore) Truncate(seq uint64) error { // Reset our subject lookup info. fs.resetGlobalPerSubjectInfo() + // Always create new write block. + fs.newMsgBlockForWrite() + + // Write any tombstones as needed. + for _, tomb := range tombs { + if tomb.seq <= lsm.seq { + fs.lmb.writeTombstone(tomb.seq, tomb.ts) + } + } + + // Any existing state file no longer applicable. We will force write a new one + // after we release the lock. + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) fs.dirty++ cb := fs.scb fs.mu.Unlock() + // Force a new index.db to be written. + if purged > 0 { + fs.forceWriteFullState() + } + if cb != nil { cb(-int64(purged), -int64(bytes), 0, _EMPTY_) } @@ -8251,26 +8363,6 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { msgPre := msgDir + "/" var bbuf []byte - const minLen = 32 - sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) - if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { - if fs.aek != nil { - ns := fs.aek.NonceSize() - buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) - if err == nil { - // Redo hash checksum at end on plaintext. - fs.mu.Lock() - hh.Reset() - hh.Write(buf) - buf = fs.hh.Sum(buf) - fs.mu.Unlock() - } - } - if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { - return - } - } - // Now do messages themselves. for _, mb := range blks { if mb.pendingWriteSize() > 0 { @@ -8309,6 +8401,30 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, includeConsumers bool) { } } + // Do index.db last. We will force a write as well. + // Write out full state as well before proceeding. + if err := fs.forceWriteFullState(); err == nil { + const minLen = 32 + sfn := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + if buf, err := os.ReadFile(sfn); err == nil && len(buf) >= minLen { + if fs.aek != nil { + ns := fs.aek.NonceSize() + buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:len(buf)-highwayhash.Size64], nil) + if err == nil { + // Redo hash checksum at end on plaintext. + fs.mu.Lock() + hh.Reset() + hh.Write(buf) + buf = fs.hh.Sum(buf) + fs.mu.Unlock() + } + } + if err == nil && writeFile(msgPre+streamStreamStateFile, buf) != nil { + return + } + } + } + // Bail if no consumers requested. if !includeConsumers { return @@ -8381,9 +8497,6 @@ func (fs *fileStore) Snapshot(deadline time.Duration, checkMsgs, includeConsumer } } - // Write out full state as well before proceeding. - fs.writeFullState() - pr, pw := net.Pipe() // Set a write deadline here to protect ourselves. diff --git a/server/filestore_test.go b/server/filestore_test.go index 243155902f6..0868fcec02a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -1051,6 +1051,19 @@ func TestFileStoreStreamTruncate(t *testing.T) { mb := fs.getFirstBlock() require_True(t, mb != nil) require_NoError(t, mb.loadMsgs()) + + // Also make sure we can recover properly with no index.db present. + // We want to make sure we preserve tombstones from any blocks being deleted. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state) + } }) } @@ -3635,11 +3648,16 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fcfg.BlockSize = 1000 cfg := StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Storage: FileStorage} - fs, err := newFileStoreWithCreated(fcfg, cfg, time.Now(), prf(&fcfg), nil) + created := time.Now() + fs, err := newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) require_NoError(t, err) defer fs.Stop() payload := make([]byte, 20) + + _, _, err = fs.StoreMsg("foo.0", nil, payload) + require_NoError(t, err) + total := 200 for i := 0; i < total; i++ { _, _, err = fs.StoreMsg("foo.1", nil, payload) @@ -3648,13 +3666,38 @@ func TestFileStorePurgeExWithSubject(t *testing.T) { _, _, err = fs.StoreMsg("foo.2", nil, []byte("xxxxxx")) require_NoError(t, err) - // This should purge all. + // This should purge all "foo.1" p, err := fs.PurgeEx("foo.1", 1, 0) require_NoError(t, err) - require_True(t, int(p) == total) - require_True(t, int(p) == total) - require_True(t, fs.State().Msgs == 1) - require_True(t, fs.State().FirstSeq == 201) + require_Equal(t, p, uint64(total)) + + state := fs.State() + require_Equal(t, state.Msgs, 2) + require_Equal(t, state.FirstSeq, 1) + + // Make sure we can recover same state. + fs.Stop() + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + before := state + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v", before, state) + } + + // Also make sure we can recover properly with no index.db present. + // We want to make sure we preserve any tombstones from the subject based purge. + fs.Stop() + os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile)) + + fs, err = newFileStoreWithCreated(fcfg, cfg, created, prf(&fcfg), nil) + require_NoError(t, err) + defer fs.Stop() + + if state := fs.State(); !reflect.DeepEqual(state, before) { + t.Fatalf("Expected state of %+v, got %+v without index.db state", before, state) + } }) } @@ -7557,6 +7600,47 @@ func TestFileStoreSyncCompressOnlyIfDirty(t *testing.T) { require_False(t, noCompact) } +// This test is for deleted interior message tracking after compaction from limits based deletes, meaning no tombstones. +// Bug was that dmap would not be properly be hydrated after the compact from rebuild. But we did so in populateGlobalInfo. +// So this is just to fix a bug in rebuildState tracking gaps after a compact. +func TestFileStoreDmapBlockRecoverAfterCompact(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 256}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + + // 6 msgs per block. + // Fill the first block. + for i := 1; i <= 6; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 1) + + // Now create holes in the first block via the max msgs per subject of 1. + for i := 2; i < 6; i++ { + fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 2) + // Compact and rebuild the first blk. Do not have it call indexCacheBuf which will fix it up. + mb := fs.getFirstBlock() + mb.mu.Lock() + mb.compact() + // Empty out dmap state. + mb.dmap.Empty() + ld, tombs, err := mb.rebuildStateLocked() + dmap := mb.dmap.Clone() + mb.mu.Unlock() + + require_NoError(t, err) + require_Equal(t, ld, nil) + require_Equal(t, len(tombs), 0) + require_Equal(t, dmap.Size(), 4) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// diff --git a/server/norace_test.go b/server/norace_test.go index 7de9c949eec..74e76dacdc6 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10838,3 +10838,68 @@ func TestNoRaceJetStreamStandaloneDontReplyToAckBeforeProcessingIt(t *testing.T) } } } + +// Under certain scenarios an old index.db with a stream that has max msgs per set will not restore properly +// due to and old index.db and compaction after the index.db took place which could lose per subject information. +func TestNoRaceFileStoreMaxMsgsPerSubjectAndOldRecoverState(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + msg := make([]byte, 1024) + + for i := 0; i < 10_000; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, msg) + } + + // This will write the index.db file. We will capture this and use it to replace a new one. + sfile := filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile) + fs.Stop() + _, err = os.Stat(sfile) + require_NoError(t, err) + + // Read it in and make sure len > 0. + buf, err := os.ReadFile(sfile) + require_NoError(t, err) + require_True(t, len(buf) > 0) + + // Restart + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + // Put in more messages with wider range. This will compact a bunch of the previous blocks. + for i := 0; i < 1_000_001; i++ { + subj := fmt.Sprintf("foo.%d", i) + fs.StoreMsg(subj, nil, msg) + } + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 10_001) + require_Equal(t, ss.LastSeq, 1_010_001) + require_Equal(t, ss.Msgs, 1_000_001) + + // Now stop again, but replace index.db with old one. + fs.Stop() + // Put back old stream state. + require_NoError(t, os.WriteFile(sfile, buf, defaultFilePerms)) + + // Restart + fs, err = newFileStore( + FileStoreConfig{StoreDir: sd}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1}) + require_NoError(t, err) + defer fs.Stop() + + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 10_001) + require_Equal(t, ss.LastSeq, 1_010_001) + require_Equal(t, ss.Msgs, 1_000_001) +}