From 8edd89dbb17b5e46becfa28af85b22c84f19e251 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 13 Nov 2018 01:59:59 -0500 Subject: [PATCH] storage/cmdq: O(1) copy-on-write btree clones and atomic refcount GC policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All commits from #32165 except the last one. This change introduces O(1) btree cloning and a new copy-on-write scheme, essentially giving the btree an immutable API (for which I took inspiration from https://docs.rs/crate/im/). This is made efficient by the second part of the change - a new garbage collection policy for btrees. Nodes are now reference counted atomically and freed into global `sync.Pools` when they are no longer referenced. One of the main ideas in #31997 is to treat the btrees backing the command queue as immutable structures. In doing so, we adopt a copy-on-write scheme. Trees are cloned under lock and then accessed concurrently. When future writers want to modify the tree, they can do so by cloning any nodes that they touch. This commit provides this functionality in a much more elegant manner than 6994347. Instead of giving each node a "copy-on-write context", we instead give each node a reference count. We then use the following rule: 1. trees with exclusive ownership (refcount == 1) over a node can modify it in-place. 2. trees without exclusive ownership over a node must clone the node in order to modify it. Once cloned, the tree will now have exclusive ownership over that node. When cloning the node, the reference count of all of the node's children must be incremented. In following these simple rules, we end up with a really nice property - trees gain more and more "ownership" as they make modifications, meaning that subsequent modifications are much less likely to need to clone nodes. Essentially, we transparently incorporate the idea of local mutations (e.g. 
Clojure's transients or Haskell's ST monad) without any external API needed. Even better, reference counting internal nodes ties directly into the new GC policy, which allows us to recycle old nodes and make the copy-on-write scheme zero-allocation in almost all cases. When a node's reference count drops to 0, we simply toss it into a `sync.Pool`. We keep two separate pools - one for leaf nodes and one for non-leaf nodes. This wasn't possible with the previous "copy-on-write context" approach. The atomic reference counting does have an effect on benchmarks, but it's not a big one (single/double digit ns) and is negligible compared to the speedup observed in #32165. ``` name old time/op new time/op delta BTreeInsert/count=16-4 73.2ns ± 4% 84.4ns ± 4% +15.30% (p=0.008 n=5+5) BTreeInsert/count=128-4 152ns ± 4% 167ns ± 4% +9.89% (p=0.008 n=5+5) BTreeInsert/count=1024-4 250ns ± 1% 263ns ± 2% +5.21% (p=0.008 n=5+5) BTreeInsert/count=8192-4 381ns ± 1% 394ns ± 2% +3.36% (p=0.008 n=5+5) BTreeInsert/count=65536-4 720ns ± 6% 746ns ± 1% ~ (p=0.119 n=5+5) BTreeDelete/count=16-4 127ns ±15% 131ns ± 9% ~ (p=0.690 n=5+5) BTreeDelete/count=128-4 182ns ± 8% 192ns ± 8% ~ (p=0.222 n=5+5) BTreeDelete/count=1024-4 323ns ± 3% 340ns ± 4% +5.20% (p=0.032 n=5+5) BTreeDelete/count=8192-4 532ns ± 2% 556ns ± 1% +4.55% (p=0.008 n=5+5) BTreeDelete/count=65536-4 1.15µs ± 2% 1.22µs ± 7% ~ (p=0.222 n=5+5) BTreeDeleteInsert/count=16-4 166ns ± 4% 174ns ± 3% +4.70% (p=0.032 n=5+5) BTreeDeleteInsert/count=128-4 370ns ± 2% 383ns ± 1% +3.57% (p=0.008 n=5+5) BTreeDeleteInsert/count=1024-4 548ns ± 3% 575ns ± 5% +4.89% (p=0.032 n=5+5) BTreeDeleteInsert/count=8192-4 775ns ± 1% 789ns ± 1% +1.86% (p=0.016 n=5+5) BTreeDeleteInsert/count=65536-4 2.20µs ±22% 2.10µs ±18% ~ (p=0.841 n=5+5) ``` We can see how important the GC and memory re-use policy is by comparing the following few benchmarks. 
Specifically, notice the difference in operation speed and allocation count in `BenchmarkBTreeDeleteInsertCloneEachTime` between the tests that `Reset` old clones (allowing nodes to be freed into `sync.Pool`s) and the tests that don't `Reset` old clones. ``` name time/op BTreeDeleteInsert/count=16-4 198ns ±28% BTreeDeleteInsert/count=128-4 375ns ± 3% BTreeDeleteInsert/count=1024-4 577ns ± 2% BTreeDeleteInsert/count=8192-4 798ns ± 1% BTreeDeleteInsert/count=65536-4 2.00µs ±13% BTreeDeleteInsertCloneOnce/count=16-4 173ns ± 2% BTreeDeleteInsertCloneOnce/count=128-4 379ns ± 2% BTreeDeleteInsertCloneOnce/count=1024-4 584ns ± 4% BTreeDeleteInsertCloneOnce/count=8192-4 800ns ± 2% BTreeDeleteInsertCloneOnce/count=65536-4 2.04µs ±32% BTreeDeleteInsertCloneEachTime/reset=false/count=16-4 535ns ± 8% BTreeDeleteInsertCloneEachTime/reset=false/count=128-4 1.29µs ± 1% BTreeDeleteInsertCloneEachTime/reset=false/count=1024-4 2.22µs ± 5% BTreeDeleteInsertCloneEachTime/reset=false/count=8192-4 2.55µs ± 5% BTreeDeleteInsertCloneEachTime/reset=false/count=65536-4 5.89µs ±20% BTreeDeleteInsertCloneEachTime/reset=true/count=16-4 240ns ± 1% BTreeDeleteInsertCloneEachTime/reset=true/count=128-4 610ns ± 4% BTreeDeleteInsertCloneEachTime/reset=true/count=1024-4 1.20µs ± 2% BTreeDeleteInsertCloneEachTime/reset=true/count=8192-4 1.69µs ± 1% BTreeDeleteInsertCloneEachTime/reset=true/count=65536-4 3.52µs ±18% name alloc/op BTreeDeleteInsert/count=16-4 0.00B BTreeDeleteInsert/count=128-4 0.00B BTreeDeleteInsert/count=1024-4 0.00B BTreeDeleteInsert/count=8192-4 0.00B BTreeDeleteInsert/count=65536-4 0.00B BTreeDeleteInsertCloneOnce/count=16-4 0.00B BTreeDeleteInsertCloneOnce/count=128-4 0.00B BTreeDeleteInsertCloneOnce/count=1024-4 0.00B BTreeDeleteInsertCloneOnce/count=8192-4 0.00B BTreeDeleteInsertCloneOnce/count=65536-4 1.00B ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=16-4 288B ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=128-4 897B ± 0% 
BTreeDeleteInsertCloneEachTime/reset=false/count=1024-4 1.61kB ± 1% BTreeDeleteInsertCloneEachTime/reset=false/count=8192-4 1.47kB ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=65536-4 2.40kB ±12% BTreeDeleteInsertCloneEachTime/reset=true/count=16-4 0.00B BTreeDeleteInsertCloneEachTime/reset=true/count=128-4 0.00B BTreeDeleteInsertCloneEachTime/reset=true/count=1024-4 0.00B BTreeDeleteInsertCloneEachTime/reset=true/count=8192-4 0.00B BTreeDeleteInsertCloneEachTime/reset=true/count=65536-4 0.00B name allocs/op BTreeDeleteInsert/count=16-4 0.00 BTreeDeleteInsert/count=128-4 0.00 BTreeDeleteInsert/count=1024-4 0.00 BTreeDeleteInsert/count=8192-4 0.00 BTreeDeleteInsert/count=65536-4 0.00 BTreeDeleteInsertCloneOnce/count=16-4 0.00 BTreeDeleteInsertCloneOnce/count=128-4 0.00 BTreeDeleteInsertCloneOnce/count=1024-4 0.00 BTreeDeleteInsertCloneOnce/count=8192-4 0.00 BTreeDeleteInsertCloneOnce/count=65536-4 0.00 BTreeDeleteInsertCloneEachTime/reset=false/count=16-4 1.00 ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=128-4 2.00 ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=1024-4 3.00 ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=8192-4 3.00 ± 0% BTreeDeleteInsertCloneEachTime/reset=false/count=65536-4 4.40 ±14% BTreeDeleteInsertCloneEachTime/reset=true/count=16-4 0.00 BTreeDeleteInsertCloneEachTime/reset=true/count=128-4 0.00 BTreeDeleteInsertCloneEachTime/reset=true/count=1024-4 0.00 BTreeDeleteInsertCloneEachTime/reset=true/count=8192-4 0.00 BTreeDeleteInsertCloneEachTime/reset=true/count=65536-4 0.00 ``` Release note: None --- pkg/storage/cmdq/interval_btree.go | 167 ++++++++++++++++++++---- pkg/storage/cmdq/interval_btree_test.go | 124 ++++++++++++++++++ 2 files changed, 267 insertions(+), 24 deletions(-) diff --git a/pkg/storage/cmdq/interval_btree.go b/pkg/storage/cmdq/interval_btree.go index 62269fee3e0a..54d637927cb4 100644 --- a/pkg/storage/cmdq/interval_btree.go +++ b/pkg/storage/cmdq/interval_btree.go @@ -18,6 +18,8 @@ import ( 
"bytes" "sort" "strings" + "sync" + "sync/atomic" "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -106,21 +108,121 @@ func upperBound(c *cmd) keyBound { } type leafNode struct { - max keyBound + ref int32 count int16 leaf bool + max keyBound cmds [maxCmds]*cmd } -func newLeafNode() *node { - return (*node)(unsafe.Pointer(&leafNode{leaf: true})) -} - type node struct { leafNode children [maxCmds + 1]*node } +func leafToNode(ln *leafNode) *node { + return (*node)(unsafe.Pointer(ln)) +} + +func nodeToLeaf(n *node) *leafNode { + return (*leafNode)(unsafe.Pointer(n)) +} + +var leafPool = sync.Pool{ + New: func() interface{} { + return new(leafNode) + }, +} + +var nodePool = sync.Pool{ + New: func() interface{} { + return new(node) + }, +} + +func newLeafNode() *node { + n := leafToNode(leafPool.Get().(*leafNode)) + n.leaf = true + n.ref = 1 + return n +} + +func newNode() *node { + n := nodePool.Get().(*node) + n.ref = 1 + return n +} + +// mut creates and returns a mutable node reference. If the node is not shared +// with any other trees then it can be modified in place. Otherwise, it must be +// cloned to ensure unique ownership. In this way, we enforce a copy-on-write +// policy which transparently incorporates the idea of local mutations, like +// Clojure's transients or Haskell's ST monad, where nodes are only copied +// during the first time that they are modified between Clone operations. +// +// When a node is cloned, the provided pointer will be redirected to the new +// mutable node. +func mut(n **node) *node { + if atomic.LoadInt32(&(*n).ref) == 1 { + // Exclusive ownership. Can mutate in place. + return *n + } + // If we do not have unique ownership over the node then we + // clone it to gain unique ownership. After doing so, we can + // release our reference to the old node. + c := (*n).clone() + (*n).decRef(true /* recursive */) + *n = c + return *n +} + +// incRef acquires a reference to the node. 
+func (n *node) incRef() { + atomic.AddInt32(&n.ref, 1) +} + +// decRef releases a reference to the node. If requested, the method +// will recurse into child nodes and decrease their refcounts as well. +func (n *node) decRef(recursive bool) { + if atomic.AddInt32(&n.ref, -1) > 0 { + // Other references remain. Can't free. + return + } + // Clear and release node into memory pool. + if n.leaf { + ln := nodeToLeaf(n) + *ln = leafNode{} + leafPool.Put(ln) + } else { + // Release child references first, if requested. + if recursive { + for i := int16(0); i <= n.count; i++ { + n.children[i].decRef(true /* recursive */) + } + } + *n = node{} + nodePool.Put(n) + } +} + +// clone creates a clone of the receiver with a single reference count. +func (n *node) clone() *node { + var c *node + if n.leaf { + c = newLeafNode() + *nodeToLeaf(c) = *nodeToLeaf(n) + } else { + c = newNode() + *c = *n + // Increase refcount of each child. + for i := int16(0); i <= c.count; i++ { + c.children[i].incRef() + } + } + c.ref = 1 + return c +} + func (n *node) insertAt(index int, c *cmd, nd *node) { if index < int(n.count) { copy(n.cmds[index+1:n.count+1], n.cmds[index:n.count]) @@ -246,7 +348,7 @@ func (n *node) split(i int) (*cmd, *node) { if n.leaf { next = newLeafNode() } else { - next = &node{} + next = newNode() } next.count = n.count - int16(i+1) copy(next.cmds[:], n.cmds[i+1:n.count]) @@ -286,7 +388,7 @@ func (n *node) insert(c *cmd) (replaced, newBound bool) { return false, n.adjustUpperBoundOnInsertion(c, nil) } if n.children[i].count >= maxCmds { - splitcmd, splitNode := n.children[i].split(maxCmds / 2) + splitcmd, splitNode := mut(&n.children[i]).split(maxCmds / 2) n.insertAt(i, splitcmd, splitNode) switch cmp := cmp(c, n.cmds[i]); { @@ -299,7 +401,7 @@ func (n *node) insert(c *cmd) (replaced, newBound bool) { return true, false } } - replaced, newBound = n.children[i].insert(c) + replaced, newBound = mut(&n.children[i]).insert(c) if newBound { newBound = 
n.adjustUpperBoundOnInsertion(c, nil) } @@ -316,7 +418,7 @@ func (n *node) removeMax() *cmd { n.adjustUpperBoundOnRemoval(out, nil) return out } - child := n.children[n.count] + child := mut(&n.children[n.count]) if child.count <= minCmds { n.rebalanceOrMerge(int(n.count)) return n.removeMax() @@ -336,12 +438,12 @@ func (n *node) remove(c *cmd) (out *cmd, newBound bool) { } return nil, false } - child := n.children[i] - if child.count <= minCmds { + if n.children[i].count <= minCmds { // Child not large enough to remove from. n.rebalanceOrMerge(i) return n.remove(c) } + child := mut(&n.children[i]) if found { // Replace the cmd being removed with the max cmd in our left child. out = n.cmds[i] @@ -389,8 +491,8 @@ func (n *node) rebalanceOrMerge(i int) { // v // a // - left := n.children[i-1] - child := n.children[i] + left := mut(&n.children[i-1]) + child := mut(&n.children[i]) xCmd, grandChild := left.popBack() yCmd := n.cmds[i-1] child.pushFront(yCmd, grandChild) @@ -428,8 +530,8 @@ func (n *node) rebalanceOrMerge(i int) { // v // a // - right := n.children[i+1] - child := n.children[i] + right := mut(&n.children[i+1]) + child := mut(&n.children[i]) xCmd, grandChild := right.popFront() yCmd := n.cmds[i] child.pushBack(yCmd, grandChild) @@ -464,7 +566,9 @@ func (n *node) rebalanceOrMerge(i int) { if i >= int(n.count) { i = int(n.count - 1) } - child := n.children[i] + child := mut(&n.children[i]) + // Make mergeChild mutable, bumping the refcounts on its children if necessary. + _ = mut(&n.children[i+1]) mergeCmd, mergeChild := n.removeAt(i) child.cmds[child.count] = mergeCmd copy(child.cmds[child.count+1:], mergeChild.cmds[:mergeChild.count]) @@ -474,6 +578,7 @@ func (n *node) rebalanceOrMerge(i int) { child.count += mergeChild.count + 1 child.adjustUpperBoundOnInsertion(mergeCmd, mergeChild) + mergeChild.decRef(false /* recursive */) } } @@ -547,25 +652,39 @@ type btree struct { length int } -// Reset removes all cmds from the btree. 
+// Reset removes all cmds from the btree. In doing so, it allows memory +// held by the btree to be recycled. Failure to call this method before +// letting a btree be GCed is safe in that it won't cause a memory leak, +// but it will prevent btree nodes from being efficiently re-used. func (t *btree) Reset() { - t.root = nil + if t.root != nil { + t.root.decRef(true /* recursive */) + t.root = nil + } t.length = 0 } -// Silent unused warning. -var _ = (*btree).Reset +// Clone clones the btree, lazily. +func (t *btree) Clone() btree { + c := *t + if c.root != nil { + c.root.incRef() + } + return c +} // Delete removes a cmd equal to the passed in cmd from the tree. func (t *btree) Delete(c *cmd) { if t.root == nil || t.root.count == 0 { return } - if out, _ := t.root.remove(c); out != nil { + if out, _ := mut(&t.root).remove(c); out != nil { t.length-- } if t.root.count == 0 && !t.root.leaf { + old := t.root t.root = t.root.children[0] + old.decRef(false /* recursive */) } } @@ -575,8 +694,8 @@ func (t *btree) Set(c *cmd) { if t.root == nil { t.root = newLeafNode() } else if t.root.count >= maxCmds { - splitcmd, splitNode := t.root.split(maxCmds / 2) - newRoot := &node{} + splitcmd, splitNode := mut(&t.root).split(maxCmds / 2) + newRoot := newNode() newRoot.count = 1 newRoot.cmds[0] = splitcmd newRoot.children[0] = t.root @@ -584,7 +703,7 @@ func (t *btree) Set(c *cmd) { newRoot.max = newRoot.findUpperBound() t.root = newRoot } - if replaced, _ := t.root.insert(c); !replaced { + if replaced, _ := mut(&t.root).insert(c); !replaced { t.length++ } } diff --git a/pkg/storage/cmdq/interval_btree_test.go b/pkg/storage/cmdq/interval_btree_test.go index b3bd33a240e0..ee18c47b5be2 100644 --- a/pkg/storage/cmdq/interval_btree_test.go +++ b/pkg/storage/cmdq/interval_btree_test.go @@ -17,6 +17,8 @@ package cmdq import ( "fmt" "math/rand" + "reflect" + "sync" "testing" "github.com/stretchr/testify/require" @@ -443,6 +445,67 @@ func TestBTreeSeekOverlapRandom(t *testing.T) { } 
} +func TestBTreeCloneConcurrentOperations(t *testing.T) { + const cloneTestSize = 10000 + var populate func(tr *btree, start int, p []*cmd, wg *sync.WaitGroup, trees *[]*btree) + populate = func(tr *btree, start int, p []*cmd, wg *sync.WaitGroup, trees *[]*btree) { + t.Logf("Starting new clone at %v", start) + *trees = append(*trees, tr) + for i := start; i < cloneTestSize; i++ { + tr.Set(p[i]) + if i%(cloneTestSize/5) == 0 { + wg.Add(1) + c := tr.Clone() + go populate(&c, i+1, p, wg, trees) + } + } + wg.Done() + } + + var tr btree + var trees []*btree + p := perm(cloneTestSize) + var wg sync.WaitGroup + wg.Add(1) + go populate(&tr, 0, p, &wg, &trees) + wg.Wait() + + t.Logf("Starting equality checks on %d trees", len(trees)) + want := rang(0, cloneTestSize-1) + for i, tree := range trees { + if !reflect.DeepEqual(want, all(tree)) { + t.Errorf("tree %v mismatch", i) + } + } + + t.Log("Removing half of cmds from first half") + toRemove := want[cloneTestSize/2:] + for i := 0; i < len(trees)/2; i++ { + tree := trees[i] + wg.Add(1) + go func() { + for _, cmd := range toRemove { + tree.Delete(cmd) + } + wg.Done() + }() + } + wg.Wait() + + t.Log("Checking all values again") + for i, tree := range trees { + var wantpart []*cmd + if i < len(trees)/2 { + wantpart = want[:cloneTestSize/2] + } else { + wantpart = want + } + if got := all(tree); !reflect.DeepEqual(wantpart, got) { + t.Errorf("tree %v mismatch, want %v got %v", i, len(want), len(got)) + } + } +} + func TestBTreeCmp(t *testing.T) { testCases := []struct { spanA, spanB roachpb.Span @@ -528,6 +591,25 @@ func perm(n int) (out []*cmd) { return out } +// rang returns an ordered list of cmds with spans in the range [m, n]. +func rang(m, n int) (out []*cmd) { + for i := m; i <= n; i++ { + out = append(out, newCmd(spanWithEnd(i, i+1))) + } + return out +} + +// all extracts all cmds from a tree in order as a slice. 
+func all(tr *btree) (out []*cmd) { + it := tr.MakeIter() + it.First() + for it.Valid() { + out = append(out, it.Cmd()) + it.Next() + } + return out +} + func forBenchmarkSizes(b *testing.B, f func(b *testing.B, count int)) { for _, count := range []int{16, 128, 1024, 8192, 65536} { b.Run(fmt.Sprintf("count=%d", count), func(b *testing.B) { @@ -594,6 +676,48 @@ func BenchmarkBTreeDeleteInsert(b *testing.B) { }) } +func BenchmarkBTreeDeleteInsertCloneOnce(b *testing.B) { + forBenchmarkSizes(b, func(b *testing.B, count int) { + insertP := perm(count) + var tr btree + for _, cmd := range insertP { + tr.Set(cmd) + } + tr = tr.Clone() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cmd := insertP[i%count] + tr.Delete(cmd) + tr.Set(cmd) + } + }) +} + +func BenchmarkBTreeDeleteInsertCloneEachTime(b *testing.B) { + for _, reset := range []bool{false, true} { + b.Run(fmt.Sprintf("reset=%t", reset), func(b *testing.B) { + forBenchmarkSizes(b, func(b *testing.B, count int) { + insertP := perm(count) + var tr, trReset btree + for _, cmd := range insertP { + tr.Set(cmd) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + cmd := insertP[i%count] + if reset { + trReset.Reset() + trReset = tr + } + tr = tr.Clone() + tr.Delete(cmd) + tr.Set(cmd) + } + }) + }) + } +} + func BenchmarkBTreeIterSeekGE(b *testing.B) { forBenchmarkSizes(b, func(b *testing.B, count int) { var spans []roachpb.Span