Skip to content

Commit

Permalink
storage: encode engine keys passed as checkpoint bounds
Browse files Browse the repository at this point in the history
Previously, CreateCheckpoint would restrict a checkpoint by passing invalid
keys to Pebble. These keys were unencoded `roachpb.Key`s that lacked the
trailing version-length byte.

The unit test is skipped because it reveals another problem that is not yet understood.

Close cockroachdb#100919.
Informs cockroachdb#100935.
Epic: none
Release note: None
  • Loading branch information
jbowens committed Apr 7, 2023
1 parent 8e6f530 commit c61b6f5
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 1 deletion.
106 changes: 106 additions & 0 deletions pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
Expand Down Expand Up @@ -1031,6 +1033,110 @@ func TestCreateCheckpoint(t *testing.T) {
}
}

// TestCreateCheckpoint_SpanConstrained writes one MVCC key per table prefix
// for table IDs 1 through maxTableID, creates checkpoints restricted to
// randomly chosen [start, end) table-prefix spans, and verifies that each
// checkpoint can be opened and contains exactly one key per table ID in its
// span.
//
// The test is currently skipped; see issue 100935.
func TestCreateCheckpoint_SpanConstrained(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	skip.WithIssue(t, 100935)
	ctx := context.Background()

	rng, _ := randutil.NewTestRand()
	dir, cleanup := testutils.TempDir(t)
	defer cleanup()

	// key returns the table prefix key for table ID i.
	key := func(i int) roachpb.Key {
		return keys.SystemSQLCodec.TablePrefix(uint32(i))
	}

	// A small target file size is configured, presumably so that the flushed
	// data is split across many sstables.
	db, err := Open(
		ctx,
		Filesystem(dir),
		cluster.MakeTestingClusterSettings(),
		TargetFileSize(10<<10 /* 10 KB */),
	)
	// Abort immediately on failure: nothing below can proceed without an
	// open engine, including the deferred Close.
	require.NoError(t, err)
	defer db.Close()

	// Write keys /Table/1/../Table/10000.
	b := db.NewWriteBatch()
	defer b.Close()
	const maxTableID = 10000
	for i := 1; i <= maxTableID; i++ {
		require.NoError(t, b.PutMVCC(
			MVCCKey{Key: key(i), Timestamp: hlc.Timestamp{WallTime: int64(i)}},
			MVCCValue{Value: roachpb.Value{RawBytes: randutil.RandBytes(rng, 500)}},
		))
	}
	require.NoError(t, b.Commit(true /* sync */))
	require.NoError(t, db.Flush())

	// Log the sstable layout to aid debugging.
	sstables, err := db.db.SSTables()
	require.NoError(t, err)
	for _, tbls := range sstables {
		for _, tbl := range tbls {
			t.Logf("%s: %s-%s", tbl.FileNum, tbl.Smallest, tbl.Largest)
		}
	}

	checkpointRootDir := filepath.Join(dir, "checkpoint")
	require.NoError(t, db.FS.MkdirAll(checkpointRootDir, os.ModePerm))

	// checkpointSpan creates a checkpoint restricted to s in a fresh,
	// sequentially numbered subdirectory and returns that directory.
	var checkpointNum int
	checkpointSpan := func(s roachpb.Span) string {
		checkpointNum++
		dir := filepath.Join(checkpointRootDir, fmt.Sprintf("%06d", checkpointNum))
		t.Logf("Writing checkpoint for span %s to %q", s, dir)
		assert.NoError(t, db.CreateCheckpoint(dir, []roachpb.Span{s}))
		assert.DirExists(t, dir)
		m, err := filepath.Glob(filepath.Join(dir, "*"))
		assert.NoError(t, err)
		assert.NotEmpty(t, m)
		t.Logf("Checkpoint wrote files: %s", strings.Join(m, ", "))
		return dir
	}

	// verifyCheckpoint opens the checkpoint in dir and asserts that iterating
	// over [key(low), key(high)) yields exactly high-low keys.
	verifyCheckpoint := func(dir string, low, high int) {
		t.Logf("Verifying checkpoint for span [%d,%d) in %q", low, high, dir)
		// Verify that we can open the checkpoint.
		cDB, err := Open(
			ctx,
			Filesystem(dir),
			cluster.MakeTestingClusterSettings(),
			MustExist)
		require.NoError(t, err)
		defer cDB.Close()

		iter := cDB.NewMVCCIterator(MVCCKeyIterKind, IterOptions{
			LowerBound: key(low),
			UpperBound: key(high),
		})
		defer iter.Close()
		iter.SeekGE(MVCCKey{Key: key(low)})
		count := 0
		for {
			if valid, err := iter.Valid(); !valid {
				require.NoError(t, err)
				break
			}
			count++
			iter.Next()
		}
		// One key was written per table ID, so the span must hold high-low keys.
		require.Equal(t, high-low, count)
	}

	for i := 0; i < 10; i++ {
		// Pick two distinct table IDs and order them to form a non-empty span.
		start := randutil.RandIntInRange(rng, 1, maxTableID)
		end := randutil.RandIntInRange(rng, 1, maxTableID)
		for start == end {
			end = randutil.RandIntInRange(rng, 1, maxTableID)
		}
		if start > end {
			start, end = end, start
		}

		span := roachpb.Span{Key: key(start), EndKey: key(end)}
		dir := checkpointSpan(span)
		verifyCheckpoint(dir, start, end)
	}
}

func TestIngestDelayLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,17 @@ func BlockSize(size int) ConfigOption {
}
}

// TargetFileSize returns a ConfigOption that assigns the given size as the
// target file size of every level in the engine's LSM options. It is intended
// primarily for use in tests. The returned option never reports an error.
func TargetFileSize(size int64) ConfigOption {
	applyToAllLevels := func(cfg *engineConfig) error {
		for lvl := 0; lvl < len(cfg.Opts.Levels); lvl++ {
			cfg.Opts.Levels[lvl].TargetFileSize = size
		}
		return nil
	}
	return applyToAllLevels
}

// MaxWriterConcurrency sets the concurrency of the sstable Writers. A concurrency
// of 0 implies no parallelism in the Writer, and a concurrency of 1 or more implies
// parallelism in the Writer. Currently, there's no difference between a concurrency
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,10 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error {
if l := len(spans); l > 0 {
s := make([]pebble.CheckpointSpan, 0, l)
for _, span := range spans {
s = append(s, pebble.CheckpointSpan{Start: span.Key, End: span.EndKey})
s = append(s, pebble.CheckpointSpan{
Start: EngineKey{Key: span.Key}.Encode(),
End: EngineKey{Key: span.EndKey}.Encode(),
})
}
opts = append(opts, pebble.WithRestrictToSpans(s))
}
Expand Down

0 comments on commit c61b6f5

Please sign in to comment.