kvserver: Clean up empty range directories after snapshots
Previously, we created subdirectories for ranges and range
snapshots in the auxiliary directory every time we accepted
a snapshot, but we only cleaned up the snapshot
subdirectories when a snapshot scratch space closed. This
left empty parent range directories behind on the
filesystem, slowing down future calls to Pebble.Capacity()
and, indirectly, future AddSSTable calls.

This change adds code to clean up empty range directories in
the auxiliary directory once they are no longer in use. Some
coordination and synchronization code is needed to ensure we
do not remove a directory that was just created by a
concurrent snapshot; see the sketch below the release note.

Fixes #83137.

Release note (performance improvement): Addresses an issue where
imports and rebalances were slowed down by the accumulation of
empty directories left behind by range snapshot applications.
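To make the coordination concrete, here is a minimal, standalone sketch of the
idea (not the CockroachDB code itself, which follows in the diff below): a
mutex-protected per-range reference count guards removal of the shared range
directory, so a concurrent snapshot's directory is never removed out from
under it. All names in the sketch are illustrative.

// Simplified illustration of reference-counted range-directory cleanup.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

type snapshotStorage struct {
	dir string
	mu  struct {
		sync.Mutex
		ranges map[int]int // rangeID -> number of open scratch spaces
	}
}

func newSnapshotStorage(dir string) *snapshotStorage {
	s := &snapshotStorage{dir: dir}
	s.mu.ranges = make(map[int]int)
	return s
}

// newScratch takes a reference on the range before handing out a snapshot
// directory, so a concurrent close of another scratch cannot remove the
// parent range directory while this one is in use.
func (s *snapshotStorage) newScratch(rangeID int, snapUUID string) string {
	s.mu.Lock()
	s.mu.ranges[rangeID]++
	s.mu.Unlock()
	return filepath.Join(s.dir, fmt.Sprint(rangeID), snapUUID)
}

// scratchClosed drops the reference; the last close removes the now-empty
// range directory. The removal error is ignored: a leftover directory is a
// performance concern (longer directory walks), not a correctness one.
func (s *snapshotStorage) scratchClosed(rangeID int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.mu.ranges[rangeID]--
	if s.mu.ranges[rangeID] == 0 {
		delete(s.mu.ranges, rangeID)
		_ = os.RemoveAll(filepath.Join(s.dir, fmt.Sprint(rangeID)))
	}
}

func main() {
	s := newSnapshotStorage(filepath.Join(os.TempDir(), "sstsnapshot"))
	snapDir := s.newScratch(42, "snap-uuid")
	_ = os.MkdirAll(snapDir, 0o755)
	_ = os.RemoveAll(snapDir) // the scratch cleans up its own snapshot dir
	s.scratchClosed(42)       // last reference: the range directory goes too
}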
itsbilal committed Jul 11, 2022
1 parent 37cf9bf commit 05ee29e
Showing 3 changed files with 67 additions and 5 deletions.
58 changes: 56 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
@@ -19,6 +19,7 @@ import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/storage/fs"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/cockroachdb/errors"
	"golang.org/x/time/rate"
@@ -31,6 +32,10 @@ type SSTSnapshotStorage struct {
	engine  storage.Engine
	limiter *rate.Limiter
	dir     string
	mu      struct {
		syncutil.Mutex
		ranges map[roachpb.RangeID]int
	}
}

// NewSSTSnapshotStorage creates a new SST snapshot storage.
@@ -39,6 +44,10 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
		engine:  engine,
		limiter: limiter,
		dir:     filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"),
		mu: struct {
			syncutil.Mutex
			ranges map[roachpb.RangeID]int
		}{ranges: make(map[roachpb.RangeID]int)},
	}
}

@@ -47,9 +56,16 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
func (s *SSTSnapshotStorage) NewScratchSpace(
	rangeID roachpb.RangeID, snapUUID uuid.UUID,
) *SSTSnapshotStorageScratch {
	s.mu.Lock()
	rangeStorage := s.mu.ranges[rangeID]
	if rangeStorage == 0 {
		s.mu.ranges[rangeID] = 1
	}
	s.mu.Unlock()
	snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
	return &SSTSnapshotStorageScratch{
		storage: s,
		rangeID: rangeID,
		snapDir: snapDir,
	}
}
@@ -59,14 +75,38 @@ func (s *SSTSnapshotStorage) Clear() error {
	return s.engine.RemoveAll(s.dir)
}

// scratchClosed is called when an SSTSnapshotStorageScratch created by this
// SSTSnapshotStorage is closed. This method handles any cleanup of range
// directories if all SSTSnapshotStorageScratches corresponding to a range
// have closed.
func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	val := s.mu.ranges[rangeID]
	if val <= 0 {
		panic("inconsistent scratch ref count")
	}
	val--
	s.mu.ranges[rangeID] = val
	if val == 0 {
		delete(s.mu.ranges, rangeID)
		// Suppressing an error here is okay, as orphaned directories are at worst
		// a performance issue when we later walk directories in pebble.Capacity()
		// but not a correctness issue.
		_ = s.engine.RemoveAll(filepath.Join(s.dir, strconv.Itoa(int(rangeID))))
	}
}

// SSTSnapshotStorageScratch keeps track of the SST files incrementally created
// when receiving a snapshot. Each scratch is associated with a specific
// snapshot.
type SSTSnapshotStorageScratch struct {
	storage    *SSTSnapshotStorage
	rangeID    roachpb.RangeID
	ssts       []string
	snapDir    string
	dirCreated bool
	closed     bool
}

func (s *SSTSnapshotStorageScratch) filename(id int) string {
@@ -87,6 +127,9 @@ func (s *SSTSnapshotStorageScratch) createDir() error {
func (s *SSTSnapshotStorageScratch) NewFile(
	ctx context.Context, bytesPerSync int64,
) (*SSTSnapshotStorageFile, error) {
	if s.closed {
		return nil, errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
	}
	id := len(s.ssts)
	filename := s.filename(id)
	s.ssts = append(s.ssts, filename)
@@ -103,6 +146,9 @@ func (s *SSTSnapshotStorageScratch) NewFile(
// the provided SST when it is finished using it. If the provided SST is empty,
// then no file will be created and nothing will be written.
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
	if s.closed {
		return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
	}
	if len(data) == 0 {
		return nil
	}
@@ -129,8 +175,13 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {
	return s.ssts
}

// Clear removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Clear() error {
// Close removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Close() error {
	if s.closed {
		return nil
	}
	s.closed = true
	defer s.storage.scratchClosed(s.rangeID)
	return s.storage.engine.RemoveAll(s.snapDir)
}

@@ -157,6 +208,9 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
			return err
		}
	}
	if f.scratch.closed {
		return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
	}
	var err error
	if f.bytesPerSync > 0 {
		f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -12,6 +12,8 @@ package kvserver
import (
	"context"
	"io/ioutil"
	"path/filepath"
	"strconv"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -93,12 +95,18 @@ func TestSSTSnapshotStorage(t *testing.T) {
	_, err = f.Write([]byte("foo"))
	require.NoError(t, err)

	// Check that Clear removes the directory.
	require.NoError(t, scratch.Clear())
	// Check that Close removes the snapshot directory as well as the range
	// directory.
	require.NoError(t, scratch.Close())
	_, err = eng.Stat(scratch.snapDir)
	if !oserror.IsNotExist(err) {
		t.Fatalf("expected %s to not exist", scratch.snapDir)
	}
	rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(scratch.rangeID)))
	_, err = eng.Stat(rangeDir)
	if !oserror.IsNotExist(err) {
		t.Fatalf("expected %s to not exist", rangeDir)
	}
	require.NoError(t, sstSnapshotStorage.Clear())
	_, err = eng.Stat(sstSnapshotStorage.dir)
	if !oserror.IsNotExist(err) {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -508,7 +508,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
	// A failure to clean up the storage is benign except that it will leak
	// disk space (which is reclaimed on node restart). It is unexpected
	// though, so log a warning.
	if err := kvSS.scratch.Clear(); err != nil {
	if err := kvSS.scratch.Close(); err != nil {
		log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
	}
}
