kvserver: Fix missing increment in SSTSnapshotStorage refcount
Previously, in the rare case where two SSTSnapshotStorageScratch
instances were created for the same range, we would not increment the
refcount for the second scratch, leading to an inconsistent-refcount
panic down the line. This change fixes that and adds a test that
exercises the concurrent case.

Bug popped up in cockroachdb#84100.

Release note: None.
itsbilal committed Sep 19, 2022
1 parent 1ea1965 commit 520de9c
Showing 3 changed files with 127 additions and 11 deletions.
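
As context for the diff below, here is a minimal, self-contained Go sketch of the refcounting bug (an editor's illustration, not CockroachDB code: storage, newScratch, and scratchClosed are simplified stand-ins for SSTSnapshotStorage, NewScratchSpace, and the real scratchClosed, and locking is omitted). The old logic only set the count to 1 when it was 0, so a second scratch for the same range went uncounted and the second close underflowed the count, hitting the "inconsistent scratch ref count" panic.

package main

import "fmt"

type storage struct {
	rangeRefCount map[int]int
}

// newScratch registers a new scratch space for the given range.
func (s *storage) newScratch(rangeID int) {
	// Old (buggy) logic only handled the first scratch:
	//   if s.rangeRefCount[rangeID] == 0 { s.rangeRefCount[rangeID] = 1 }
	// Fixed logic counts every scratch:
	s.rangeRefCount[rangeID]++
}

// scratchClosed releases one reference and panics on underflow.
func (s *storage) scratchClosed(rangeID int) {
	val := s.rangeRefCount[rangeID]
	if val <= 0 {
		panic("inconsistent scratch ref count")
	}
	val--
	if val == 0 {
		delete(s.rangeRefCount, rangeID)
		return
	}
	s.rangeRefCount[rangeID] = val
}

func main() {
	s := &storage{rangeRefCount: map[int]int{}}
	s.newScratch(1) // first snapshot scratch for range 1
	s.newScratch(1) // second, concurrent scratch for the same range
	s.scratchClosed(1)
	s.scratchClosed(1) // with the old logic, this second close would panic
	fmt.Println("both scratches closed cleanly")
}

Running the sketch as written prints the success message; swapping the increment back to the commented-out old logic makes the second scratchClosed(1) hit the panic, which is the failure mode the commit message describes.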
pkg/kv/kvserver/replica_learner_test.go (8 changes: 7 additions & 1 deletion)
@@ -143,10 +143,16 @@ func TestAddReplicaViaLearner(t *testing.T) {
 	// The happy case! \o/

 	blockUntilSnapshotCh := make(chan struct{})
+	var receivedSnap int64
 	blockSnapshotsCh := make(chan struct{})
 	knobs, ltk := makeReplicationTestKnobs()
 	ltk.storeKnobs.ReceiveSnapshot = func(h *kvserver.SnapshotRequest_Header) error {
-		close(blockUntilSnapshotCh)
+		if atomic.CompareAndSwapInt64(&receivedSnap, 0, 1) {
+			close(blockUntilSnapshotCh)
+		} else {
+			// Do nothing. We aren't interested in subsequent snapshots.
+			return nil
+		}
 		select {
 		case <-blockSnapshotsCh:
 		case <-time.After(10 * time.Second):
pkg/kv/kvserver/replica_sst_snapshot_storage.go (17 changes: 7 additions & 10 deletions)
@@ -34,7 +34,7 @@ type SSTSnapshotStorage struct {
 	dir string
 	mu struct {
 		syncutil.Mutex
-		ranges map[roachpb.RangeID]int
+		rangeRefCount map[roachpb.RangeID]int
 	}
 }

@@ -46,8 +46,8 @@ func NewSSTSnapshotStorage(engine storage.Engine, limiter *rate.Limiter) SSTSnapshotStorage {
 		dir: filepath.Join(engine.GetAuxiliaryDir(), "sstsnapshot"),
 		mu: struct {
 			syncutil.Mutex
-			ranges map[roachpb.RangeID]int
-		}{ranges: make(map[roachpb.RangeID]int)},
+			rangeRefCount map[roachpb.RangeID]int
+		}{rangeRefCount: make(map[roachpb.RangeID]int)},
 	}
 }

@@ -57,10 +57,7 @@ func (s *SSTSnapshotStorage) NewScratchSpace(
 	rangeID roachpb.RangeID, snapUUID uuid.UUID,
 ) *SSTSnapshotStorageScratch {
 	s.mu.Lock()
-	rangeStorage := s.mu.ranges[rangeID]
-	if rangeStorage == 0 {
-		s.mu.ranges[rangeID] = 1
-	}
+	s.mu.rangeRefCount[rangeID]++
 	s.mu.Unlock()
 	snapDir := filepath.Join(s.dir, strconv.Itoa(int(rangeID)), snapUUID.String())
 	return &SSTSnapshotStorageScratch{
@@ -82,14 +79,14 @@ func (s *SSTSnapshotStorage) Clear() error {
 func (s *SSTSnapshotStorage) scratchClosed(rangeID roachpb.RangeID) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	val := s.mu.ranges[rangeID]
+	val := s.mu.rangeRefCount[rangeID]
 	if val <= 0 {
 		panic("inconsistent scratch ref count")
 	}
 	val--
-	s.mu.ranges[rangeID] = val
+	s.mu.rangeRefCount[rangeID] = val
 	if val == 0 {
-		delete(s.mu.ranges, rangeID)
+		delete(s.mu.rangeRefCount, rangeID)
 		// Suppressing an error here is okay, as orphaned directories are at worst
 		// a performance issue when we later walk directories in pebble.Capacity()
 		// but not a correctness issue.
pkg/kv/kvserver/replica_sst_snapshot_storage_test.go (113 changes: 113 additions & 0 deletions)
@@ -14,6 +14,7 @@ import (
 	"io/ioutil"
 	"path/filepath"
 	"strconv"
+	"sync"
 	"testing"

 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -22,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/errors/oserror"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/time/rate"
@@ -113,6 +115,117 @@ func TestSSTSnapshotStorage(t *testing.T) {
 	}
 }

+func TestSSTSnapshotStorageConcurrentRange(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	testRangeID := roachpb.RangeID(1)
+	testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890")))
+	testSnapUUID2 := uuid.Must(uuid.FromBytes([]byte("foobar2345678910")))
+	testLimiter := rate.NewLimiter(rate.Inf, 0)
+
+	cleanup, eng := newOnDiskEngine(t)
+	defer cleanup()
+	defer eng.Close()
+
+	sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
+
+	runForSnap := func(snapUUID uuid.UUID) error {
+		scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, snapUUID)
+
+		// Check that the storage lazily creates the directories on first write.
+		_, err := eng.Stat(scratch.snapDir)
+		if !oserror.IsNotExist(err) {
+			return errors.Errorf("expected %s to not exist", scratch.snapDir)
+		}
+
+		f, err := scratch.NewFile(ctx, 0)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, f.Close())
+		}()
+
+		// Check that even though the files aren't created, they are still recorded in SSTs().
+		require.Equal(t, len(scratch.SSTs()), 1)
+
+		// Check that the storage lazily creates the files on write.
+		for _, fileName := range scratch.SSTs() {
+			_, err := eng.Stat(fileName)
+			if !oserror.IsNotExist(err) {
+				return errors.Errorf("expected %s to not exist", fileName)
+			}
+		}
+
+		_, err = f.Write([]byte("foo"))
+		require.NoError(t, err)
+
+		// After writing to files, check that they have been flushed to disk.
+		for _, fileName := range scratch.SSTs() {
+			f, err := eng.Open(fileName)
+			require.NoError(t, err)
+			data, err := ioutil.ReadAll(f)
+			require.NoError(t, err)
+			require.Equal(t, data, []byte("foo"))
+			require.NoError(t, f.Close())
+		}
+
+		// Check that closing is idempotent.
+		require.NoError(t, f.Close())
+		require.NoError(t, f.Close())
+
+		// Check that writing to a closed file is an error.
+		_, err = f.Write([]byte("foo"))
+		require.EqualError(t, err, "file has already been closed")
+
+		// Check that closing an empty file is an error.
+		f, err = scratch.NewFile(ctx, 0)
+		require.NoError(t, err)
+		require.EqualError(t, f.Close(), "file is empty")
+		_, err = f.Write([]byte("foo"))
+		require.NoError(t, err)
+
+		// Check that Close removes the snapshot directory.
+		require.NoError(t, scratch.Close())
+		_, err = eng.Stat(scratch.snapDir)
+		if !oserror.IsNotExist(err) {
+			return errors.Errorf("expected %s to not exist", scratch.snapDir)
+		}
+		return nil
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	// Buffered so an erroring goroutine never blocks on send before wg.Done.
+	errChan := make(chan error, 2)
+	for _, snapID := range []uuid.UUID{testSnapUUID, testSnapUUID2} {
+		snapID := snapID
+		go func() {
+			defer wg.Done()
+			if err := runForSnap(snapID); err != nil {
+				errChan <- err
+			}
+		}()
+	}
+	wg.Wait()
+	select {
+	case err := <-errChan:
+		t.Fatal(err)
+	default:
+	}
+	// Ensure that the range directory was deleted after the scratches were
+	// closed.
+	rangeDir := filepath.Join(sstSnapshotStorage.dir, strconv.Itoa(int(testRangeID)))
+	_, err := eng.Stat(rangeDir)
+	if !oserror.IsNotExist(err) {
+		t.Fatalf("expected %s to not exist", rangeDir)
+	}
+	require.NoError(t, sstSnapshotStorage.Clear())
+	_, err = eng.Stat(sstSnapshotStorage.dir)
+	if !oserror.IsNotExist(err) {
+		t.Fatalf("expected %s to not exist", sstSnapshotStorage.dir)
+	}
+}
+
 // TestMultiSSTWriterInitSST tests that multiSSTWriter initializes each of the
 // SST files associated with the replicated key ranges by writing a range
 // deletion tombstone that spans the entire range of each respectively.
