Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: plumb context through ingest methods #3821

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
if err != nil {
return nil, err
}
Expand All @@ -1298,7 +1298,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
}

if len(ingestSplitFiles) > 0 {
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
return nil, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ func TestCompactionErrorCleanup(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
}
ingest("a", "c")
ingest("b")
Expand Down Expand Up @@ -2591,7 +2591,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
require.NoError(t, w.Set(key, nil))
require.NoError(t, w.Close())
// Ingest the SST.
return db.Ingest([]string{fName})
return db.Ingest(context.Background(), []string{fName})
}

testCases := []struct {
Expand Down Expand Up @@ -2800,7 +2800,7 @@ func TestCompactionErrorStats(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
}
ingest("a", "c")
// Snapshot will preserve the older "a" key during compaction.
Expand Down
6 changes: 3 additions & 3 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
return err
}
return nil
Expand All @@ -1364,7 +1364,7 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths = append(paths, arg.String())
}

if err := d.Ingest(paths); err != nil {
if err := d.Ingest(context.Background(), paths); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -1444,7 +1444,7 @@ func runIngestExternalCmd(
external = append(external, ef)
}

if _, err := d.IngestExternalFiles(external); err != nil {
if _, err := d.IngestExternalFiles(context.Background(), external); err != nil {
return err
}
return nil
Expand Down
16 changes: 8 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ func TestDBClosed(t *testing.T) {
require.True(t, errors.Is(catch(func() { _, _, _ = d.Get(nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Delete(nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.DeleteRange(nil, nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Ingest(nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Ingest(context.Background(), nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.LogData(nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Merge(nil, nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.RatchetFormatMajorVersion(internalFormatNewest) }), ErrClosed))
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func TestDBConcurrentCompactClose(t *testing.T) {
})
require.NoError(t, w.Set([]byte(fmt.Sprint(j)), nil))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

require.NoError(t, d.Close())
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("cc"), []byte("foo")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest2.sst"
Expand All @@ -1660,7 +1660,7 @@ func TestMemtableIngestInversion(t *testing.T) {
require.NoError(t, w.Set([]byte("bb"), []byte("foo2")))
require.NoError(t, w.Set([]byte("cc"), []byte("foo2")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest3.sst"
Expand All @@ -1671,7 +1671,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("bb"), []byte("foo3")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest4.sst"
Expand All @@ -1682,7 +1682,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("bb"), []byte("foo4")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

// We now have a base compaction blocked. Block a memtable flush to cause
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.DeleteRange([]byte("cc"), []byte("e")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
t.Log("main ingest complete")
printLSM()
Expand Down Expand Up @@ -1795,7 +1795,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("cc"), []byte("doesntmatter")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

// Unblock earlier flushes. We will first finish flushing the blocked
Expand Down
5 changes: 3 additions & 2 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pebble

import (
"bytes"
"context"
"fmt"
"reflect"
"runtime"
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestEventListener(t *testing.T) {
if err := w.Close(); err != nil {
return err.Error()
}
if err := d.Ingest([]string{"ext/0"}); err != nil {
if err := d.Ingest(context.Background(), []string{"ext/0"}); err != nil {
return err.Error()
}
return memLog.String()
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestEventListener(t *testing.T) {
if err := writeTable(tableB, 'b'); err != nil {
return err.Error()
}
if err := d.Ingest([]string{tableA, tableB}); err != nil {
if err := d.Ingest(context.Background(), []string{tableA, tableB}); err != nil {
return err.Error()
}

Expand Down
5 changes: 3 additions & 2 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pebble

import (
"bytes"
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -57,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -85,7 +86,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, lr.local); err != nil {
if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil {
panic("couldn't hard link sstables")
}

Expand Down
45 changes: 26 additions & 19 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (r *ingestLoadResult) fileCount() int {
}

func ingestLoad(
ctx context.Context,
opts *Options,
fmv FormatMajorVersion,
paths []string,
Expand All @@ -431,8 +432,6 @@ func ingestLoad(
cacheID uint64,
pending []base.FileNum,
) (ingestLoadResult, error) {
ctx := context.TODO()

localFileNums := pending[:len(paths)]
sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
Expand Down Expand Up @@ -590,11 +589,15 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files.
func ingestLinkLocal(
jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
ctx context.Context,
jobID JobID,
opts *Options,
objProvider objstorage.Provider,
localMetas []ingestLocalMeta,
) error {
for i := range localMetas {
objMeta, err := objProvider.LinkOrCopyFromLocal(
context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true},
)
if err != nil {
Expand Down Expand Up @@ -1027,14 +1030,14 @@ func ingestTargetLevel(
// can produce a noticeable hiccup in performance. See
// https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
// this hiccup.
func (d *DB) Ingest(paths []string) error {
func (d *DB) Ingest(ctx context.Context, paths []string) error {
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.ReadOnly {
return ErrReadOnly
}
_, err := d.ingest(paths, nil /* shared */, KeyRange{}, false, nil /* external */)
_, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */)
return err
}

Expand Down Expand Up @@ -1115,21 +1118,23 @@ type ExternalFile struct {

// IngestWithStats does the same as Ingest, and additionally returns
// IngestOperationStats.
func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.ReadOnly {
return IngestOperationStats{}, ErrReadOnly
}
return d.ingest(paths, nil, KeyRange{}, false, nil)
return d.ingest(ctx, paths, nil, KeyRange{}, false, nil)
}

// IngestExternalFiles does the same as IngestWithStats, and additionally
// accepts external files (with locator info that can be resolved using
// d.opts.SharedStorage). These files must also be non-overlapping with
// each other, and must be resolvable through d.objProvider.
func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
func (d *DB) IngestExternalFiles(
ctx context.Context, external []ExternalFile,
) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand All @@ -1140,7 +1145,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
if d.opts.Experimental.RemoteStorage == nil {
return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
}
return d.ingest(nil, nil, KeyRange{}, false, external)
return d.ingest(ctx, nil, nil, KeyRange{}, false, external)
}

// IngestAndExcise does the same as IngestWithStats, and additionally accepts a
Expand All @@ -1154,6 +1159,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
// Panics if this DB instance was not instantiated with a remote.Storage and
// shared sstables are present.
func (d *DB) IngestAndExcise(
ctx context.Context,
paths []string,
shared []SharedSSTMeta,
external []ExternalFile,
Expand Down Expand Up @@ -1181,7 +1187,7 @@ func (d *DB) IngestAndExcise(
v, FormatMinForSharedObjects,
)
}
return d.ingest(paths, shared, exciseSpan, sstsContainExciseTombstone, external)
return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external)
}

// Both DB.mu and commitPipeline.mu must be held while this is called.
Expand Down Expand Up @@ -1303,6 +1309,7 @@ func (d *DB) handleIngestAsFlushable(

// See comment at Ingest() for details on how this works.
func (d *DB) ingest(
ctx context.Context,
paths []string,
shared []SharedSSTMeta,
exciseSpan KeyRange,
Expand All @@ -1325,7 +1332,6 @@ func (d *DB) ingest(
}
}
}
ctx := context.Background()
// Allocate file numbers for all of the files being ingested and mark them as
// pending in order to prevent them from being deleted. Note that this causes
// the file number ordering to be out of alignment with sequence number
Expand All @@ -1342,7 +1348,7 @@ func (d *DB) ingest(

// Load the metadata for all the files being ingested. This step detects
// and elides empty sstables.
loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
if err != nil {
return IngestOperationStats{}, err
}
Expand All @@ -1362,7 +1368,7 @@ func (d *DB) ingest(
// (e.g. because the files reside on a different filesystem), ingestLinkLocal
// will fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
return IngestOperationStats{}, err
}

Expand Down Expand Up @@ -1697,7 +1703,7 @@ func (d *DB) ingest(
//
// The manifest lock must be held when calling this method.
func (d *DB) excise(
exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
) ([]manifest.NewFileEntry, error) {
numCreatedFiles := 0
// Check if there's actually an overlap between m and exciseSpan.
Expand All @@ -1722,7 +1728,7 @@ func (d *DB) excise(
return nil
}
var err error
iters, err = d.newIters(context.TODO(), m, &IterOptions{
iters, err = d.newIters(ctx, m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
Expand Down Expand Up @@ -1982,6 +1988,7 @@ type ingestSplitFile struct {
//
// d.mu as well as the manifest lock must be held when calling this method.
func (d *DB) ingestSplit(
ctx context.Context,
ve *versionEdit,
updateMetrics func(*fileMetadata, int, []newFileEntry),
files []ingestSplitFile,
Expand Down Expand Up @@ -2047,7 +2054,7 @@ func (d *DB) ingestSplit(
// as we're guaranteed to not have any data overlap between splitFile and
// s.ingestFile. d.excise will return an error if we pass an inclusive user
// key bound _and_ we end up seeing data overlap at the end key.
added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
if err != nil {
return err
}
Expand Down Expand Up @@ -2288,7 +2295,7 @@ func (d *DB) ingestApply(
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level)
newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level)
if err != nil {
return nil, err
}
Expand All @@ -2308,7 +2315,7 @@ func (d *DB) ingestApply(
if len(filesToSplit) > 0 {
// For the same reasons as the above call to excise, we hold the db mutex
// while calling this method.
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
return nil, err
}
}
Expand Down
Loading
Loading