From c4c17a721d444fd5401101f0a33fb77eb79bec3c Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 22 Jan 2024 13:49:51 -0800 Subject: [PATCH] ingest: attach external objects at link time Move the attaching of external objects to the link phase. We already attach the shared objects at this phase. We now attach all remote objects with a single call. --- flushable_test.go | 2 +- ingest.go | 61 ++++++++++++++++++++++------------------------- ingest_test.go | 4 ++-- 3 files changed, 31 insertions(+), 36 deletions(-) diff --git a/flushable_test.go b/flushable_test.go index 8242556fdd..7ec4c24384 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -83,7 +83,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */, nil /* external */); err != nil { panic("couldn't hard link sstables") } diff --git a/ingest.go b/ingest.go index 00afcd93f0..05248d5158 100644 --- a/ingest.go +++ b/ingest.go @@ -187,27 +187,6 @@ func ingestLoad1External( // DiskFileNum (even though this is a virtual sstable). meta.InitProviderBacking(base.DiskFileNum(fileNum)) - // Try to resolve a reference to the external file. - backing, err := objProvider.CreateExternalObjectBacking(e.Locator, e.ObjName) - if err != nil { - return nil, err - } - metas, err := objProvider.AttachRemoteObjects([]objstorage.RemoteObjectToAttach{{ - FileNum: meta.FileBacking.DiskFileNum, - FileType: fileTypeTable, - Backing: backing, - }}) - if err != nil { - return nil, err - } - if opts.EventListener.TableCreated != nil { - opts.EventListener.TableCreated(TableCreateInfo{ - JobID: jobID, - Reason: "ingesting", - Path: objProvider.Path(metas[0]), - FileNum: meta.FileBacking.DiskFileNum, - }) - } // In the name of keeping this ingestion as fast as possible, we avoid // *all* existence checks and synthesize a file metadata with smallest/largest // keys that overlap whatever the passed-in span was. @@ -579,6 +558,7 @@ func ingestLink( objProvider objstorage.Provider, lr ingestLoadResult, shared []SharedSSTMeta, + external []ExternalFile, ) error { for i := range lr.localPaths { objMeta, err := objProvider.LinkOrCopyFromLocal( @@ -600,23 +580,37 @@ func ingestLink( }) } } - sharedObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared)) + remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared)+len(external)) for i := range shared { backing, err := shared[i].Backing.Get() if err != nil { return err } - sharedObjs = append(sharedObjs, objstorage.RemoteObjectToAttach{ + remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{ FileNum: lr.sharedMeta[i].FileBacking.DiskFileNum, FileType: fileTypeTable, Backing: backing, }) } - sharedObjMetas, err := objProvider.AttachRemoteObjects(sharedObjs) + for i := range external { + // Try to resolve a reference to the external file. + backing, err := objProvider.CreateExternalObjectBacking(external[i].Locator, external[i].ObjName) + if err != nil { + return err + } + remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{ + FileNum: lr.externalMeta[i].FileBacking.DiskFileNum, + FileType: fileTypeTable, + Backing: backing, + }) + } + + remoteObjMetas, err := objProvider.AttachRemoteObjects(remoteObjs) if err != nil { return err } - for i := range sharedObjMetas { + + for i := range shared { // One corner case around file sizes we need to be mindful of, is that // if one of the shareObjs was initially created by us (and has boomeranged // back from another node), we'll need to update the FileBacking's size @@ -624,24 +618,25 @@ func ingestLink( // open the db again after a crash/restart (see checkConsistency in open.go), // plus it more accurately allows us to prioritize compactions of files // that were originally created by us. - if sharedObjMetas[i].IsShared() && !objProvider.IsSharedForeign(sharedObjMetas[i]) { - size, err := objProvider.Size(sharedObjMetas[i]) + if remoteObjMetas[i].IsShared() && !objProvider.IsSharedForeign(remoteObjMetas[i]) { + size, err := objProvider.Size(remoteObjMetas[i]) if err != nil { return err } lr.sharedMeta[i].FileBacking.Size = uint64(size) } - if opts.EventListener.TableCreated != nil { + } + + if opts.EventListener.TableCreated != nil { + for i := range remoteObjMetas { opts.EventListener.TableCreated(TableCreateInfo{ JobID: jobID, Reason: "ingesting", - Path: objProvider.Path(sharedObjMetas[i]), - FileNum: sharedObjMetas[i].DiskFileNum, + Path: objProvider.Path(remoteObjMetas[i]), + FileNum: remoteObjMetas[i].DiskFileNum, }) } } - // We do not need to do anything about lr.externalMetas. Those were already - // linked in ingestLoad. return nil } @@ -1359,7 +1354,7 @@ func (d *DB) ingest( // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared, external); err != nil { return IngestOperationStats{}, err } diff --git a/ingest_test.go b/ingest_test.go index 27f1e39f47..1948f73d4c 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -332,7 +332,7 @@ func TestIngestLink(t *testing.T) { } lr := ingestLoadResult{localMeta: meta, localPaths: paths} - err = ingestLink(0 /* jobID */, opts, objProvider, lr, nil /* shared */) + err = ingestLink(0 /* jobID */, opts, objProvider, lr, nil /* shared */, nil /* external */) if i < count { if err == nil { t.Fatalf("expected error, but found success") @@ -400,7 +400,7 @@ func TestIngestLinkFallback(t *testing.T) { meta := []*fileMetadata{{FileNum: 1}} meta[0].InitPhysicalBacking() lr := ingestLoadResult{localMeta: meta, localPaths: []string{"source"}} - err = ingestLink(0, opts, objProvider, lr, nil /* shared */) + err = ingestLink(0, opts, objProvider, lr, nil /* shared */, nil /* external */) require.NoError(t, err) dest, err := mem.Open("000001.sst")