Skip to content

Commit

Permalink
ingest: attach external objects at link time
Browse files Browse the repository at this point in the history
Move the attaching of external objects to the link phase. We already
attach the shared objects at this phase. We now attach all remote
objects with a single call.
  • Loading branch information
RaduBerinde committed Jan 23, 2024
1 parent 5fec7ea commit c4c17a7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 36 deletions.
2 changes: 1 addition & 1 deletion flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */); err != nil {
if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */, nil /* external */); err != nil {
panic("couldn't hard link sstables")
}

Expand Down
61 changes: 28 additions & 33 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,6 @@ func ingestLoad1External(
// DiskFileNum (even though this is a virtual sstable).
meta.InitProviderBacking(base.DiskFileNum(fileNum))

// Try to resolve a reference to the external file.
backing, err := objProvider.CreateExternalObjectBacking(e.Locator, e.ObjName)
if err != nil {
return nil, err
}
metas, err := objProvider.AttachRemoteObjects([]objstorage.RemoteObjectToAttach{{
FileNum: meta.FileBacking.DiskFileNum,
FileType: fileTypeTable,
Backing: backing,
}})
if err != nil {
return nil, err
}
if opts.EventListener.TableCreated != nil {
opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(metas[0]),
FileNum: meta.FileBacking.DiskFileNum,
})
}
// In the name of keeping this ingestion as fast as possible, we avoid
// *all* existence checks and synthesize a file metadata with smallest/largest
// keys that overlap whatever the passed-in span was.
Expand Down Expand Up @@ -579,6 +558,7 @@ func ingestLink(
objProvider objstorage.Provider,
lr ingestLoadResult,
shared []SharedSSTMeta,
external []ExternalFile,
) error {
for i := range lr.localPaths {
objMeta, err := objProvider.LinkOrCopyFromLocal(
Expand All @@ -600,48 +580,63 @@ func ingestLink(
})
}
}
sharedObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared))
remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(shared)+len(external))
for i := range shared {
backing, err := shared[i].Backing.Get()
if err != nil {
return err
}
sharedObjs = append(sharedObjs, objstorage.RemoteObjectToAttach{
remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
FileNum: lr.sharedMeta[i].FileBacking.DiskFileNum,
FileType: fileTypeTable,
Backing: backing,
})
}
sharedObjMetas, err := objProvider.AttachRemoteObjects(sharedObjs)
for i := range external {
// Try to resolve a reference to the external file.
backing, err := objProvider.CreateExternalObjectBacking(external[i].Locator, external[i].ObjName)
if err != nil {
return err
}
remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
FileNum: lr.externalMeta[i].FileBacking.DiskFileNum,
FileType: fileTypeTable,
Backing: backing,
})
}

remoteObjMetas, err := objProvider.AttachRemoteObjects(remoteObjs)
if err != nil {
return err
}
for i := range sharedObjMetas {

for i := range shared {
// One corner case around file sizes we need to be mindful of is that
// if one of the shared objects was initially created by us (and has boomeranged
// back from another node), we'll need to update the FileBacking's size
// to be the true underlying size. Otherwise, we could hit errors when we
// open the db again after a crash/restart (see checkConsistency in open.go),
// plus it more accurately allows us to prioritize compactions of files
// that were originally created by us.
if sharedObjMetas[i].IsShared() && !objProvider.IsSharedForeign(sharedObjMetas[i]) {
size, err := objProvider.Size(sharedObjMetas[i])
if remoteObjMetas[i].IsShared() && !objProvider.IsSharedForeign(remoteObjMetas[i]) {
size, err := objProvider.Size(remoteObjMetas[i])
if err != nil {
return err
}
lr.sharedMeta[i].FileBacking.Size = uint64(size)
}
if opts.EventListener.TableCreated != nil {
}

if opts.EventListener.TableCreated != nil {
for i := range remoteObjMetas {
opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(sharedObjMetas[i]),
FileNum: sharedObjMetas[i].DiskFileNum,
Path: objProvider.Path(remoteObjMetas[i]),
FileNum: remoteObjMetas[i].DiskFileNum,
})
}
}
// We do not need to do anything about lr.externalMetas. Those were already
// linked in ingestLoad.

return nil
}
Expand Down Expand Up @@ -1359,7 +1354,7 @@ func (d *DB) ingest(
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil {
if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared, external); err != nil {
return IngestOperationStats{}, err
}

Expand Down
4 changes: 2 additions & 2 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestIngestLink(t *testing.T) {
}

lr := ingestLoadResult{localMeta: meta, localPaths: paths}
err = ingestLink(0 /* jobID */, opts, objProvider, lr, nil /* shared */)
err = ingestLink(0 /* jobID */, opts, objProvider, lr, nil /* shared */, nil /* external */)
if i < count {
if err == nil {
t.Fatalf("expected error, but found success")
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestIngestLinkFallback(t *testing.T) {
meta := []*fileMetadata{{FileNum: 1}}
meta[0].InitPhysicalBacking()
lr := ingestLoadResult{localMeta: meta, localPaths: []string{"source"}}
err = ingestLink(0, opts, objProvider, lr, nil /* shared */)
err = ingestLink(0, opts, objProvider, lr, nil /* shared */, nil /* external */)
require.NoError(t, err)

dest, err := mem.Open("000001.sst")
Expand Down

0 comments on commit c4c17a7

Please sign in to comment.