From f6400f0d95e72da79ae4cc932b95fddf7ae09c53 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 30 Mar 2024 09:32:04 -0700 Subject: [PATCH] Fixed a bug that would cause extended purge or compact to fail in memstore if upto sequence was not found, e.g. deleted already. Signed-off-by: Derek Collison --- server/memstore.go | 16 ++++++++-------- server/memstore_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/server/memstore.go b/server/memstore.go index 7aae6f65775..780f7b533c0 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -842,15 +842,15 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) { ms.mu.Lock() cb := ms.scb if seq <= ms.state.LastSeq { - sm, ok := ms.msgs[seq] - if !ok { - ms.mu.Unlock() - return 0, ErrStoreMsgNotFound - } fseq := ms.state.FirstSeq - ms.state.FirstSeq = seq - ms.state.FirstTime = time.Unix(0, sm.ts).UTC() - + // Determine new first sequence. + for ; seq <= ms.state.LastSeq; seq++ { + if sm, ok := ms.msgs[seq]; ok { + ms.state.FirstSeq = seq + ms.state.FirstTime = time.Unix(0, sm.ts).UTC() + break + } + } for seq := seq - 1; seq >= fseq; seq-- { if sm := ms.msgs[seq]; sm != nil { bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg) diff --git a/server/memstore_test.go b/server/memstore_test.go index 143c38b0e4c..3c049233447 100644 --- a/server/memstore_test.go +++ b/server/memstore_test.go @@ -1022,3 +1022,32 @@ func TestMemStoreMultiLastSeqsMaxAllowed(t *testing.T) { require_True(t, seqs == nil) require_Error(t, err, ErrTooManyResults) } + +// Bug would cause PurgeEx to fail if it encountered a deleted msg at sequence to delete up to. +func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) { + cfg := &StreamConfig{ + Name: "zzz", + Subjects: []string{"foo"}, + Storage: MemoryStorage, + } + ms, err := newMemStore(cfg) + require_NoError(t, err) + defer ms.Stop() + + msg := []byte("abc") + for i := 1; i <= 10; i++ { + ms.StoreMsg("foo", nil, msg) + } + ms.RemoveMsg(2) + ms.RemoveMsg(9) // This was the bug + + n, err := ms.PurgeEx(_EMPTY_, 9, 0) + require_NoError(t, err) + require_Equal(t, n, 7) + + var state StreamState + ms.FastState(&state) + require_Equal(t, state.FirstSeq, 10) + require_Equal(t, state.LastSeq, 10) + require_Equal(t, state.Msgs, 1) +}