Skip to content

Commit

Permalink
[FIXED] Bug that would cause extended purge or compact to fail. (#5264)
Browse files Browse the repository at this point in the history
In memstore if upto sequence was not found, e.g. deleted already, the
compact or purge command would fail.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Apr 3, 2024
2 parents 6840c4d + f6400f0 commit 546f8e7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
16 changes: 8 additions & 8 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,15 +842,15 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.mu.Lock()
cb := ms.scb
if seq <= ms.state.LastSeq {
sm, ok := ms.msgs[seq]
if !ok {
ms.mu.Unlock()
return 0, ErrStoreMsgNotFound
}
fseq := ms.state.FirstSeq
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()

// Determine new first sequence.
for ; seq <= ms.state.LastSeq; seq++ {
if sm, ok := ms.msgs[seq]; ok {
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
break
}
}
for seq := seq - 1; seq >= fseq; seq-- {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
Expand Down
29 changes: 29 additions & 0 deletions server/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,32 @@ func TestMemStoreMultiLastSeqsMaxAllowed(t *testing.T) {
require_True(t, seqs == nil)
require_Error(t, err, ErrTooManyResults)
}

// Bug would cause PurgeEx to fail if it encountered a deleted msg at sequence to delete up to.
func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{"foo"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
defer ms.Stop()

msg := []byte("abc")
for i := 1; i <= 10; i++ {
ms.StoreMsg("foo", nil, msg)
}
ms.RemoveMsg(2)
ms.RemoveMsg(9) // This was the bug

n, err := ms.PurgeEx(_EMPTY_, 9, 0)
require_NoError(t, err)
require_Equal(t, n, 7)

var state StreamState
ms.FastState(&state)
require_Equal(t, state.FirstSeq, 10)
require_Equal(t, state.LastSeq, 10)
require_Equal(t, state.Msgs, 1)
}

0 comments on commit 546f8e7

Please sign in to comment.