Skip to content

Commit

Permalink
db: reuse backings for external ingestions
Browse files Browse the repository at this point in the history
When we ingest the same external file multiple times (either in
separate ingestions or within the same ingestion), we now use the same
backing.

We use the recently added virtual backing protection mechanism to make
sure that the backings don't disappear between the point where we
decide to reuse and applying the version change. The metamorphic test
provides good coverage for this.
  • Loading branch information
RaduBerinde committed Mar 20, 2024
1 parent 9d0109c commit 3a7021c
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 64 deletions.
2 changes: 1 addition & 1 deletion flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLink(jobID, d.opts, d.objProvider, lr); err != nil {
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, lr.local); err != nil {
panic("couldn't hard link sstables")
}

Expand Down
194 changes: 143 additions & 51 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,6 @@ func ingestLoad1External(
Virtual: true,
Size: e.Size,
}
// For simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable). Pass the underlying
// FileBacking's size to the same size as the virtualized view of the sstable.
// This ensures that we don't over-prioritize this sstable for compaction just
// yet, as we do not have a clear sense of what parts of this sstable are
// referenced by other nodes.
meta.InitProviderBacking(base.DiskFileNum(fileNum), e.Size)

// In the name of keeping this ingestion as fast as possible, we avoid
// *all* existence checks and synthesize a file metadata with smallest/largest
Expand Down Expand Up @@ -248,9 +241,6 @@ func ingestLoad1External(
meta.SyntheticPrefix = e.SyntheticPrefix
meta.SyntheticSuffix = e.SyntheticSuffix

if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil {
return nil, err
}
return meta, nil
}

Expand Down Expand Up @@ -420,6 +410,11 @@ type ingestSharedMeta struct {
type ingestExternalMeta struct {
*fileMetadata
external ExternalFile
// usedExistingBacking is true if the external file is reusing a backing
// that existed before this ingestion. In this case, we called
// VirtualBackings.Protect() on that backing; we will need to call
// Unprotect() after the ingestion.
usedExistingBacking bool
}

func (r *ingestLoadResult) fileCount() int {
Expand Down Expand Up @@ -591,18 +586,18 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
return firstErr
}

// ingestLink creates new objects which are backed by either hardlinks to or
// copies of the ingested files. It also attaches shared objects to the provider.
func ingestLink(
jobID int, opts *Options, objProvider objstorage.Provider, lr ingestLoadResult,
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files.
func ingestLinkLocal(
jobID int, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
) error {
for i := range lr.local {
for i := range localMetas {
objMeta, err := objProvider.LinkOrCopyFromLocal(
context.TODO(), opts.FS, lr.local[i].path, fileTypeTable, lr.local[i].FileBacking.DiskFileNum,
context.TODO(), opts.FS, localMetas[i].path, fileTypeTable, localMetas[i].FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true},
)
if err != nil {
if err2 := ingestCleanup(objProvider, lr.local[:i]); err2 != nil {
if err2 := ingestCleanup(objProvider, localMetas[:i]); err2 != nil {
opts.Logger.Errorf("ingest cleanup failed: %v", err2)
}
return err
Expand All @@ -612,10 +607,21 @@ func ingestLink(
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(objMeta),
FileNum: base.PhysicalTableDiskFileNum(lr.local[i].FileNum),
FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
})
}
}
return nil
}

// ingestAttachRemote attaches remote objects to the storage provider.
//
// For external objects, we reuse existing FileBackings from the current version
// when possible.
//
// ingestUnprotectExternalBackings() must be called after this function (even in
// error cases).
func (d *DB) ingestAttachRemote(jobID int, lr ingestLoadResult) error {
remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
for i := range lr.shared {
backing, err := lr.shared[i].shared.Backing.Get()
Expand All @@ -628,20 +634,52 @@ func ingestLink(
Backing: backing,
})
}

d.findExistingBackingsForExternalObjects(lr.external)

newFileBackings := make(map[remote.ObjectKey]*fileBacking, len(lr.external))
for i := range lr.external {
// Try to resolve a reference to the external file.
backing, err := objProvider.CreateExternalObjectBacking(lr.external[i].external.Locator, lr.external[i].external.ObjName)
meta := lr.external[i].fileMetadata
if meta.FileBacking != nil {
// The backing was filled in by findExistingBackingsForExternalObjects().
continue
}
key := remote.MakeObjectKey(lr.external[i].external.Locator, lr.external[i].external.ObjName)
if backing, ok := newFileBackings[key]; ok {
// We already created the same backing in this loop.
meta.FileBacking = backing
continue
}
providerBacking, err := d.objProvider.CreateExternalObjectBacking(key.Locator, key.ObjectName)
if err != nil {
return err
}
// We have to attach the remote object (and assign it a DiskFileNum). For
// simplicity, we use the same number for both the FileNum and the
// DiskFileNum (even though this is a virtual sstable).
meta.InitProviderBacking(base.DiskFileNum(meta.FileNum), lr.external[i].external.Size)

// Set the underlying FileBacking's size to the same size as the virtualized
// view of the sstable. This ensures that we don't over-prioritize this
// sstable for compaction just yet, as we do not have a clear sense of
// what parts of this sstable are referenced by other nodes.
meta.FileBacking.Size = lr.external[i].external.Size
newFileBackings[key] = meta.FileBacking

remoteObjs = append(remoteObjs, objstorage.RemoteObjectToAttach{
FileNum: lr.external[i].FileBacking.DiskFileNum,
FileNum: meta.FileBacking.DiskFileNum,
FileType: fileTypeTable,
Backing: backing,
Backing: providerBacking,
})
}

remoteObjMetas, err := objProvider.AttachRemoteObjects(remoteObjs)
for i := range lr.external {
if err := lr.external[i].Validate(d.opts.Comparer.Compare, d.opts.Comparer.FormatKey); err != nil {
return err
}
}

remoteObjMetas, err := d.objProvider.AttachRemoteObjects(remoteObjs)
if err != nil {
return err
}
Expand All @@ -654,21 +692,21 @@ func ingestLink(
// open the db again after a crash/restart (see checkConsistency in open.go),
// plus it more accurately allows us to prioritize compactions of files
// that were originally created by us.
if remoteObjMetas[i].IsShared() && !objProvider.IsSharedForeign(remoteObjMetas[i]) {
size, err := objProvider.Size(remoteObjMetas[i])
if remoteObjMetas[i].IsShared() && !d.objProvider.IsSharedForeign(remoteObjMetas[i]) {
size, err := d.objProvider.Size(remoteObjMetas[i])
if err != nil {
return err
}
lr.shared[i].FileBacking.Size = uint64(size)
}
}

if opts.EventListener.TableCreated != nil {
if d.opts.EventListener.TableCreated != nil {
for i := range remoteObjMetas {
opts.EventListener.TableCreated(TableCreateInfo{
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
Reason: "ingesting",
Path: objProvider.Path(remoteObjMetas[i]),
Path: d.objProvider.Path(remoteObjMetas[i]),
FileNum: remoteObjMetas[i].DiskFileNum,
})
}
Expand All @@ -677,6 +715,49 @@ func ingestLink(
return nil
}

// findExistingBackingsForExternalObjects populates the FileBacking for external
// files which are already in use by the current version.
//
// We take a Ref and LatestRef on populated backings.
func (d *DB) findExistingBackingsForExternalObjects(metas []ingestExternalMeta) {
d.mu.Lock()
defer d.mu.Unlock()

for i := range metas {
diskFileNums := d.objProvider.GetExternalObjects(metas[i].external.Locator, metas[i].external.ObjName)
// We cross-check against fileBackings in the current version because it is
// possible that the external object is referenced by an sstable which only
// exists in a previous version. In that case, that object could be removed
// at any time so we cannot reuse it.
for _, n := range diskFileNums {
if backing, ok := d.mu.versions.virtualBackings.Get(n); ok {
// Protect this backing from being removed from the latest version. We
// will unprotect in ingestUnprotectExternalBackings.
d.mu.versions.virtualBackings.Protect(n)
metas[i].usedExistingBacking = true
metas[i].FileBacking = backing
break
}
}
}
}

// ingestUnprotectExternalBackings unprotects the file backings that were reused
// for external objects when the ingestion fails.
func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) {
d.mu.Lock()
defer d.mu.Unlock()

for _, meta := range lr.external {
if meta.usedExistingBacking {
// If the backing is not use anywhere else and the ingest failed (or the
// ingested tables were already compacted away), this call will cause in
// the next version update to remove the backing.
d.mu.versions.virtualBackings.Unprotect(meta.FileBacking.DiskFileNum)
}
}
}

func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format base.FormatKey) error {
setSeqFn := func(k base.InternalKey) base.InternalKey {
return base.MakeInternalKey(k.UserKey, seqNum, k.Kind())
Expand Down Expand Up @@ -1182,7 +1263,7 @@ type ExternalFile struct {
// - the backing sst must not contain multiple keys with the same prefix.
SyntheticSuffix []byte

// Level denotes the level at which this file was presetnt at read time
// Level denotes the level at which this file was present at read time
// if the external file was returned by a scan of an existing Pebble
// instance. If Level is 0, this field is ignored.
Level uint8
Expand Down Expand Up @@ -1420,10 +1501,16 @@ func (d *DB) ingest(

// Hard link the sstables into the DB directory. Since the sstables aren't
// referenced by a version, they won't be used. If the hard linking fails
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// (e.g. because the files reside on a different filesystem), ingestLinkLocal
// will fall back to copying, and if that fails we undo our work and return an
// error.
if err := ingestLink(jobID, d.opts, d.objProvider, loadResult); err != nil {
if err := ingestLinkLocal(jobID, d.opts, d.objProvider, loadResult.local); err != nil {
return IngestOperationStats{}, err
}

err = d.ingestAttachRemote(jobID, loadResult)
defer d.ingestUnprotectExternalBackings(loadResult)
if err != nil {
return IngestOperationStats{}, err
}

Expand Down Expand Up @@ -2208,41 +2295,41 @@ func (d *DB) ingestApply(
// Determine the lowest level in the LSM for which the sstable doesn't
// overlap any existing files in the level.
var m *fileMetadata
sharedIdx := -1
externalIdx := -1
specifiedLevel := -1
externalFile := false
isShared := false
isExternal := false
if i < len(lr.local) {
// local file.
m = lr.local[i].fileMetadata
} else if (i - len(lr.local)) < len(lr.shared) {
// shared file.
sharedIdx = i - len(lr.local)
isShared = true
sharedIdx := i - len(lr.local)
m = lr.shared[sharedIdx].fileMetadata
specifiedLevel = int(lr.shared[sharedIdx].shared.Level)
} else {
// external file.
externalFile = true
externalIdx = i - (len(lr.local) + len(lr.shared))
isExternal = true
externalIdx := i - (len(lr.local) + len(lr.shared))
m = lr.external[externalIdx].fileMetadata
specifiedLevel = int(lr.external[externalIdx].external.Level)
if lr.externalFilesHaveLevel {
specifiedLevel = int(lr.external[externalIdx].external.Level)
}
}

// Add to CreatedBackingTables if this is a new backing.
//
// Shared files always have a new backing. External files have new backings
// iff the backing disk file num and the file num match (see ingestAttachRemote).
if isShared || (isExternal && m.FileBacking.DiskFileNum == base.DiskFileNum(m.FileNum)) {
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
}

f := &ve.NewFiles[i]
var err error
if sharedIdx >= 0 {
f.Level = specifiedLevel
if f.Level < sharedLevelsStart {
panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
f.Level, sharedLevelsStart))
}
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
} else if externalFile && lr.externalFilesHaveLevel {
if specifiedLevel != -1 {
f.Level = specifiedLevel
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
} else {
if externalFile {
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
}
var splitFile *fileMetadata
if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) {
// This file fits perfectly within the excise span. We can slot it at
Expand Down Expand Up @@ -2287,6 +2374,10 @@ func (d *DB) ingestApply(
d.mu.versions.logUnlock()
return nil, err
}
if isShared && f.Level < sharedLevelsStart {
panic(fmt.Sprintf("cannot slot a shared file higher than the highest shared level: %d < %d",
f.Level, sharedLevelsStart))
}
f.Meta = m
levelMetrics := metrics[f.Level]
if levelMetrics == nil {
Expand Down Expand Up @@ -2401,6 +2492,7 @@ func (d *DB) ingestApply(
if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(nil)
}); err != nil {
// Note: any error during logAndApply is fatal; this won't be reachable in production.
return nil, err
}

Expand Down
6 changes: 2 additions & 4 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func TestIngestLink(t *testing.T) {
opts.FS.Remove(meta[i].path)
}

lr := ingestLoadResult{local: meta}
err = ingestLink(0 /* jobID */, opts, objProvider, lr)
err = ingestLinkLocal(0 /* jobID */, opts, objProvider, meta)
if i < count {
if err == nil {
t.Fatalf("expected error, but found success")
Expand Down Expand Up @@ -402,8 +401,7 @@ func TestIngestLinkFallback(t *testing.T) {

meta := &fileMetadata{FileNum: 1}
meta.InitPhysicalBacking()
lr := ingestLoadResult{local: []ingestLocalMeta{{fileMetadata: meta, path: "source"}}}
err = ingestLink(0, opts, objProvider, lr)
err = ingestLinkLocal(0, opts, objProvider, []ingestLocalMeta{{fileMetadata: meta, path: "source"}})
require.NoError(t, err)

dest, err := mem.Open("000001.sst")
Expand Down
4 changes: 4 additions & 0 deletions objstorage/objstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ type Provider interface {
// Pebble and will never be removed by Pebble.
CreateExternalObjectBacking(locator remote.Locator, objName string) (RemoteObjectBacking, error)

// GetExternalObjects returns a list of DiskFileNums corresponding to all
// objects that are backed by the given external object.
GetExternalObjects(locator remote.Locator, objName string) []base.DiskFileNum

// AttachRemoteObjects registers existing remote objects with this provider.
//
// The objects are not guaranteed to be durable (accessible in case of
Expand Down
Loading

0 comments on commit 3a7021c

Please sign in to comment.