Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-21.2] kvserver: Clean up empty range directories after snapshots #88142

Merged
merged 2 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,16 @@ func TestAddReplicaViaLearner(t *testing.T) {
// The happy case! \o/

blockUntilSnapshotCh := make(chan struct{})
var receivedSnap int64
blockSnapshotsCh := make(chan struct{})
knobs, ltk := makeReplicationTestKnobs()
ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error {
close(blockUntilSnapshotCh)
if atomic.CompareAndSwapInt64(&receivedSnap, 0, 1) {
close(blockUntilSnapshotCh)
} else {
// Do nothing. We aren't interested in subsequent snapshots.
return nil
}
select {
case <-blockSnapshotsCh:
case <-time.After(10 * time.Second):
Expand Down
55 changes: 53 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"golang.org/x/time/rate"
Expand All @@ -31,6 +32,10 @@ type SSTSnapshotStorage struct {
engine storage.Engine
limiter *rate.Limiter
dir string
mu struct {
syncutil.Mutex
rangeRefCount map[roachpb.RangeID]int
}
}

// NewSSTSnapshotStorage creates a new SST snapshot storage.
Expand All @@ -39,6 +44,10 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
engine: engine,
limiter: limiter,
dir: filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"),
mu: struct {
syncutil.Mutex
rangeRefCount map[roachpb.RangeID]int
}{rangeRefCount: make(map[roachpb.RangeID]int)},
}
}

Expand All @@ -47,9 +56,13 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnap
func (s *SSTSnapshotStorage) NewScratchSpace(
	rangeID roachpb.RangeID, snapUUID uuid.UUID,
) *SSTSnapshotStorageScratch {
	// Account for the new scratch under this range so that the range's
	// snapshot directory stays alive until every scratch for it has closed.
	s.mu.Lock()
	s.mu.rangeRefCount[rangeID]++
	s.mu.Unlock()
	// Each snapshot gets its own directory: <dir>/<rangeID>/<snapUUID>.
	dir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
	scratch := &SSTSnapshotStorageScratch{
		storage: s,
		rangeID: rangeID,
		snapDir: dir,
	}
	return scratch
}
Expand All @@ -59,14 +72,38 @@ func (s *SSTSnapshotStorage) Clear() error {
return s.engine.RemoveAll(s.dir)
}

// scratchClosed records that one SSTSnapshotStorageScratch created by this
// SSTSnapshotStorage has been closed. Once the last open scratch for a range
// is gone, the range's snapshot directory is removed as well.
func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	refs := s.mu.rangeRefCount[rangeID]
	if refs < 1 {
		// A close without a matching NewScratchSpace is a programming error.
		panic("inconsistent scratch ref count")
	}
	refs--
	if refs > 0 {
		s.mu.rangeRefCount[rangeID] = refs
		return
	}
	// Last scratch for this range: forget the ref count entry and remove the
	// range directory. The error is deliberately dropped, as an orphaned
	// directory is at worst a performance issue when directories are later
	// walked in pebble.Capacity(), never a correctness issue.
	delete(s.mu.rangeRefCount, rangeID)
	_ = s.engine.RemoveAll(filepath.Join(s.dir, strconv.Itoa(int(rangeID))))
}

// SSTSnapshotStorageScratch keeps track of the SST files incrementally created
// when receiving a snapshot. Each scratch is associated with a specific
// snapshot.
type SSTSnapshotStorageScratch struct {
	// storage is the parent SSTSnapshotStorage; it is notified on Close so it
	// can reference-count (and eventually remove) the range directory.
	storage *SSTSnapshotStorage
	// rangeID keys the parent's per-range ref count for this scratch.
	rangeID roachpb.RangeID
	// ssts records the paths of all SST files handed out for this snapshot,
	// in creation order.
	ssts []string
	// snapDir is the directory holding this snapshot's SSTs; it is removed by
	// Close.
	snapDir string
	// dirCreated tracks whether snapDir has been created yet — presumably
	// creation is deferred to the first write; confirm in createDir.
	dirCreated bool
	// closed is set by Close; NewFile and WriteSST reject a closed scratch.
	closed bool
}

func (s *SSTSnapshotStorageScratch) filename(id int) string {
Expand All @@ -87,6 +124,9 @@ func (s *SSTSnapshotStorageScratch) createDir() error {
func (s *SSTSnapshotStorageScratch) NewFile(
ctx context.Context, bytesPerSync int64,
) (*SSTSnapshotStorageFile, error) {
if s.closed {
return nil, errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
}
id := len(s.ssts)
filename := s.filename(id)
s.ssts = append(s.ssts, filename)
Expand All @@ -103,6 +143,9 @@ func (s *SSTSnapshotStorageScratch) NewFile(
// the provided SST when it is finished using it. If the provided SST is empty,
// then no file will be created and nothing will be written.
func (s *SSTSnapshotStorageScratch) WriteSST(ctx context.Context, data []byte) error {
if s.closed {
return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
}
if len(data) == 0 {
return nil
}
Expand All @@ -129,8 +172,13 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {
return s.ssts
}

// Clear removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Clear() error {
// Close removes the directory and SSTs created for a particular snapshot.
// Closing an already-closed scratch is a no-op.
func (s *SSTSnapshotStorageScratch) Close() error {
	if s.closed {
		return nil
	}
	s.closed = true
	// Drop this snapshot's directory first, then tell the parent storage so it
	// can reclaim the enclosing range directory once no scratches remain.
	err := s.storage.engine.RemoveAll(s.snapDir)
	s.storage.scratchClosed(s.rangeID)
	return err
}

Expand All @@ -157,6 +205,9 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
return err
}
}
if f.scratch.closed {
return errors.AssertionFailedf("SSTSnapshotStorageScratch closed")
}
var err error
if f.bytesPerSync > 0 {
f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
Expand Down
125 changes: 123 additions & 2 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ package kvserver
import (
"context"
"io/ioutil"
"path/filepath"
"strconv"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
Expand All @@ -20,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -92,12 +96,129 @@ func TestSSTSnapshotStorage(t *testing.T) {
_, err = f.Write([]byte("foo"))
require.NoError(t, err)

// Check that Clear removes the directory.
require.NoError(t, scratch.Clear())
// Check that Close removes the snapshot directory as well as the range
// directory.
require.NoError(t, scratch.Close())
_, err = eng.Stat(scratch.snapDir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", scratch.snapDir)
}
rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(scratch.rangeID)))
_, err = eng.Stat(rangeDir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", rangeDir)
}
require.NoError(t, sstSnapshotStorage.Clear())
_, err = eng.Stat(sstSnapshotStorage.dir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", sstSnapshotStorage.dir)
}
}

func TestSSTSnapshotStorageConcurrentRange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
testRangeID := roachpb.RangeID(1)
testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890")))
testSnapUUID2 := uuid.Must(uuid.FromBytes([]byte("foobar2345678910")))
testLimiter := rate.NewLimiter(rate.Inf, 0)

cleanup, eng := newOnDiskEngine(t)
defer cleanup()
defer eng.Close()

sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)

runForSnap := func(snapUUID uuid.UUID) error {
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, snapUUID)

// Check that the storage lazily creates the directories on first write.
_, err := eng.Stat(scratch.snapDir)
if !oserror.IsNotExist(err) {
return errors.Errorf("expected %s to not exist", scratch.snapDir)
}

f, err := scratch.NewFile(ctx, 0)
require.NoError(t, err)
defer func() {
require.NoError(t, f.Close())
}()

// Check that even though the files aren't created, they are still recorded in SSTs().
require.Equal(t, len(scratch.SSTs()), 1)

// Check that the storage lazily creates the files on write.
for _, fileName := range scratch.SSTs() {
_, err := eng.Stat(fileName)
if !oserror.IsNotExist(err) {
return errors.Errorf("expected %s to not exist", fileName)
}
}

_, err = f.Write([]byte("foo"))
require.NoError(t, err)

// After writing to files, check that they have been flushed to disk.
for _, fileName := range scratch.SSTs() {
f, err := eng.Open(fileName)
require.NoError(t, err)
data, err := ioutil.ReadAll(f)
require.NoError(t, err)
require.Equal(t, data, []byte("foo"))
require.NoError(t, f.Close())
}

// Check that closing is idempotent.
require.NoError(t, f.Close())
require.NoError(t, f.Close())

// Check that writing to a closed file is an error.
_, err = f.Write([]byte("foo"))
require.EqualError(t, err, "file has already been closed")

// Check that closing an empty file is an error.
f, err = scratch.NewFile(ctx, 0)
require.NoError(t, err)
require.EqualError(t, f.Close(), "file is empty")
_, err = f.Write([]byte("foo"))
require.NoError(t, err)

// Check that Close removes the snapshot directory.
require.NoError(t, scratch.Close())
_, err = eng.Stat(scratch.snapDir)
if !oserror.IsNotExist(err) {
return errors.Errorf("expected %s to not exist", scratch.snapDir)
}
return nil
}

var wg sync.WaitGroup
wg.Add(2)
errChan := make(chan error)
for _, snapID := range []uuid.UUID{testSnapUUID, testSnapUUID2} {
snapID := snapID
go func() {
defer wg.Done()
if err := runForSnap(snapID); err != nil {
errChan <- err
}
}()
}
wg.Wait()
select {
case err := <-errChan:
t.Fatal(err)
default:
}
// Ensure that the range directory was deleted after the scratches were
// closed.
rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(testRangeID)))
_, err := eng.Stat(rangeDir)
if !oserror.IsNotExist(err) {
t.Fatalf("expected %s to not exist", rangeDir)
}
require.NoError(t, sstSnapshotStorage.Clear())
_, err = eng.Stat(sstSnapshotStorage.dir)
if !oserror.IsNotExist(err) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
// A failure to clean up the storage is benign except that it will leak
// disk space (which is reclaimed on node restart). It is unexpected
// though, so log a warning.
if err := kvSS.scratch.Clear(); err != nil {
if err := kvSS.scratch.Close(); err != nil {
log.Warningf(ctx, "error closing kvBatchSnapshotStrategy: %v", err)
}
}
Expand Down