diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index b3dfc8a2ef7e..3eb1a7915007 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -224,6 +224,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", + "@com_github_cockroachdb_pebble//objstorage/remote", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 88501088f439..9b36881486af 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) { } else { require.NotNil(t, result.Replicated.AddSSTable) require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) } var expect kvs @@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) { require.NoError(t, err) require.NoError(t, fs.WriteFile(engine, "sst", sst)) - require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"})) + require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"})) statsEvaled := statsBefore statsEvaled.Add(*cArgs.Stats) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 4cf3df327f2e..c4d5a42fb106 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -37,6 +37,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage/remote" "github.com/cockroachdb/redact" "github.com/kr/pretty" "golang.org/x/time/rate" @@ -628,19 +630,32 @@ func addSSTablePreApply( sst.Span, sst.RemoteFileLoc, ) - // TODO(bilal): replace this with the real ingest. - /* - start := storage.EngineKey{Key: sst.Span.Key} - end := storage.EngineKey{Key: sst.Span.EndKey} - - externalFile := pebble.ExternalFile{ - Locator: shared.Locator(sst.RemoteFileLoc), - ObjName: sst.RemoteFilePath, - Size: sst.BackingFileSize, - SmallestUserKey: start.Encode(), - LargestUserKey: end.Encode(), - }*/ - log.Fatalf(ctx, "Unsupported IngestRemoteFile") + start := storage.EngineKey{Key: sst.Span.Key} + end := storage.EngineKey{Key: sst.Span.EndKey} + externalFile := pebble.ExternalFile{ + Locator: remote.Locator(sst.RemoteFileLoc), + ObjName: sst.RemoteFilePath, + Size: sst.BackingFileSize, + SmallestUserKey: start.Encode(), + LargestUserKey: end.Encode(), + } + tBegin := timeutil.Now() + defer func() { + if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() { + log.Infof(ctx, + "ingesting SST of size %s at index %d took %.2fs", + humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(), + ) + } + }() + + _, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile}) + if ingestErr != nil { + log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr) + } + // Adding without modification succeeded, no copy necessary. + log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath) + return false } checksum := util.CRC32(sst.Data) @@ -685,7 +700,7 @@ func addSSTablePreApply( } // Regular path - we made a hard link, so we can ingest the hard link now. - ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath}) + ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath}) if ingestErr != nil { log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr) } @@ -726,7 +741,7 @@ func ingestViaCopy( if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } - if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil { + if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil { return errors.Wrapf(err, "while ingesting %s", ingestPath) } log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath) diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 4865725f64ea..bd269d450ac4 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -578,7 +578,7 @@ func (r *Replica) applySnapshot( // TODO: separate ingestions for log and statemachine engine. See: // // https://github.com/cockroachdb/cockroach/issues/93251 - r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { + r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) } if r.store.cfg.KVAdmissionController != nil { diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 345fec419e00..3fdcd1dcf39b 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -1944,7 +1944,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) { } require.NoError(b, writer.Close()) batch.Close() - require.NoError(b, eng.IngestExternalFiles(ctx, []string{sstFileName})) + require.NoError(b, eng.IngestLocalFiles(ctx, []string{sstFileName})) } for i := 0; i < b.N; i++ { rw := eng.NewReadOnly(StandardDurability) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index be393d0e1355..4be584751f10 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -977,13 +977,17 @@ type Engine interface { NewSnapshot() Reader // Type returns engine type. Type() enginepb.EngineType - // IngestExternalFiles atomically links a slice of files into the RocksDB + // IngestLocalFiles atomically links a slice of files into the RocksDB // log-structured merge-tree. - IngestExternalFiles(ctx context.Context, paths []string) error - // IngestExternalFilesWithStats is a variant of IngestExternalFiles that + IngestLocalFiles(ctx context.Context, paths []string) error + // IngestLocalFilesWithStats is a variant of IngestLocalFiles that // additionally returns ingestion stats. - IngestExternalFilesWithStats( + IngestLocalFilesWithStats( ctx context.Context, paths []string) (pebble.IngestOperationStats, error) + // IngestExternalFiles is a variant of IngestLocalFiles that takes external + // files. These files can be referred to by multiple stores, but are not + // modified or deleted by the Engine doing the ingestion. + IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error) // PreIngestDelay offers an engine the chance to backpressure ingestions. // When called, it may choose to block if the engine determines that it is in // or approaching a state where further ingestions may risk its health. diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 9b6e83f1291b..9e10b24d514a 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -779,7 +779,7 @@ func (i ingestOp) run(ctx context.Context) string { } sstWriter.Close() - if err := i.m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil { + if err := i.m.engine.IngestLocalFiles(ctx, []string{sstPath}); err != nil { return fmt.Sprintf("error = %s", err.Error()) } diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 7e272f838518..64381b0685d3 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -1371,7 +1371,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { if err := fs.WriteFile(db2, `ingest`, memFile.Data()); err != nil { t.Fatal(err) } - if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil { + if err := db2.IngestLocalFiles(ctx, []string{`ingest`}); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 63705f2264d1..e3c09a7cce11 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -582,8 +582,7 @@ func DefaultPebbleOptions() *pebble.Options { MemTableStopWritesThreshold: 4, Merger: MVCCMerger, BlockPropertyCollectors: PebbleBlockPropertyCollectors, - // Minimum supported format. - FormatMajorVersion: MinimumSupportedFormatVersion, + FormatMajorVersion: MinimumSupportedFormatVersion, } // Automatically flush 10s after the first range tombstone is added to a // memtable. This ensures that we can reclaim space even when there's no @@ -2021,18 +2020,25 @@ func (p *Pebble) Type() enginepb.EngineType { return enginepb.EngineTypePebble } -// IngestExternalFiles implements the Engine interface. -func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string) error { +// IngestLocalFiles implements the Engine interface. +func (p *Pebble) IngestLocalFiles(ctx context.Context, paths []string) error { return p.db.Ingest(paths) } -// IngestExternalFilesWithStats implements the Engine interface. -func (p *Pebble) IngestExternalFilesWithStats( +// IngestLocalFilesWithStats implements the Engine interface. +func (p *Pebble) IngestLocalFilesWithStats( ctx context.Context, paths []string, ) (pebble.IngestOperationStats, error) { return p.db.IngestWithStats(paths) } +// IngestExternalFiles implements the Engine interface. +func (p *Pebble) IngestExternalFiles( + ctx context.Context, external []pebble.ExternalFile, +) (pebble.IngestOperationStats, error) { + return p.db.IngestExternalFiles(external) +} + // PreIngestDelay implements the Engine interface. func (p *Pebble) PreIngestDelay(ctx context.Context) { preIngestDelay(ctx, p, p.settings)