Skip to content

Commit

Permalink
db: rework backing refcount management
Browse files Browse the repository at this point in the history
Previously, the logic for maintaining the `virtualBackings` map was
very convoluted; in addition, modification happened outside `DB.mu`
which makes the map unusable outside the log lock (we will want to use
it for finding existing external backings).

`Apply` is no longer responsible for updating the backing refcounts
and the `virtualBackings` map (and for updating a zombies map). It is
more straightforward to do that separately (and under `DB.mu`); this
allows us to remove `AccumulateIncompleteAndApplySingleVE` as well,
which was semantically dubious (e.g. it modified `ve`).
  • Loading branch information
RaduBerinde committed Mar 8, 2024
1 parent 33f8f04 commit 8df4320
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 142 deletions.
4 changes: 2 additions & 2 deletions internal/keyspan/keyspanimpl/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func TestLevelIterEquivalence(t *testing.T) {
amap[metas[i].FileNum] = metas[i]
}
b.Added[6] = amap
v, err := b.Apply(nil, base.DefaultComparer, 0, 0, nil)
v, err := b.Apply(nil, base.DefaultComparer, 0, 0)
require.NoError(t, err)
levelIter.Init(
keyspan.SpanIterOptions{}, base.DefaultComparer.Compare, tableNewIters,
Expand Down Expand Up @@ -454,7 +454,7 @@ func TestLevelIter(t *testing.T) {
amap[metas[i].FileNum] = metas[i]
}
b.Added[6] = amap
v, err := b.Apply(nil, base.DefaultComparer, 0, 0, nil)
v, err := b.Apply(nil, base.DefaultComparer, 0, 0)
require.NoError(t, err)
iter = NewLevelIter(
keyspan.SpanIterOptions{}, base.DefaultComparer.Compare,
Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/l0_sublevels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func readManifest(filename string) (*Version, error) {
if err := bve.Accumulate(&ve); err != nil {
return nil, err
}
if v, err = bve.Apply(v, base.DefaultComparer, 10<<20, 32000, nil); err != nil {
if v, err = bve.Apply(v, base.DefaultComparer, 10<<20, 32000); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func replayManifest(t *testing.T, opts *pebble.Options, dirname string) *manifes
}
v, err := bve.Apply(
nil /* version */, cmp, opts.FlushSplitBytes,
opts.Experimental.ReadCompactionRate, nil /* zombies */)
opts.Experimental.ReadCompactionRate)
require.NoError(t, err)
return v
}
11 changes: 0 additions & 11 deletions internal/manifest/testdata/version_edit_apply
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ edit
2:
000001:[a#1,SET-b#2,SET]
000004:[c#3,SET-d#4,SET]
zombies []

apply
L0
Expand Down Expand Up @@ -59,7 +58,6 @@ edit
000002:[c#3,SET-d#4,SET]
0.0:
000001:[a#1,SET-c#2,SET]
zombies []

apply
L0
Expand All @@ -76,7 +74,6 @@ edit
000001:[a#1,SET-c#2,SET]
0.0:
000004:[b#0,SET-d#0,SET]
zombies []


apply
Expand All @@ -90,15 +87,13 @@ edit
000004:[b#3,SET-d#5,SET]
0.0:
000001:[a#1,SET-c#2,SET]
zombies []

apply
L0
1:[a#1,SET-c#2,SET]
----
0.0:
000001:[a#1,SET-c#2,SET]
zombies []

apply
L2
Expand All @@ -125,7 +120,6 @@ edit
000005:[h#3,SET-h#2,SET]
000010:[j#3,SET-m#2,SET]
000002:[n#5,SET-q#3,SET]
zombies [1 4]

apply
edit
Expand All @@ -137,9 +131,6 @@ edit
2:
000006:[a#10,SET-a#7,SET]
000010:[j#3,SET-m#2,SET]
zombies []

# Verify that the zombies map is populated correctly.

apply
L0
Expand All @@ -153,7 +144,6 @@ edit
L1
2
----
zombies [1 2]

# Deletion of a non-existent table results in an error.

Expand Down Expand Up @@ -188,4 +178,3 @@ edit
----
2:
000005:[s#3,SET-z#4,SET]
zombies []
2 changes: 1 addition & 1 deletion internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (m *FileMetadata) LatestRef() {
}

// LatestUnref decrements the latest ref count associated with the backing
// sstable.
// sstable and returns the new refcount.
func (m *FileMetadata) LatestUnref() int32 {
if m.Virtual {
m.FileBacking.VirtualizedSize.Add(-m.Size)
Expand Down
102 changes: 5 additions & 97 deletions internal/manifest/version_edit.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,88 +879,17 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
return nil
}

// AccumulateIncompleteAndApplySingleVE should be called if a single version edit
// is to be applied to the provided curr Version and if the caller needs to
// update the versionSet.zombieTables map. This function exists separately from
// BulkVersionEdit.Apply because it is easier to reason about properties
// regarding BulkVersionedit.Accumulate/Apply and zombie table generation, if we
// know that exactly one version edit is being accumulated.
//
// Note that the version edit passed into this function may be incomplete
// because compactions don't have the ref counting information necessary to
// populate VersionEdit.RemovedBackingTables. This function will complete such a
// version edit by populating RemovedBackingTables.
//
// Invariant: Any file being deleted through ve must belong to the curr Version.
// We can't have a delete for some arbitrary file which does not exist in curr.
func AccumulateIncompleteAndApplySingleVE(
ve *VersionEdit,
curr *Version,
comparer *base.Comparer,
flushSplitBytes int64,
readCompactionRate int64,
backings *FileBackings,
) (_ *Version, zombies map[base.DiskFileNum]uint64, _ error) {
if len(ve.RemovedBackingTables) != 0 {
panic("pebble: invalid incomplete version edit")
}
var b BulkVersionEdit
err := b.Accumulate(ve)
if err != nil {
return nil, nil, err
}
zombies = make(map[base.DiskFileNum]uint64)
v, err := b.Apply(curr, comparer, flushSplitBytes, readCompactionRate, zombies)
if err != nil {
return nil, nil, err
}

for _, s := range b.AddedFileBacking {
backings.Add(s)
}

for fileNum := range zombies {
if _, ok := backings.Get(fileNum); ok {
// This table was backing some virtual sstable in the latest version,
// but is now a zombie. We add RemovedBackingTables entries for
// these, before the version edit is written to disk.
ve.RemovedBackingTables = append(
ve.RemovedBackingTables, fileNum,
)
backings.Remove(fileNum)
}
}
return v, zombies, nil
}

// Apply applies the delta b to the current version to produce a new
// version. The new version is consistent with respect to the comparer cmp.
//
// curr may be nil, which is equivalent to a pointer to a zero version.
// Apply updates the backing refcounts (Ref/Unref) as files are installed into
// the levels. It does not update the "latest" refcounts
// (LatestRef/LatestUnref).
//
// On success, if a non-nil zombies map is provided to Apply, the map is updated
// with file numbers and files sizes of deleted files. These files are
// considered zombies because they are no longer referenced by the returned
// Version, but cannot be deleted from disk as they are still in use by the
// incoming Version.
// curr may be nil, which is equivalent to a pointer to a zero version.
func (b *BulkVersionEdit) Apply(
curr *Version,
comparer *base.Comparer,
flushSplitBytes int64,
readCompactionRate int64,
zombies map[base.DiskFileNum]uint64,
curr *Version, comparer *base.Comparer, flushSplitBytes int64, readCompactionRate int64,
) (*Version, error) {
addZombie := func(state *FileBacking) {
if zombies != nil {
zombies[state.DiskFileNum] = state.Size
}
}
removeZombie := func(state *FileBacking) {
if zombies != nil {
delete(zombies, state.DiskFileNum)
}
}

v := &Version{
cmp: comparer,
}
Expand Down Expand Up @@ -1037,23 +966,6 @@ func (b *BulkVersionEdit) Apply(
return nil, err
}
}

// Note that a backing sst will only become a zombie if the
// references to it in the latest version is 0. We will remove the
// backing sst from the zombie list in the next loop if one of the
// addedFiles in any of the levels is referencing the backing sst.
// This is possible if a physical sstable is virtualized, or if it
// is moved.
latestRefCount := f.LatestRefs()
if latestRefCount <= 0 {
// If a file is present in deletedFilesMap for a level, then it
// must have already been added to the level previously, which
// means that its latest ref count cannot be 0.
err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file", f.FileNum)
return nil, err
} else if f.LatestUnref() == 0 {
addZombie(f.FileBacking)
}
}

addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
Expand Down Expand Up @@ -1082,9 +994,6 @@ func (b *BulkVersionEdit) Apply(
f.InitAllowedSeeks = allowedSeeks

err := lm.insert(f)
// We're adding this file to the new version, so increment the
// latest refs count.
f.LatestRef()
if err != nil {
return nil, errors.Wrap(err, "pebble")
}
Expand All @@ -1094,7 +1003,6 @@ func (b *BulkVersionEdit) Apply(
return nil, errors.Wrap(err, "pebble")
}
}
removeZombie(f.FileBacking)
// Track the keys with the smallest and largest keys, so that we can
// check consistency of the modified span.
if sm == nil || base.InternalCompare(comparer.Compare, sm.Smallest, f.Smallest) > 0 {
Expand Down
17 changes: 2 additions & 15 deletions internal/manifest/version_edit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"io"
"os"
"reflect"
"slices"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -526,23 +525,11 @@ func TestVersionEditApply(t *testing.T) {
return err.Error()
}
}
zombies := make(map[base.DiskFileNum]uint64)
newv, err := bve.Apply(v, base.DefaultComparer, flushSplitBytes, 32000, zombies)
newv, err := bve.Apply(v, base.DefaultComparer, flushSplitBytes, 32000)
if err != nil {
return err.Error()
}

zombieFileNums := make([]base.DiskFileNum, 0, len(zombies))
if len(veList) == 1 {
// Only care about zombies if a single version edit was
// being applied.
for fileNum := range zombies {
zombieFileNums = append(zombieFileNums, fileNum)
}
slices.Sort(zombieFileNums)
}

return fmt.Sprintf("%szombies %d\n", newv, zombieFileNums)
return newv.String()

default:
return fmt.Sprintf("unknown command: %s", d.Cmd)
Expand Down
3 changes: 1 addition & 2 deletions replay/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
v, err = bve.Apply(v,
r.Opts.Comparer,
r.Opts.FlushSplitBytes,
r.Opts.Experimental.ReadCompactionRate,
nil /* zombies */)
r.Opts.Experimental.ReadCompactionRate)
bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
return v, err
}
Expand Down
2 changes: 1 addition & 1 deletion tool/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func (d *dbT) runProperties(cmd *cobra.Command, args []string) {
}
v, err := bve.Apply(
nil /* version */, cmp, d.opts.FlushSplitBytes,
d.opts.Experimental.ReadCompactionRate, nil, /* zombies */
d.opts.Experimental.ReadCompactionRate,
)
if err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions tool/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (m *manifestT) runDump(cmd *cobra.Command, args []string) {
v, err := bve.Apply(
nil /* version */, comparer, 0,
m.opts.Experimental.ReadCompactionRate,
nil, /* zombies */
)
if err != nil {
fmt.Fprintf(stdout, "%s\n", err)
Expand Down Expand Up @@ -552,7 +551,7 @@ func (m *manifestT) runCheck(cmd *cobra.Command, args []string) {
}
// TODO(sbhola): add option to Apply that reports all errors instead of
// one error.
newv, err := bve.Apply(v, cmp, 0, m.opts.Experimental.ReadCompactionRate, nil /* zombies */)
newv, err := bve.Apply(v, cmp, 0, m.opts.Experimental.ReadCompactionRate)
if err != nil {
fmt.Fprintf(stdout, "%s: offset: %d err: %s\n",
arg, offset, err)
Expand Down
Loading

0 comments on commit 8df4320

Please sign in to comment.