Skip to content

Commit

Permalink
storage: add method to ingest external files, rename IngestExternalFiles
Browse files Browse the repository at this point in the history
This change renames the existing IngestExternalFiles method
on storage.Engine to IngestLocalFiles, and adds a new IngestExternalFiles
that ingests pebble.ExternalFile, for use with online restore.

Depends on cockroachdb/pebble#2753.

Epic: none

Release note: None
  • Loading branch information
itsbilal committed Aug 10, 2023
1 parent 1fd3eb8 commit 97c2a85
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 31 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_cockroachdb_pebble//objstorage/remote",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@ func TestEvalAddSSTable(t *testing.T) {
} else {
require.NotNil(t, result.Replicated.AddSSTable)
require.NoError(t, fs.WriteFile(engine, "sst", result.Replicated.AddSSTable.Data))
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))
}

var expect kvs
Expand Down Expand Up @@ -1652,7 +1652,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
require.NoError(t, err)

require.NoError(t, fs.WriteFile(engine, "sst", sst))
require.NoError(t, engine.IngestExternalFiles(ctx, []string{"sst"}))
require.NoError(t, engine.IngestLocalFiles(ctx, []string{"sst"}))

statsEvaled := statsBefore
statsEvaled.Add(*cArgs.Stats)
Expand Down
45 changes: 30 additions & 15 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/redact"
"github.com/kr/pretty"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -628,19 +630,32 @@ func addSSTablePreApply(
sst.Span,
sst.RemoteFileLoc,
)
// TODO(bilal): replace this with the real ingest.
/*
start := storage.EngineKey{Key: sst.Span.Key}
end := storage.EngineKey{Key: sst.Span.EndKey}
externalFile := pebble.ExternalFile{
Locator: shared.Locator(sst.RemoteFileLoc),
ObjName: sst.RemoteFilePath,
Size: sst.BackingFileSize,
SmallestUserKey: start.Encode(),
LargestUserKey: end.Encode(),
}*/
log.Fatalf(ctx, "Unsupported IngestRemoteFile")
start := storage.EngineKey{Key: sst.Span.Key}
end := storage.EngineKey{Key: sst.Span.EndKey}
externalFile := pebble.ExternalFile{
Locator: remote.Locator(sst.RemoteFileLoc),
ObjName: sst.RemoteFilePath,
Size: sst.BackingFileSize,
SmallestUserKey: start.Encode(),
LargestUserKey: end.Encode(),
}
tBegin := timeutil.Now()
defer func() {
if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() {
log.Infof(ctx,
"ingesting SST of size %s at index %d took %.2fs",
humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(),
)
}
}()

_, ingestErr := env.eng.IngestExternalFiles(ctx, []pebble.ExternalFile{externalFile})
if ingestErr != nil {
log.Fatalf(ctx, "while ingesting %s: %v", sst.RemoteFilePath, ingestErr)
}
// Adding without modification succeeded, no copy necessary.
log.Eventf(ctx, "ingested SSTable at index %d, term %d: external %s", index, term, sst.RemoteFilePath)
return false
}
checksum := util.CRC32(sst.Data)

Expand Down Expand Up @@ -685,7 +700,7 @@ func addSSTablePreApply(
}

// Regular path - we made a hard link, so we can ingest the hard link now.
ingestErr := env.eng.IngestExternalFiles(ctx, []string{ingestPath})
ingestErr := env.eng.IngestLocalFiles(ctx, []string{ingestPath})
if ingestErr != nil {
log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr)
}
Expand Down Expand Up @@ -726,7 +741,7 @@ func ingestViaCopy(
if err := kvserverbase.WriteFileSyncing(ctx, ingestPath, sst.Data, eng, 0600, st, limiter); err != nil {
return errors.Wrapf(err, "while ingesting %s", ingestPath)
}
if err := eng.IngestExternalFiles(ctx, []string{ingestPath}); err != nil {
if err := eng.IngestLocalFiles(ctx, []string{ingestPath}); err != nil {
return errors.Wrapf(err, "while ingesting %s", ingestPath)
}
log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (r *Replica) applySnapshot(
// TODO: separate ingestions for log and statemachine engine. See:
//
// https://github.com/cockroachdb/cockroach/issues/93251
r.store.TODOEngine().IngestExternalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
if r.store.cfg.KVAdmissionController != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,7 @@ func BenchmarkMVCCScannerWithIntentsAndVersions(b *testing.B) {
}
require.NoError(b, writer.Close())
batch.Close()
require.NoError(b, eng.IngestExternalFiles(ctx, []string{sstFileName}))
require.NoError(b, eng.IngestLocalFiles(ctx, []string{sstFileName}))
}
for i := 0; i < b.N; i++ {
rw := eng.NewReadOnly(StandardDurability)
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,13 +977,17 @@ type Engine interface {
NewSnapshot() Reader
// Type returns engine type.
Type() enginepb.EngineType
// IngestExternalFiles atomically links a slice of files into the RocksDB
// IngestLocalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
IngestExternalFiles(ctx context.Context, paths []string) error
// IngestExternalFilesWithStats is a variant of IngestExternalFiles that
IngestLocalFiles(ctx context.Context, paths []string) error
// IngestLocalFilesWithStats is a variant of IngestLocalFiles that
// additionally returns ingestion stats.
IngestExternalFilesWithStats(
IngestLocalFilesWithStats(
ctx context.Context, paths []string) (pebble.IngestOperationStats, error)
// IngestExternalFiles is a variant of IngestLocalFiles that takes external
// files. These files can be referred to by multiple stores, but are not
// modified or deleted by the Engine doing the ingestion.
IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/metamorphic/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (i ingestOp) run(ctx context.Context) string {
}
sstWriter.Close()

if err := i.m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil {
if err := i.m.engine.IngestLocalFiles(ctx, []string{sstPath}); err != nil {
return fmt.Sprintf("error = %s", err.Error())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
if err := fs.WriteFile(db2, `ingest`, memFile.Data()); err != nil {
t.Fatal(err)
}
if err := db2.IngestExternalFiles(ctx, []string{`ingest`}); err != nil {
if err := db2.IngestLocalFiles(ctx, []string{`ingest`}); err != nil {
t.Fatal(err)
}
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,7 @@ func DefaultPebbleOptions() *pebble.Options {
MemTableStopWritesThreshold: 4,
Merger: MVCCMerger,
BlockPropertyCollectors: PebbleBlockPropertyCollectors,
// Minimum supported format.
FormatMajorVersion: MinimumSupportedFormatVersion,
FormatMajorVersion: MinimumSupportedFormatVersion,
}
// Automatically flush 10s after the first range tombstone is added to a
// memtable. This ensures that we can reclaim space even when there's no
Expand Down Expand Up @@ -2021,18 +2020,25 @@ func (p *Pebble) Type() enginepb.EngineType {
return enginepb.EngineTypePebble
}

// IngestExternalFiles implements the Engine interface.
func (p *Pebble) IngestExternalFiles(ctx context.Context, paths []string) error {
// IngestLocalFiles implements the Engine interface.
func (p *Pebble) IngestLocalFiles(ctx context.Context, paths []string) error {
return p.db.Ingest(paths)
}

// IngestExternalFilesWithStats implements the Engine interface.
func (p *Pebble) IngestExternalFilesWithStats(
// IngestLocalFilesWithStats implements the Engine interface.
func (p *Pebble) IngestLocalFilesWithStats(
ctx context.Context, paths []string,
) (pebble.IngestOperationStats, error) {
return p.db.IngestWithStats(paths)
}

// IngestExternalFiles implements the Engine interface.
func (p *Pebble) IngestExternalFiles(
ctx context.Context, external []pebble.ExternalFile,
) (pebble.IngestOperationStats, error) {
return p.db.IngestExternalFiles(external)
}

// PreIngestDelay implements the Engine interface.
func (p *Pebble) PreIngestDelay(ctx context.Context) {
preIngestDelay(ctx, p, p.settings)
Expand Down

0 comments on commit 97c2a85

Please sign in to comment.