diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go
index c656e3172b8d..ca2d51e8c5cb 100644
--- a/pkg/storage/engine_test.go
+++ b/pkg/storage/engine_test.go
@@ -34,10 +34,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/fs"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/errors/oserror"
@@ -1031,6 +1033,120 @@ func TestCreateCheckpoint(t *testing.T) {
 	}
 }
 
+// TestCreateCheckpoint_SpanConstrained writes one MVCC key under each table
+// prefix in [1, maxTableID], then repeatedly creates a checkpoint restricted
+// to a random span of table prefixes. Each checkpoint is reopened as its own
+// engine, and iteration bounded to the span is expected to yield exactly one
+// key per table ID in the half-open range [start, end).
+func TestCreateCheckpoint_SpanConstrained(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.WithIssue(t, 100935)
+	ctx := context.Background()
+
+	rng, _ := randutil.NewTestRand()
+	dir, cleanup := testutils.TempDir(t)
+	defer cleanup()
+
+	key := func(i int) roachpb.Key {
+		return keys.SystemSQLCodec.TablePrefix(uint32(i))
+	}
+
+	db, err := Open(
+		ctx,
+		Filesystem(dir),
+		cluster.MakeTestingClusterSettings(),
+		TargetFileSize(10<<10 /* 10 KB */),
+	)
+	// Abort on failure: everything below dereferences db.
+	require.NoError(t, err)
+	defer db.Close()
+
+	// Write keys /Table/1/../Table/10000.
+	b := db.NewWriteBatch()
+	const maxTableID = 10000
+	for i := 1; i <= maxTableID; i++ {
+		require.NoError(t, b.PutMVCC(
+			MVCCKey{Key: key(i), Timestamp: hlc.Timestamp{WallTime: int64(i)}},
+			MVCCValue{Value: roachpb.Value{RawBytes: randutil.RandBytes(rng, 500)}},
+		))
+	}
+	require.NoError(t, b.Commit(true /* sync */))
+	require.NoError(t, db.Flush())
+
+	// Log the sstable layout to aid debugging of failures.
+	sstables, err := db.db.SSTables()
+	require.NoError(t, err)
+	for _, tbls := range sstables {
+		for _, tbl := range tbls {
+			t.Logf("%s: %s-%s", tbl.FileNum, tbl.Smallest, tbl.Largest)
+		}
+	}
+
+	checkpointRootDir := filepath.Join(dir, "checkpoint")
+	require.NoError(t, db.FS.MkdirAll(checkpointRootDir, os.ModePerm))
+
+	var checkpointNum int
+	checkpointSpan := func(s roachpb.Span) string {
+		checkpointNum++
+		dir := filepath.Join(checkpointRootDir, fmt.Sprintf("%06d", checkpointNum))
+		t.Logf("Writing checkpoint for span %s to %q", s, dir)
+		require.NoError(t, db.CreateCheckpoint(dir, []roachpb.Span{s}))
+		assert.DirExists(t, dir)
+		m, err := filepath.Glob(filepath.Join(dir, "*"))
+		assert.NoError(t, err)
+		assert.NotEmpty(t, m)
+		t.Logf("Checkpoint wrote files: %s", strings.Join(m, ", "))
+		return dir
+	}
+	verifyCheckpoint := func(dir string, low, high int) {
+		t.Logf("Verifying checkpoint for span [%d,%d) in %q", low, high, dir)
+		// Verify that we can open the checkpoint.
+		cDB, err := Open(
+			ctx,
+			Filesystem(dir),
+			cluster.MakeTestingClusterSettings(),
+			MustExist)
+		require.NoError(t, err)
+		defer cDB.Close()
+
+		iter := cDB.NewMVCCIterator(MVCCKeyIterKind, IterOptions{
+			LowerBound: key(low),
+			UpperBound: key(high),
+		})
+		defer iter.Close()
+		iter.SeekGE(MVCCKey{Key: key(low)})
+		// Count the keys visible within [low, high).
+		count := 0
+		for {
+			if valid, err := iter.Valid(); !valid {
+				require.NoError(t, err)
+				break
+			}
+			count++
+			iter.Next()
+		}
+		// One key was written per table ID, so the span holds high-low keys.
+		require.Equal(t, high-low, count)
+	}
+
+	// Checkpoint ten randomly chosen, non-empty spans of table IDs.
+	for i := 0; i < 10; i++ {
+		start := randutil.RandIntInRange(rng, 1, maxTableID)
+		end := randutil.RandIntInRange(rng, 1, maxTableID)
+		for start == end {
+			end = randutil.RandIntInRange(rng, 1, maxTableID)
+		}
+		if start > end {
+			start, end = end, start
+		}
+
+		span := roachpb.Span{Key: key(start), EndKey: key(end)}
+		dir := checkpointSpan(span)
+		verifyCheckpoint(dir, start, end)
+	}
+}
+
 func TestIngestDelayLimit(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/storage/open.go b/pkg/storage/open.go
index 4c5f4cbb9d08..c59e4ea9cdfb 100644
--- a/pkg/storage/open.go
+++ b/pkg/storage/open.go
@@ -107,6 +107,17 @@ func BlockSize(size int) ConfigOption {
 	}
 }
 
+// TargetFileSize sets the target file size across all levels of the LSM,
+// primarily for testing purposes.
+func TargetFileSize(size int64) ConfigOption {
+	return func(cfg *engineConfig) error {
+		for i := range cfg.Opts.Levels {
+			cfg.Opts.Levels[i].TargetFileSize = size
+		}
+		return nil
+	}
+}
+
 // MaxWriterConcurrency sets the concurrency of the sstable Writers. A concurrency
 // of 0 implies no parallelism in the Writer, and a concurrency of 1 or more implies
 // parallelism in the Writer. Currently, there's no difference between a concurrency
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index 7f546f39fb1b..dafb70cbce8b 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -1950,7 +1950,11 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error {
 	if l := len(spans); l > 0 {
 		s := make([]pebble.CheckpointSpan, 0, l)
 		for _, span := range spans {
-			s = append(s, pebble.CheckpointSpan{Start: span.Key, End: span.EndKey})
+			// Span bounds are handed to Pebble as encoded engine keys.
+			s = append(s, pebble.CheckpointSpan{
+				Start: EngineKey{Key: span.Key}.Encode(),
+				End:   EngineKey{Key: span.EndKey}.Encode(),
+			})
 		}
 		opts = append(opts, pebble.WithRestrictToSpans(s))
 	}