Skip to content

Commit

Permalink
db: plumb context through ingest methods
Browse files Browse the repository at this point in the history
  • Loading branch information
RaduBerinde committed Aug 1, 2024
1 parent a466dad commit 38fd593
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 85 deletions.
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
if err != nil {
return nil, err
}
Expand All @@ -1298,7 +1298,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
}

if len(ingestSplitFiles) > 0 {
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
if err := d.ingestSplit(context.TODO(), ve, updateLevelMetricsOnExcise, ingestSplitFiles, replacedFiles); err != nil {
return nil, err
}
}
Expand Down
6 changes: 3 additions & 3 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ func TestCompactionErrorCleanup(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
}
ingest("a", "c")
ingest("b")
Expand Down Expand Up @@ -2591,7 +2591,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
require.NoError(t, w.Set(key, nil))
require.NoError(t, w.Close())
// Ingest the SST.
return db.Ingest([]string{fName})
return db.Ingest(context.Background(), []string{fName})
}

testCases := []struct {
Expand Down Expand Up @@ -2800,7 +2800,7 @@ func TestCompactionErrorStats(t *testing.T) {
require.NoError(t, w.Set([]byte(k), nil))
}
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{"ext"}))
require.NoError(t, d.Ingest(context.Background(), []string{"ext"}))
}
ingest("a", "c")
// Snapshot will preserve the older "a" key during compaction.
Expand Down
6 changes: 3 additions & 3 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil {
return err
}
return nil
Expand All @@ -1364,7 +1364,7 @@ func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths = append(paths, arg.String())
}

if err := d.Ingest(paths); err != nil {
if err := d.Ingest(context.Background(), paths); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -1444,7 +1444,7 @@ func runIngestExternalCmd(
external = append(external, ef)
}

if _, err := d.IngestExternalFiles(external); err != nil {
if _, err := d.IngestExternalFiles(context.Background(), external); err != nil {
return err
}
return nil
Expand Down
16 changes: 8 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ func TestDBClosed(t *testing.T) {
require.True(t, errors.Is(catch(func() { _, _, _ = d.Get(nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Delete(nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.DeleteRange(nil, nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Ingest(nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Ingest(context.Background(), nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.LogData(nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.Merge(nil, nil, nil) }), ErrClosed))
require.True(t, errors.Is(catch(func() { _ = d.RatchetFormatMajorVersion(internalFormatNewest) }), ErrClosed))
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func TestDBConcurrentCompactClose(t *testing.T) {
})
require.NoError(t, w.Set([]byte(fmt.Sprint(j)), nil))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

require.NoError(t, d.Close())
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("cc"), []byte("foo")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest2.sst"
Expand All @@ -1660,7 +1660,7 @@ func TestMemtableIngestInversion(t *testing.T) {
require.NoError(t, w.Set([]byte("bb"), []byte("foo2")))
require.NoError(t, w.Set([]byte("cc"), []byte("foo2")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest3.sst"
Expand All @@ -1671,7 +1671,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("bb"), []byte("foo3")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
{
path := "ingest4.sst"
Expand All @@ -1682,7 +1682,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("bb"), []byte("foo4")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

// We now have a base compaction blocked. Block a memtable flush to cause
Expand Down Expand Up @@ -1761,7 +1761,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.DeleteRange([]byte("cc"), []byte("e")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}
t.Log("main ingest complete")
printLSM()
Expand Down Expand Up @@ -1795,7 +1795,7 @@ func TestMemtableIngestInversion(t *testing.T) {
})
require.NoError(t, w.Set([]byte("cc"), []byte("doesntmatter")))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
require.NoError(t, d.Ingest(context.Background(), []string{path}))
}

// Unblock earlier flushes. We will first finish flushing the blocked
Expand Down
5 changes: 3 additions & 2 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pebble

import (
"bytes"
"context"
"fmt"
"reflect"
"runtime"
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestEventListener(t *testing.T) {
if err := w.Close(); err != nil {
return err.Error()
}
if err := d.Ingest([]string{"ext/0"}); err != nil {
if err := d.Ingest(context.Background(), []string{"ext/0"}); err != nil {
return err.Error()
}
return memLog.String()
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestEventListener(t *testing.T) {
if err := writeTable(tableB, 'b'); err != nil {
return err.Error()
}
if err := d.Ingest([]string{tableA, tableB}); err != nil {
if err := d.Ingest(context.Background(), []string{tableA, tableB}); err != nil {
return err.Error()
}

Expand Down
5 changes: 3 additions & 2 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pebble

import (
"bytes"
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -57,7 +58,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -85,7 +86,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, lr.local); err != nil {
if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil {
panic("couldn't hard link sstables")
}

Expand Down
45 changes: 26 additions & 19 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (r *ingestLoadResult) fileCount() int {
}

func ingestLoad(
ctx context.Context,
opts *Options,
fmv FormatMajorVersion,
paths []string,
Expand All @@ -431,8 +432,6 @@ func ingestLoad(
cacheID uint64,
pending []base.FileNum,
) (ingestLoadResult, error) {
ctx := context.TODO()

localFileNums := pending[:len(paths)]
sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
externalFileNums := pending[len(paths)+len(shared) : len(paths)+len(shared)+len(external)]
Expand Down Expand Up @@ -590,11 +589,15 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files.
func ingestLinkLocal(
jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
ctx context.Context,
jobID JobID,
opts *Options,
objProvider objstorage.Provider,
localMetas []ingestLocalMeta,
) error {
for i := range localMetas {
objMeta, err := objProvider.LinkOrCopyFromLocal(
context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
ctx, opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true},
)
if err != nil {
Expand Down Expand Up @@ -1027,14 +1030,14 @@ func ingestTargetLevel(
// can produce a noticeable hiccup in performance. See
// https://github.com/cockroachdb/pebble/issues/25 for an idea for how to fix
// this hiccup.
func (d *DB) Ingest(paths []string) error {
func (d *DB) Ingest(ctx context.Context, paths []string) error {
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.ReadOnly {
return ErrReadOnly
}
_, err := d.ingest(paths, nil /* shared */, KeyRange{}, false, nil /* external */)
_, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */)
return err
}

Expand Down Expand Up @@ -1115,21 +1118,23 @@ type ExternalFile struct {

// IngestWithStats does the same as Ingest, and additionally returns
// IngestOperationStats.
func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) {
func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
}
if d.opts.ReadOnly {
return IngestOperationStats{}, ErrReadOnly
}
return d.ingest(paths, nil, KeyRange{}, false, nil)
return d.ingest(ctx, paths, nil, KeyRange{}, false, nil)
}

// IngestExternalFiles does the same as IngestWithStats, and additionally
// accepts external files (with locator info that can be resolved using
// d.opts.SharedStorage). These files must also be non-overlapping with
// each other, and must be resolvable through d.objProvider.
func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats, error) {
func (d *DB) IngestExternalFiles(
ctx context.Context, external []ExternalFile,
) (IngestOperationStats, error) {
if err := d.closed.Load(); err != nil {
panic(err)
}
Expand All @@ -1140,7 +1145,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
if d.opts.Experimental.RemoteStorage == nil {
return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured")
}
return d.ingest(nil, nil, KeyRange{}, false, external)
return d.ingest(ctx, nil, nil, KeyRange{}, false, external)
}

// IngestAndExcise does the same as IngestWithStats, and additionally accepts a
Expand All @@ -1154,6 +1159,7 @@ func (d *DB) IngestExternalFiles(external []ExternalFile) (IngestOperationStats,
// Panics if this DB instance was not instantiated with a remote.Storage and
// shared sstables are present.
func (d *DB) IngestAndExcise(
ctx context.Context,
paths []string,
shared []SharedSSTMeta,
external []ExternalFile,
Expand Down Expand Up @@ -1181,7 +1187,7 @@ func (d *DB) IngestAndExcise(
v, FormatMinForSharedObjects,
)
}
return d.ingest(paths, shared, exciseSpan, sstsContainExciseTombstone, external)
return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external)
}

// Both DB.mu and commitPipeline.mu must be held while this is called.
Expand Down Expand Up @@ -1303,6 +1309,7 @@ func (d *DB) handleIngestAsFlushable(

// See comment at Ingest() for details on how this works.
func (d *DB) ingest(
ctx context.Context,
paths []string,
shared []SharedSSTMeta,
exciseSpan KeyRange,
Expand All @@ -1325,7 +1332,6 @@ func (d *DB) ingest(
}
}
}
ctx := context.Background()
// Allocate file numbers for all of the files being ingested and mark them as
// pending in order to prevent them from being deleted. Note that this causes
// the file number ordering to be out of alignment with sequence number
Expand All @@ -1342,7 +1348,7 @@ func (d *DB) ingest(

// Load the metadata for all the files being ingested. This step detects
// and elides empty sstables.
loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
loadResult, err := ingestLoad(ctx, d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
if err != nil {
return IngestOperationStats{}, err
}
Expand All @@ -1362,7 +1368,7 @@ func (d *DB) ingest(
// (e.g. because the files reside on a different filesystem), ingestLinkLocal
// will fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
if err := ingestLinkLocal(ctx, jobID, d.opts, d.objProvider, loadResult.local); err != nil {
return IngestOperationStats{}, err
}

Expand Down Expand Up @@ -1697,7 +1703,7 @@ func (d *DB) ingest(
//
// The manifest lock must be held when calling this method.
func (d *DB) excise(
exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
ctx context.Context, exciseSpan base.UserKeyBounds, m *fileMetadata, ve *versionEdit, level int,
) ([]manifest.NewFileEntry, error) {
numCreatedFiles := 0
// Check if there's actually an overlap between m and exciseSpan.
Expand All @@ -1722,7 +1728,7 @@ func (d *DB) excise(
return nil
}
var err error
iters, err = d.newIters(context.TODO(), m, &IterOptions{
iters, err = d.newIters(ctx, m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
Expand Down Expand Up @@ -1982,6 +1988,7 @@ type ingestSplitFile struct {
//
// d.mu as well as the manifest lock must be held when calling this method.
func (d *DB) ingestSplit(
ctx context.Context,
ve *versionEdit,
updateMetrics func(*fileMetadata, int, []newFileEntry),
files []ingestSplitFile,
Expand Down Expand Up @@ -2047,7 +2054,7 @@ func (d *DB) ingestSplit(
// as we're guaranteed to not have any data overlap between splitFile and
// s.ingestFile. d.excise will return an error if we pass an inclusive user
// key bound _and_ we end up seeing data overlap at the end key.
added, err := d.excise(base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
added, err := d.excise(ctx, base.UserKeyBoundsFromInternal(s.ingestFile.Smallest, s.ingestFile.Largest), splitFile, ve, s.level)
if err != nil {
return err
}
Expand Down Expand Up @@ -2288,7 +2295,7 @@ func (d *DB) ingestApply(
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(exciseSpan.UserKeyBounds(), m, ve, level)
newFiles, err := d.excise(ctx, exciseSpan.UserKeyBounds(), m, ve, level)
if err != nil {
return nil, err
}
Expand All @@ -2308,7 +2315,7 @@ func (d *DB) ingestApply(
if len(filesToSplit) > 0 {
// For the same reasons as the above call to excise, we hold the db mutex
// while calling this method.
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
if err := d.ingestSplit(ctx, ve, updateLevelMetricsOnExcise, filesToSplit, replacedFiles); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit 38fd593

Please sign in to comment.