From bcdbb2e03d4573bb36d0f20b6a2d8c898f7192fd Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 4 Apr 2024 13:33:11 -0700 Subject: [PATCH] Make sure to hold lock for duration of truncate, was calling slotInfo unprotected. Signed-off-by: Derek Collison --- server/filestore.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 25c0d7e13d6..2056f755e5d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4313,8 +4313,11 @@ func (mb *msgBlock) eraseMsg(seq uint64, ri, rl int) error { // Truncate this message block to the storedMsg. func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { + mb.mu.Lock() + defer mb.mu.Unlock() + // Make sure we are loaded to process messages etc. - if err := mb.loadMsgs(); err != nil { + if err := mb.loadMsgsWithLock(); err != nil { return 0, 0, err } @@ -4328,8 +4331,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { var purged, bytes uint64 - mb.mu.Lock() - checkDmap := mb.dmap.Size() > 0 var smv StoreMsg @@ -4365,13 +4366,11 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { if mb.cmp != NoCompression { buf, err := mb.loadBlock(nil) if err != nil { - mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to load block from disk: %w", err) } if mb.bek != nil && len(buf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { - mb.mu.Unlock() return 0, 0, err } mb.bek = bek @@ -4379,14 +4378,12 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } buf, err = mb.decompressIfNeeded(buf) if err != nil { - mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to decompress block: %w", err) } buf = buf[:eof] copy(mb.lchk[0:], buf[:len(buf)-checksumSize]) buf, err = mb.cmp.Compress(buf) if err != nil { - mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to recompress block: %w", err) } meta := &CompressionInfo{ @@ -4397,7 +4394,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { if mb.bek != nil && len(buf) > 0 { bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce) if err != nil { - mb.mu.Unlock() return 0, 0, err } mb.bek = bek @@ -4405,11 +4401,9 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { } n, err := mb.writeAt(buf, 0) if err != nil { - mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to rewrite compressed block: %w", err) } if n != len(buf) { - mb.mu.Unlock() return 0, 0, fmt.Errorf("short write (%d != %d)", n, len(buf)) } mb.mfd.Truncate(int64(len(buf))) @@ -4422,7 +4416,6 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { mb.mfd.ReadAt(lchk[:], eof-8) copy(mb.lchk[0:], lchk[:]) } else { - mb.mu.Unlock() return 0, 0, fmt.Errorf("failed to truncate msg block %d, file not open", mb.index) } @@ -4436,10 +4429,8 @@ func (mb *msgBlock) truncate(sm *StoreMsg) (nmsgs, nbytes uint64, err error) { // Redo per subject info for this block. mb.resetPerSubjectInfo() - mb.mu.Unlock() - // Load msgs again. - mb.loadMsgs() + mb.loadMsgsWithLock() return purged, bytes, nil }