kvserver: Clean up empty range directories after snapshots
Previously, we were creating subdirectories for ranges and
range snapshots in the auxiliary directory every time we
accepted a snapshot, but only cleaning up the snapshot
subdirectories after a snapshot scratch space closed. This
left empty parent range directories around on the FS,
slowing down future calls to Pebble.Capacity() and, indirectly,
future AddSSTable requests.

This change adds code to clean up empty range directories
in the aux directory if they're not being used. Some coordination
and synchronization code had to be added to ensure we wouldn't
remove a directory that was just created by a concurrent snapshot.
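To make the mechanism concrete, here is a small, self-contained Go sketch of
reference-counted cleanup of per-range scratch directories. It is an
illustration only, using simplified stand-in names (scratchStorage, newScratch)
rather than the actual SSTSnapshotStorage API in the diff below, and it counts
every open scratch per range, which is one way to avoid removing a directory
that a concurrent snapshot is still using.

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// scratchStorage is a simplified stand-in for SSTSnapshotStorage: it hands out
// per-snapshot scratch directories under dir/<rangeID>/<snapID> and removes
// the parent <rangeID> directory once the last scratch for that range closes.
type scratchStorage struct {
	dir string
	mu  struct {
		sync.Mutex
		ranges map[int]int // rangeID -> number of open scratches
	}
}

func newScratchStorage(dir string) *scratchStorage {
	s := &scratchStorage{dir: dir}
	s.mu.ranges = make(map[int]int)
	return s
}

// newScratch registers a scratch for rangeID and returns its directory along
// with a close function. Creating the directory is left to the caller, which
// mirrors the lazy directory creation in the real code.
func (s *scratchStorage) newScratch(rangeID int, snapID string) (string, func() error) {
	s.mu.Lock()
	s.mu.ranges[rangeID]++
	s.mu.Unlock()

	snapDir := filepath.Join(s.dir, fmt.Sprint(rangeID), snapID)
	var once sync.Once
	closeFn := func() error {
		var err error
		once.Do(func() {
			err = os.RemoveAll(snapDir) // drop this snapshot's scratch files
			s.mu.Lock()
			defer s.mu.Unlock()
			if s.mu.ranges[rangeID]--; s.mu.ranges[rangeID] == 0 {
				delete(s.mu.ranges, rangeID)
				// Last scratch for this range: remove the now-empty parent so
				// it does not linger and slow down later directory walks. An
				// error here is only a performance concern, so it is ignored.
				_ = os.RemoveAll(filepath.Join(s.dir, fmt.Sprint(rangeID)))
			}
		})
		return err
	}
	return snapDir, closeFn
}

func main() {
	base, _ := os.MkdirTemp("", "aux")
	defer os.RemoveAll(base)

	s := newScratchStorage(base)
	dirA, closeA := s.newScratch(7, "snap-a")
	dirB, closeB := s.newScratch(7, "snap-b")
	_ = os.MkdirAll(dirA, 0755)
	_ = os.MkdirAll(dirB, 0755)

	_ = closeA() // range directory 7 survives: snap-b is still open
	_ = closeB() // last scratch closed: range directory 7 is removed

	_, err := os.Stat(filepath.Join(base, "7"))
	fmt.Println("range dir removed:", os.IsNotExist(err))
}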

Fixes cockroachdb#83137.

Release note (performance improvement): Addresses an issue where
imports and rebalances were slowed down by the accumulation of empty
directories from range snapshot applications.
itsbilal committed Jul 8, 2022
1 parent 3b22cdd commit cd76112
Showing 4 changed files with 70 additions and 5 deletions.
57 changes: 55 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"golang.org/x/time/rate"
@@ -31,6 +32,10 @@ type SSTSnapshotStorage struct {
engine storage.Engine
limiter *rate.Limiter
dir string
mu struct {
syncutil.Mutex
ranges map[roachpb.RangeID]int
}
}

// NewSSTSnapshotStorage creates a new SST snapshot storage.
@@ -42,14 +47,26 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
}
}

// Init initializes the SSTSnapshotStorage struct.
func (s *SSTSnapshotStorage) Init() {
s.mu.ranges = make(map[roachpb.RangeID]int)
}

// NewScratchSpace creates a new storage scratch space for SSTs for a specific
// snapshot.
func (s *SSTSnapshotStorage) NewScratchSpace(
rangeID roachpb.RangeID, snapUUID uuid.UUID,
) *SSTSnapshotStorageScratch {
s.mu.Lock()
rangeStorage := s.mu.ranges[rangeID]
if rangeStorage == 0 {
s.mu.ranges[rangeID] = 1
}
s.mu.Unlock()
snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
return &SSTSnapshotStorageScratch{
storage: s,
rangeID: rangeID,
snapDir: snapDir,
}
}
@@ -59,14 +76,36 @@ func (s *SSTSnapshotStorage) Clear() error {
return s.engine.RemoveAll(s.dir)
}

// scratchClosed is called when an SSTSnapshotStorageScratch created by this
// SSTSnapshotStorage is closed. It removes the range's directory once no
// scratches for that range remain open.
func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
s.mu.Lock()
defer s.mu.Unlock()
val := s.mu.ranges[rangeID]
if val <= 0 {
panic("inconsistent scratch ref count")
}
val--
s.mu.ranges[rangeID] = val
if val == 0 {
delete(s.mu.ranges, rangeID)
// Suppressing an error here is okay, as orphaned directories are at worst
// a performance issue when we later walk directories in pebble.Capacity()
// but not a correctness issue.
_ = s.engine.RemoveAll(filepath.Join(s.dir, strconv.Itoa(int(rangeID))))
}
}

// SSTSnapshotStorageScratch keeps track of the SST files incrementally created
// when receiving a snapshot. Each scratch is associated with a specific
// snapshot.
type SSTSnapshotStorageScratch struct {
storage *SSTSnapshotStorage
rangeID roachpb.RangeID
ssts []string
snapDir string
dirCreated bool
closed bool
}

func (s *SSTSnapshotStorageScratch) filename(id int) string {
@@ -87,6 +126,9 @@ func (s *SSTSnapshotStorageScratch) createDir() error {
func (s *SSTSnapshotStorageScratch) NewFile(
ctx context.Context, bytesPerSync int64,
) (*SSTSnapshotStorageFile, error) {
if s.closed {
panic("closed")
}
id := len(s.ssts)
filename := s.filename(id)
s.ssts = append(s.ssts, filename)
@@ -103,6 +145,9 @@ func (s *SSTSnapshotStorageScratch) NewFile(
// the provided SST when it is finished using it. If the provided SST is empty,
// then no file will be created and nothing will be written.
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
if s.closed {
panic("closed")
}
if len(data) == 0 {
return nil
}
@@ -129,8 +174,13 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {
return s.ssts
}

// Clear removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Clear() error {
// Close removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Close() error {
if s.closed {
return nil
}
s.closed = true
defer s.storage.scratchClosed(s.rangeID)
return s.storage.engine.RemoveAll(s.snapDir)
}

@@ -157,6 +207,9 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
return err
}
}
if f.scratch.closed {
panic("scratch closed")
}
var err error
if f.bytesPerSync > 0 {
f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -12,6 +12,8 @@ package kvserver
import (
"context"
"io/ioutil"
"path/filepath"
"strconv"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -40,6 +42,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
defer eng.Close()

sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
sstSnapshotStorage.Init()
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)

// Check that the storage lazily creates the directories on first write.
@@ -93,12 +96,18 @@ func TestSSTSnapshotStorage(t *testing.T) {
_, err = f.Write([]byte("foo"))
require.NoError(t, err)

// Check that Clear removes the directory.
require.NoError(t, scratch.Clear())
// Check that Close removes the snapshot directory as well as the range
// directory.
require.NoError(t, scratch.Close())
_, err = eng.Stat(scratch.snapDir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", scratch.snapDir)
}
rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(scratch.rangeID)))
_, err = eng.Stat(rangeDir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", rangeDir)
}
require.NoError(t, sstSnapshotStorage.Clear())
_, err = eng.Stat(sstSnapshotStorage.dir)
if !oserror.IsNotExist(err) {
@@ -122,6 +131,7 @@ func TestSSTSnapshotStorageContextCancellation(t *testing.T) {
defer eng.Close()

sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
sstSnapshotStorage.Init()
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)

var cancel func()
@@ -159,6 +169,7 @@ func TestMultiSSTWriterInitSST(t *testing.T) {
defer eng.Close()

sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
sstSnapshotStorage.Init()
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey("d"),
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store.go
@@ -1250,6 +1250,7 @@ func NewStore(
// it can clean it up. If this fails it's not a correctness issue since the
// storage is also cleared before receiving a snapshot.
s.sstSnapshotStorage = NewSSTSnapshotStorage(s.engine, s.limiters.BulkIOWriteRate)
s.sstSnapshotStorage.Init()
if err := s.sstSnapshotStorage.Clear(); err != nil {
log.Warningf(ctx, "failed to clear snapshot storage: %v", err)
}
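As a side note on this startup path, here is a minimal sketch of the
clear-and-warn pattern used above: leftover scratch data from a previous
process is removed wholesale, and a failure to do so is only logged, since it
costs disk space and walk time rather than correctness. The
"auxiliary/sstsnapshot" path below is an illustrative assumption, not
necessarily the exact directory layout.

package main

import (
	"log"
	"os"
	"path/filepath"
)

// clearSnapshotScratch removes snapshot scratch data left behind by a previous
// process. The "auxiliary/sstsnapshot" layout is an assumption made for this
// sketch; the real code derives the path from the engine's auxiliary directory.
func clearSnapshotScratch(storeDir string) {
	scratchDir := filepath.Join(storeDir, "auxiliary", "sstsnapshot")
	if err := os.RemoveAll(scratchDir); err != nil {
		// Keep starting up: leftover scratch data is a disk-space and
		// performance concern, not a correctness one, and each incoming
		// snapshot clears its own scratch space before use.
		log.Printf("failed to clear snapshot storage: %v", err)
	}
}

func main() {
	storeDir, _ := os.MkdirTemp("", "store")
	defer os.RemoveAll(storeDir)
	clearSnapshotScratch(storeDir)
}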
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -508,7 +508,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
// A failure to clean up the storage is benign except that it will leak
// disk space (which is reclaimed on node restart). It is unexpected
// though, so log a warning.
if err := kvSS.scratch.Clear(); err != nil {
if err := kvSS.scratch.Close(); err != nil {
log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
}
}
