diff --git a/pkg/storage/command_queue_test.go b/pkg/storage/command_queue_test.go index 77e8fb1cf4a5..0ff1e1d2208a 100644 --- a/pkg/storage/command_queue_test.go +++ b/pkg/storage/command_queue_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) var zeroTS = hlc.Timestamp{} @@ -805,13 +806,14 @@ func assertExpectedPrereqs( } } -func BenchmarkCommandQueueGetPrereqsAllReadOnly(b *testing.B) { +func BenchmarkCommandQueueReadOnlyMix(b *testing.B) { // Test read-only getPrereqs performance for various number of command queue // entries. See #13627 where a previous implementation of // CommandQueue.getOverlaps had O(n) performance in this setup. Since reads // do not wait on other reads, expected performance is O(1). for _, size := range []int{1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { + var mu syncutil.Mutex cq := NewCommandQueue(true) spans := []roachpb.Span{{ Key: roachpb.Key("aaaaaaaaaa"), @@ -823,7 +825,10 @@ func BenchmarkCommandQueueGetPrereqsAllReadOnly(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _ = cq.getPrereqs(true, zeroTS, spans) + mu.Lock() + prereqs := cq.getPrereqs(true, zeroTS, spans) + cq.add(true, zeroTS, prereqs, spans) + mu.Unlock() } }) } @@ -833,32 +838,40 @@ func BenchmarkCommandQueueReadWriteMix(b *testing.B) { // Test performance with a mixture of reads and writes with a high number // of reads per write. // See #15544. - for _, readsPerWrite := range []int{1, 4, 16, 64, 128, 256} { + for _, readsPerWrite := range []int{0, 1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("readsPerWrite=%d", readsPerWrite), func(b *testing.B) { - for i := 0; i < b.N; i++ { - totalCmds := 1 << 10 - liveCmdQueue := make(chan *cmd, 16) - cq := NewCommandQueue(true /* coveringOptimization */) - for j := 0; j < totalCmds; j++ { - a, b := randBytes(100), randBytes(100) - // Overwrite first byte so that we do not mix local and global ranges - a[0], b[0] = 'a', 'a' - if bytes.Compare(a, b) > 0 { - a, b = b, a - } - spans := []roachpb.Span{{ - Key: roachpb.Key(a), - EndKey: roachpb.Key(b), - }} - var cmd *cmd - readOnly := j%(readsPerWrite+1) != 0 - prereqs := cq.getPrereqs(readOnly, zeroTS, spans) - cmd = cq.add(readOnly, zeroTS, prereqs, spans) - if len(liveCmdQueue) == cap(liveCmdQueue) { - cq.remove(<-liveCmdQueue) - } - liveCmdQueue <- cmd + var mu syncutil.Mutex + cq := NewCommandQueue(true /* coveringOptimization */) + liveCmdQueue := make(chan *cmd, 16) + + spans := make([][]roachpb.Span, b.N) + for i := range spans { + a, b := randBytes(100), randBytes(100) + // Overwrite first byte so that we do not mix local and global ranges + a[0], b[0] = 'a', 'a' + if bytes.Compare(a, b) > 0 { + a, b = b, a + } + spans[i] = []roachpb.Span{{ + Key: roachpb.Key(a), + EndKey: roachpb.Key(b), + }} + } + + b.ResetTimer() + for i := range spans { + mu.Lock() + readOnly := i%(readsPerWrite+1) != 0 + prereqs := cq.getPrereqs(readOnly, zeroTS, spans[i]) + cmd := cq.add(readOnly, zeroTS, prereqs, spans[i]) + mu.Unlock() + + if len(liveCmdQueue) == cap(liveCmdQueue) { + mu.Lock() + cq.remove(<-liveCmdQueue) + mu.Unlock() } + liveCmdQueue <- cmd } }) } diff --git a/pkg/storage/spanlatch/doc.go b/pkg/storage/spanlatch/doc.go new file mode 100644 index 000000000000..01b9c5fb0d2d --- /dev/null +++ b/pkg/storage/spanlatch/doc.go @@ -0,0 +1,42 @@ +// Copyright 2018 The Cockroach 
Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +/* +Package spanlatch provides a latch management structure for serializing access +to keys and key ranges. Latch acquisitions affecting keys or key ranges must wait +on already-acquired latches which overlap their key range to be released. + +The evolution of complexity can best be understood as a series of incremental +changes, each in the name of increased lock granularity to reduce contention and +enable more concurrency between requests. The structure can trace its lineage +back to a simple sync.Mutex. From there, the structure evolved through the +following progression: + + * The structure began by enforcing strict mutual exclusion for access to any + keys. Conceptually, it was a sync.Mutex. + * Concurrent read-only access to keys and key ranges was permitted. Reads and + writes were serialized with each other, writes were serialized with each other, + but no ordering was enforced between reads. Conceptually, the structure became + a sync.RWMutex. + * The structure became key range-aware and concurrent access to non-overlapping + key ranges was permitted. Conceptually, the structure became an interval + tree of sync.RWMutexes. + * The structure became timestamp-aware and concurrent access of non-causal + read and write pairs was permitted. The effect of this was that reads no + longer waited for writes at higher timestamps and writes no longer waited + for reads at lower timestamps. Conceptually, the structure became an interval + tree of timestamp-aware sync.RWMutexes. + +*/ +package spanlatch diff --git a/pkg/storage/cmdq/interval_btree.go b/pkg/storage/spanlatch/interval_btree.go similarity index 71% rename from pkg/storage/cmdq/interval_btree.go rename to pkg/storage/spanlatch/interval_btree.go index 770bbc90c28d..013584ac2ba8 100644 --- a/pkg/storage/cmdq/interval_btree.go +++ b/pkg/storage/spanlatch/interval_btree.go @@ -12,7 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. -package cmdq +package spanlatch import ( "bytes" @@ -25,23 +25,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" ) -// TODO(nvanbenschoten): -// 2. Add synchronized node and leafNode freelists -// 3. Introduce immutability and a copy-on-write policy: -// 4. Describe pedigree, changes, etc. of this implementation - const ( - degree = 16 - maxCmds = 2*degree - 1 - minCmds = degree - 1 + degree = 16 + maxLatches = 2*degree - 1 + minLatches = degree - 1 ) -// TODO(nvanbenschoten): remove. -type cmd struct { - id int64 - span roachpb.Span -} - // cmp returns a value indicating the sort order relationship between // a and b.
The comparison is performed lexicographically on // (a.span.Key, a.span.EndKey, a.id) @@ -55,7 +44,7 @@ type cmd struct { // c == 0 if (a.span.Key, a.span.EndKey, a.id) == (b.span.Key, b.span.EndKey, b.id) // c == 1 if (a.span.Key, a.span.EndKey, a.id) > (b.span.Key, b.span.EndKey, b.id) // -func cmp(a, b *cmd) int { +func cmp(a, b *latch) int { c := bytes.Compare(a.span.Key, b.span.Key) if c != 0 { return c @@ -93,7 +82,7 @@ func (b keyBound) compare(o keyBound) int { return -1 } -func (b keyBound) contains(a *cmd) bool { +func (b keyBound) contains(a *latch) bool { c := bytes.Compare(a.span.Key, b.key) if c == 0 { return b.inc @@ -101,7 +90,7 @@ func (b keyBound) contains(a *cmd) bool { return c < 0 } -func upperBound(c *cmd) keyBound { +func upperBound(c *latch) keyBound { if len(c.span.EndKey) != 0 { return keyBound{key: c.span.EndKey} } @@ -109,16 +98,16 @@ func upperBound(c *cmd) keyBound { } type leafNode struct { - ref int32 - count int16 - leaf bool - max keyBound - cmds [maxCmds]*cmd + ref int32 + count int16 + leaf bool + max keyBound + latches [maxLatches]*latch } type node struct { leafNode - children [maxCmds + 1]*node + children [maxLatches + 1]*node } func leafToNode(ln *leafNode) *node { @@ -218,7 +207,7 @@ func (n *node) clone() *node { // triggering the race detector and looking like a data race. c.count = n.count c.max = n.max - c.cmds = n.cmds + c.latches = n.latches if !c.leaf { // Copy children and increase each refcount. c.children = n.children @@ -229,41 +218,41 @@ func (n *node) clone() *node { return c } -func (n *node) insertAt(index int, c *cmd, nd *node) { +func (n *node) insertAt(index int, la *latch, nd *node) { if index < int(n.count) { - copy(n.cmds[index+1:n.count+1], n.cmds[index:n.count]) + copy(n.latches[index+1:n.count+1], n.latches[index:n.count]) if !n.leaf { copy(n.children[index+2:n.count+2], n.children[index+1:n.count+1]) } } - n.cmds[index] = c + n.latches[index] = la if !n.leaf { n.children[index+1] = nd } n.count++ } -func (n *node) pushBack(c *cmd, nd *node) { - n.cmds[n.count] = c +func (n *node) pushBack(la *latch, nd *node) { + n.latches[n.count] = la if !n.leaf { n.children[n.count+1] = nd } n.count++ } -func (n *node) pushFront(c *cmd, nd *node) { +func (n *node) pushFront(la *latch, nd *node) { if !n.leaf { copy(n.children[1:n.count+2], n.children[:n.count+1]) n.children[0] = nd } - copy(n.cmds[1:n.count+1], n.cmds[:n.count]) - n.cmds[0] = c + copy(n.latches[1:n.count+1], n.latches[:n.count]) + n.latches[0] = la n.count++ } // removeAt removes a value at a given index, pulling all subsequent values // back. -func (n *node) removeAt(index int) (*cmd, *node) { +func (n *node) removeAt(index int) (*latch, *node) { var child *node if !n.leaf { child = n.children[index+1] @@ -271,17 +260,17 @@ func (n *node) removeAt(index int) (*cmd, *node) { n.children[n.count] = nil } n.count-- - out := n.cmds[index] - copy(n.cmds[index:n.count], n.cmds[index+1:n.count+1]) - n.cmds[n.count] = nil + out := n.latches[index] + copy(n.latches[index:n.count], n.latches[index+1:n.count+1]) + n.latches[n.count] = nil return out, child } // popBack removes and returns the last element in the list. -func (n *node) popBack() (*cmd, *node) { +func (n *node) popBack() (*latch, *node) { n.count-- - out := n.cmds[n.count] - n.cmds[n.count] = nil + out := n.latches[n.count] + n.latches[n.count] = nil if n.leaf { return out, nil } @@ -291,7 +280,7 @@ func (n *node) popBack() (*cmd, *node) { } // popFront removes and returns the first element in the list. 
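For reference, the total order that cmp defines can be exercised in isolation. Below is a minimal, self-contained sketch of the (span.Key, span.EndKey, id) comparison documented above; the latch type here is pared down to just the compared fields, and the names are illustrative rather than part of this change:

    package main

    import (
        "bytes"
        "fmt"
    )

    // latch is reduced to the fields that participate in the ordering.
    type latch struct {
        id          uint64
        key, endKey []byte
    }

    // cmp orders latches lexicographically on (key, endKey, id).
    func cmp(a, b *latch) int {
        if c := bytes.Compare(a.key, b.key); c != 0 {
            return c
        }
        if c := bytes.Compare(a.endKey, b.endKey); c != 0 {
            return c
        }
        switch {
        case a.id < b.id:
            return -1
        case a.id > b.id:
            return 1
        default:
            return 0
        }
    }

    func main() {
        x := &latch{id: 1, key: []byte("a"), endKey: []byte("c")}
        y := &latch{id: 1, key: []byte("a"), endKey: []byte("d")}
        z := &latch{id: 2, key: []byte("a"), endKey: []byte("c")}
        fmt.Println(cmp(x, y), cmp(x, z), cmp(x, x)) // -1 -1 0
    }

The id tiebreaker is what lets two latches over an identical span coexist in the tree.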
-func (n *node) popFront() (*cmd, *node) { +func (n *node) popFront() (*latch, *node) { n.count-- var child *node if !n.leaf { @@ -299,23 +288,23 @@ func (n *node) popFront() (*cmd, *node) { copy(n.children[:n.count+1], n.children[1:n.count+2]) n.children[n.count+1] = nil } - out := n.cmds[0] - copy(n.cmds[:n.count], n.cmds[1:n.count+1]) - n.cmds[n.count] = nil + out := n.latches[0] + copy(n.latches[:n.count], n.latches[1:n.count+1]) + n.latches[n.count] = nil return out, child } -// find returns the index where the given cmd should be inserted into this -// list. 'found' is true if the cmd already exists in the list at the given +// find returns the index where the given latch should be inserted into this +// list. 'found' is true if the latch already exists in the list at the given // index. -func (n *node) find(c *cmd) (index int, found bool) { +func (n *node) find(la *latch) (index int, found bool) { // Logic copied from sort.Search. Inlining this gave // an 11% speedup on BenchmarkBTreeDeleteInsert. i, j := 0, int(n.count) for i < j { h := int(uint(i+j) >> 1) // avoid overflow when computing h // i ≤ h < j - v := cmp(c, n.cmds[h]) + v := cmp(la, n.latches[h]) if v == 0 { return h, true } else if v > 0 { @@ -328,8 +317,8 @@ func (n *node) find(c *cmd) (index int, found bool) { } // split splits the given node at the given index. The current node shrinks, -// and this function returns the cmd that existed at that index and a new node -// containing all cmds/children after it. +// and this function returns the latch that existed at that index and a new +// node containing all latches/children after it. // // Before: // @@ -348,8 +337,8 @@ func (n *node) find(c *cmd) (index int, found bool) { // | x | | z | // +-----------+ +-----------+ // -func (n *node) split(i int) (*cmd, *node) { - out := n.cmds[i] +func (n *node) split(i int) (*latch, *node) { + out := n.latches[i] var next *node if n.leaf { next = newLeafNode() @@ -357,9 +346,9 @@ func (n *node) split(i int) (*cmd, *node) { next = newNode() } next.count = n.count - int16(i+1) - copy(next.cmds[:], n.cmds[i+1:n.count]) + copy(next.latches[:], n.latches[i+1:n.count]) for j := int16(i); j < n.count; j++ { - n.cmds[j] = nil + n.latches[j] = nil } if !n.leaf { copy(next.children[:], n.children[i+1:n.count+1]) @@ -371,7 +360,7 @@ func (n *node) split(i int) (*cmd, *node) { next.max = next.findUpperBound() if n.max.compare(next.max) != 0 && n.max.compare(upperBound(out)) != 0 { - // If upper bound wasn't from new node or cmd + // If upper bound wasn't from new node or latch // at index i, it must still be from old node. } else { n.max = n.findUpperBound() @@ -379,64 +368,64 @@ func (n *node) split(i int) (*cmd, *node) { return out, next } -// insert inserts a cmd into the subtree rooted at this node, making sure no -// nodes in the subtree exceed maxCmds cmds. Returns true if an existing cmd was -// replaced and false if a command was inserted. Also returns whether the node's -// upper bound changes. -func (n *node) insert(c *cmd) (replaced, newBound bool) { - i, found := n.find(c) +// insert inserts a latch into the subtree rooted at this node, making sure no +// nodes in the subtree exceed maxLatches latches. Returns true if an existing +// latch was replaced and false if a latch was inserted. Also returns whether +// the node's upper bound changes. 
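To make the split sizing concrete, here is a small self-contained check of the invariants implied above: with degree = 16 a node holds at most 31 latches, and splitting a full node at maxLatches/2 promotes one latch while leaving both halves exactly at the minimum occupancy.

    package main

    import "fmt"

    func main() {
        const degree = 16
        const maxLatches = 2*degree - 1 // 31
        const minLatches = degree - 1   // 15

        splitIdx := maxLatches / 2         // the latch promoted to the parent
        left := splitIdx                   // latches [0, splitIdx) stay in the old node
        right := maxLatches - splitIdx - 1 // latches after splitIdx move to the new node
        fmt.Println(left, right, left >= minLatches && right >= minLatches)
        // Output: 15 15 true
    }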
+func (n *node) insert(la *latch) (replaced, newBound bool) { + i, found := n.find(la) if found { - n.cmds[i] = c + n.latches[i] = la return true, false } if n.leaf { - n.insertAt(i, c, nil) - return false, n.adjustUpperBoundOnInsertion(c, nil) + n.insertAt(i, la, nil) + return false, n.adjustUpperBoundOnInsertion(la, nil) } - if n.children[i].count >= maxCmds { - splitcmd, splitNode := mut(&n.children[i]).split(maxCmds / 2) - n.insertAt(i, splitcmd, splitNode) + if n.children[i].count >= maxLatches { + splitLa, splitNode := mut(&n.children[i]).split(maxLatches / 2) + n.insertAt(i, splitLa, splitNode) - switch cmp := cmp(c, n.cmds[i]); { + switch cmp := cmp(la, n.latches[i]); { case cmp < 0: // no change, we want first split node case cmp > 0: i++ // we want second split node default: - n.cmds[i] = c + n.latches[i] = la return true, false } } - replaced, newBound = mut(&n.children[i]).insert(c) + replaced, newBound = mut(&n.children[i]).insert(la) if newBound { - newBound = n.adjustUpperBoundOnInsertion(c, nil) + newBound = n.adjustUpperBoundOnInsertion(la, nil) } return replaced, newBound } -// removeMax removes and returns the maximum cmd from the subtree rooted at -// this node. -func (n *node) removeMax() *cmd { +// removeMax removes and returns the maximum latch from the subtree rooted +// at this node. +func (n *node) removeMax() *latch { if n.leaf { n.count-- - out := n.cmds[n.count] - n.cmds[n.count] = nil + out := n.latches[n.count] + n.latches[n.count] = nil n.adjustUpperBoundOnRemoval(out, nil) return out } child := mut(&n.children[n.count]) - if child.count <= minCmds { + if child.count <= minLatches { n.rebalanceOrMerge(int(n.count)) return n.removeMax() } return child.removeMax() } -// remove removes a cmd from the subtree rooted at this node. Returns -// the cmd that was removed or nil if no matching command was found. +// remove removes a latch from the subtree rooted at this node. Returns +// the latch that was removed or nil if no matching latch was found. // Also returns whether the node's upper bound changes. -func (n *node) remove(c *cmd) (out *cmd, newBound bool) { - i, found := n.find(c) +func (n *node) remove(la *latch) (out *latch, newBound bool) { + i, found := n.find(la) if n.leaf { if found { out, _ = n.removeAt(i) @@ -444,20 +433,20 @@ func (n *node) remove(c *cmd) (out *cmd, newBound bool) { } return nil, false } - if n.children[i].count <= minCmds { + if n.children[i].count <= minLatches { // Child not large enough to remove from. n.rebalanceOrMerge(i) - return n.remove(c) + return n.remove(la) } child := mut(&n.children[i]) if found { - // Replace the cmd being removed with the max cmd in our left child. - out = n.cmds[i] - n.cmds[i] = child.removeMax() + // Replace the latch being removed with the max latch in our left child. + out = n.latches[i] + n.latches[i] = child.removeMax() return out, n.adjustUpperBoundOnRemoval(out, nil) } - // Cmd is not in this node and child is large enough to remove from. - out, newBound = child.remove(c) + // Latch is not in this node and child is large enough to remove from. + out, newBound = child.remove(la) if newBound { newBound = n.adjustUpperBoundOnRemoval(out, nil) } @@ -465,10 +454,10 @@ func (n *node) remove(c *cmd) (out *cmd, newBound bool) { } // rebalanceOrMerge grows child 'i' to ensure it has sufficient room to remove -// a cmd from it while keeping it at or above minCmds. +// a latch from it while keeping it at or above minLatches. 
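The mut(&n.children[i]) calls on these paths are what make in-place mutation safe against shared snapshots. The helper's definition falls outside this diff's hunks; a plausible shape, inferred from the ref/clone/decRef machinery earlier in the file (an assumption about its form, not the file's actual body, and it would additionally need "sync/atomic"):

    func mut(n **node) *node {
        if atomic.LoadInt32(&(*n).ref) == 1 {
            // Exclusively owned: safe to mutate in place.
            return *n
        }
        // Shared with an immutable snapshot: copy-on-write. Clone the node,
        // release the reference to the shared original, install the clone.
        c := (*n).clone()
        (*n).decRef(true /* recursive */)
        *n = c
        return c
    }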
func (n *node) rebalanceOrMerge(i int) { switch { - case i > 0 && n.children[i-1].count > minCmds: + case i > 0 && n.children[i-1].count > minLatches: // Rebalance from left sibling. // // +-----------+ @@ -499,15 +488,15 @@ func (n *node) rebalanceOrMerge(i int) { // left := mut(&n.children[i-1]) child := mut(&n.children[i]) - xCmd, grandChild := left.popBack() - yCmd := n.cmds[i-1] - child.pushFront(yCmd, grandChild) - n.cmds[i-1] = xCmd + xLa, grandChild := left.popBack() + yLa := n.latches[i-1] + child.pushFront(yLa, grandChild) + n.latches[i-1] = xLa - left.adjustUpperBoundOnRemoval(xCmd, grandChild) - child.adjustUpperBoundOnInsertion(yCmd, grandChild) + left.adjustUpperBoundOnRemoval(xLa, grandChild) + child.adjustUpperBoundOnInsertion(yLa, grandChild) - case i < int(n.count) && n.children[i+1].count > minCmds: + case i < int(n.count) && n.children[i+1].count > minLatches: // Rebalance from right sibling. // // +-----------+ @@ -538,13 +527,13 @@ func (n *node) rebalanceOrMerge(i int) { // right := mut(&n.children[i+1]) child := mut(&n.children[i]) - xCmd, grandChild := right.popFront() - yCmd := n.cmds[i] - child.pushBack(yCmd, grandChild) - n.cmds[i] = xCmd + xLa, grandChild := right.popFront() + yLa := n.latches[i] + child.pushBack(yLa, grandChild) + n.latches[i] = xLa - right.adjustUpperBoundOnRemoval(xCmd, grandChild) - child.adjustUpperBoundOnInsertion(yCmd, grandChild) + right.adjustUpperBoundOnRemoval(xLa, grandChild) + child.adjustUpperBoundOnInsertion(yLa, grandChild) default: // Merge with either the left or right sibling. @@ -575,15 +564,15 @@ func (n *node) rebalanceOrMerge(i int) { child := mut(&n.children[i]) // Make mergeChild mutable, bumping the refcounts on its children if necessary. _ = mut(&n.children[i+1]) - mergeCmd, mergeChild := n.removeAt(i) - child.cmds[child.count] = mergeCmd - copy(child.cmds[child.count+1:], mergeChild.cmds[:mergeChild.count]) + mergeLa, mergeChild := n.removeAt(i) + child.latches[child.count] = mergeLa + copy(child.latches[child.count+1:], mergeChild.latches[:mergeChild.count]) if !child.leaf { copy(child.children[child.count+1:], mergeChild.children[:mergeChild.count+1]) } child.count += mergeChild.count + 1 - child.adjustUpperBoundOnInsertion(mergeCmd, mergeChild) + child.adjustUpperBoundOnInsertion(mergeLa, mergeChild) mergeChild.decRef(false /* recursive */) } } @@ -593,7 +582,7 @@ func (n *node) rebalanceOrMerge(i int) { func (n *node) findUpperBound() keyBound { var max keyBound for i := int16(0); i < n.count; i++ { - up := upperBound(n.cmds[i]) + up := upperBound(n.latches[i]) if max.compare(up) < 0 { max = up } @@ -610,10 +599,10 @@ func (n *node) findUpperBound() keyBound { } // adjustUpperBoundOnInsertion adjusts the upper key bound for this node -// given a cmd and an optional child node that was inserted. Returns true -// is the upper bound was changed and false if not. -func (n *node) adjustUpperBoundOnInsertion(c *cmd, child *node) bool { - up := upperBound(c) +// given a latch and an optional child node that was inserted. Returns +// true is the upper bound was changed and false if not. +func (n *node) adjustUpperBoundOnInsertion(la *latch, child *node) bool { + up := upperBound(la) if child != nil { if up.compare(child.max) < 0 { up = child.max @@ -627,29 +616,31 @@ func (n *node) adjustUpperBoundOnInsertion(c *cmd, child *node) bool { } // adjustUpperBoundOnRemoval adjusts the upper key bound for this node -// given a cmd and an optional child node that were removed. 
Returns true -// is the upper bound was changed and false if not. -func (n *node) adjustUpperBoundOnRemoval(c *cmd, child *node) bool { - up := upperBound(c) +// given a latch and an optional child node that were removed. Returns +// true if the upper bound was changed and false if not. +func (n *node) adjustUpperBoundOnRemoval(la *latch, child *node) bool { + up := upperBound(la) if child != nil { if up.compare(child.max) < 0 { up = child.max } } if n.max.compare(up) == 0 { + // up was previous upper bound of n. n.max = n.findUpperBound() - return true + return n.max.compare(up) != 0 } return false } // btree is an implementation of an augmented interval B-Tree. // -// btree stores cmds in an ordered structure, allowing easy insertion, +// btree stores latches in an ordered structure, allowing easy insertion, // removal, and iteration. It represents intervals and permits an interval // search operation following the approach laid out in CLRS, Chapter 14. -// The B-Tree stores cmds in order based on their start key and each B-Tree -// node maintains the upper-bound end key of all cmds in its subtree. +// The B-Tree stores latches in order based on their start key and each +// B-Tree node maintains the upper-bound end key of all latches in its +// subtree. // // Write operations are not safe for concurrent mutation by multiple // goroutines, but Read operations are. @@ -658,7 +649,7 @@ type btree struct { length int } -// Reset removes all cmds from the btree. In doing so, it allows memory +// Reset removes all latches from the btree. In doing so, it allows memory // held by the btree to be recycled. Failure to call this method before // letting a btree be GCed is safe in that it won't cause a memory leak, // but it will prevent btree nodes from being efficiently re-used. @@ -679,12 +670,12 @@ return c } -// Delete removes a cmd equal to the passed in cmd from the tree. -func (t *btree) Delete(c *cmd) { +// Delete removes a latch equal to the passed in latch from the tree. +func (t *btree) Delete(la *latch) { if t.root == nil || t.root.count == 0 { return } - if out, _ := mut(&t.root).remove(c); out != nil { + if out, _ := mut(&t.root).remove(la); out != nil { t.length-- } if t.root.count == 0 && !t.root.leaf { @@ -694,22 +685,22 @@ } } -// Set adds the given cmd to the tree. If a cmd in the tree already equals -// the given one, it is replaced with the new cmd. -func (t *btree) Set(c *cmd) { +// Set adds the given latch to the tree. If a latch in the tree already +// equals the given one, it is replaced with the new latch. +func (t *btree) Set(la *latch) { if t.root == nil { t.root = newLeafNode() - } else if t.root.count >= maxCmds { - splitcmd, splitNode := mut(&t.root).split(maxCmds / 2) + } else if t.root.count >= maxLatches { + splitLa, splitNode := mut(&t.root).split(maxLatches / 2) newRoot := newNode() newRoot.count = 1 - newRoot.cmds[0] = splitcmd + newRoot.latches[0] = splitLa newRoot.children[0] = t.root newRoot.children[1] = splitNode newRoot.max = newRoot.findUpperBound() t.root = newRoot } - if replaced, _ := mut(&t.root).insert(c); !replaced { + if replaced, _ := mut(&t.root).insert(la); !replaced { t.length++ } } @@ -735,7 +726,7 @@ return h } -// Len returns the number of cmds currently in the tree. +// Len returns the number of latches currently in the tree.
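Because Clone copies only the root handle and defers node copies to mut, taking a snapshot is cheap and the trees diverge lazily. A sketch of the intended semantics, written in-package since btree is unexported (newLatch and span are helpers from the test file later in this diff):

    var tr btree
    tr.Set(newLatch(span(1)))
    snap := tr.Clone()        // O(1): shares all nodes, bumps refcounts
    tr.Set(newLatch(span(2))) // copy-on-write clones only the touched path
    fmt.Println(tr.Len(), snap.Len()) // 2 1: the snapshot is unaffected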
func (t *btree) Len() int { return t.length } @@ -757,7 +748,7 @@ func (n *node) writeString(b *strings.Builder) { if i != 0 { b.WriteString(",") } - b.WriteString(n.cmds[i].span.String()) + b.WriteString(n.latches[i].span.String()) } return } @@ -766,7 +757,7 @@ n.children[i].writeString(b) b.WriteString(")") if i < n.count { - b.WriteString(n.cmds[i].span.String()) + b.WriteString(n.latches[i].span.String()) } } } @@ -856,14 +847,15 @@ func (i *iterator) ascend() { i.pos = f.pos } -// SeekGE seeks to the first cmd greater-than or equal to the provided cmd. -func (i *iterator) SeekGE(c *cmd) { +// SeekGE seeks to the first latch greater-than or equal to the provided +// latch. +func (i *iterator) SeekGE(la *latch) { i.reset() if i.n == nil { return } for { - pos, found := i.n.find(c) + pos, found := i.n.find(la) i.pos = int16(pos) if found { return } @@ -878,14 +870,14 @@ } } -// SeekLT seeks to the first cmd less-than the provided cmd. -func (i *iterator) SeekLT(c *cmd) { +// SeekLT seeks to the first latch less-than the provided latch. +func (i *iterator) SeekLT(la *latch) { i.reset() if i.n == nil { return } for { - pos, found := i.n.find(c) + pos, found := i.n.find(la) i.pos = int16(pos) if found || i.n.leaf { i.Prev() @@ -895,7 +887,7 @@ } } -// First seeks to the first cmd in the btree. +// First seeks to the first latch in the btree. func (i *iterator) First() { i.reset() if i.n == nil { @@ -907,7 +899,7 @@ i.pos = 0 } -// Last seeks to the last cmd in the btree. +// Last seeks to the last latch in the btree. func (i *iterator) Last() { i.reset() if i.n == nil { @@ -919,7 +911,7 @@ i.pos = i.n.count - 1 } -// Next positions the iterator to the cmd immediately following +// Next positions the iterator to the latch immediately following // its current position. func (i *iterator) Next() { if i.n == nil { @@ -944,7 +936,7 @@ i.pos = 0 } -// Prev positions the iterator to the cmd immediately preceding +// Prev positions the iterator to the latch immediately preceding // its current position. func (i *iterator) Prev() { if i.n == nil { @@ -975,31 +967,31 @@ return i.pos >= 0 && i.pos < i.n.count } -// Cmd returns the cmd at the iterator's current position. It is illegal -// to call Cmd if the iterator is not valid. -func (i *iterator) Cmd() *cmd { - return i.n.cmds[i.pos] +// Cur returns the latch at the iterator's current position. It is illegal +// to call Cur if the iterator is not valid. +func (i *iterator) Cur() *latch { + return i.n.latches[i.pos] } -// An overlap scan is a scan over all cmds that overlap with the provided cmd -// in order of the overlapping cmds' start keys. The goal of the scan is to -// minimize the number of key comparisons performed in total. The algorithm -// operates based on the following two invariants maintained by augmented -// interval btree: -// 1. all cmds are sorted in the btree based on their start key. -// 2. all btree nodes maintain the upper bound end key of all cmds +// An overlap scan is a scan over all latches that overlap with the provided +// latch in order of the overlapping latches' start keys. The goal of the scan +// is to minimize the number of key comparisons performed in total. The +// algorithm operates based on the following two invariants maintained by +// augmented interval btree: +// 1.
all latches are sorted in the btree based on their start key. +// 2. all btree nodes maintain the upper bound end key of all latches // in their subtree. // // The scan algorithm starts in "unconstrained minimum" and "unconstrained // maximum" states. To enter a "constrained minimum" state, the scan must reach -// cmds in the tree with start keys above the search range's start key. Because -// cmds in the tree are sorted by start key, once the scan enters the +// latches in the tree with start keys above the search range's start key. +// Because latches in the tree are sorted by start key, once the scan enters the // "constrained minimum" state it will remain there. To enter a "constrained // maximum" state, the scan must determine the first child btree node in a given -// subtree that can have cmds with start keys above the search range's end key. -// The scan then remains in the "constrained maximum" state until it traverse -// into this child node, at which point it moves to the "unconstrained maximum" -// state again. +// subtree that can have latches with start keys above the search range's end +// key. The scan then remains in the "constrained maximum" state until it +// traverses into this child node, at which point it moves to the "unconstrained +// maximum" state again. // // The scan algorithm works like a standard btree forward scan with the // following augmentations: @@ -1014,19 +1006,19 @@ func (i *iterator) Cmd() *cmd { // than the soft lower bound constraint. // 4. once the initial traversal completes and the scan is in the left-most // btree node whose upper bound overlaps the search range, key comparisons -// must be performed with each cmd in the tree. This is necessary because -// any of these cmds may have end keys that cause them to overlap with the +// must be performed with each latch in the tree. This is necessary because +// any of these latches may have end keys that cause them to overlap with the // search range. -// 5. once the scan reaches the lower bound constraint position (the first cmd +// 5. once the scan reaches the lower bound constraint position (the first latch // with a start key equal to or greater than the search range's start key), // it can begin scanning without performing key comparisons. This is allowed -// because all commands from this point forward will have end keys that are +// because all latches from this point forward will have end keys that are // greater than the search range's start key. // 6. once the scan reaches the upper bound constraint position, it terminates. -// It does so because the cmd at this position is the first cmd with a start -// key larger than the search range's end key. +// It does so because the latch at this position is the first latch with a +// start key larger than the search range's end key. type overlapScan struct { - c *cmd // search cmd + la *latch // search latch // The "soft" lower-bound constraint. constrMinN *node @@ -1038,27 +1030,27 @@ constrMaxPos int16 } -// FirstOverlap seeks to the first cmd in the btree that overlaps with the -// provided search cmd. +// FirstOverlap seeks to the first latch in the btree that overlaps with the +// provided search latch.
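In practice an overlap scan is driven with the FirstOverlap/NextOverlap pair; the loop below is the same pattern the tests and benchmarks later in this diff use (in-package sketch, with newLatch and spanWithEnd taken from the test file):

    var overlapping []*latch
    query := newLatch(spanWithEnd(10, 20))
    it := tr.MakeIter()
    for it.FirstOverlap(query); it.Valid(); it.NextOverlap() {
        overlapping = append(overlapping, it.Cur())
    }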
+func (i *iterator) FirstOverlap(la *latch) { i.reset() if i.n == nil { return } i.pos = 0 - i.o = overlapScan{c: c} + i.o = overlapScan{la: la} i.constrainMinSearchBounds() i.constrainMaxSearchBounds() i.findNextOverlap() } -// NextOverlap positions the iterator to the cmd immediately following -// its current position that overlaps with the search cmd. +// NextOverlap positions the iterator to the latch immediately following +// its current position that overlaps with the search latch. func (i *iterator) NextOverlap() { if i.n == nil { return } - if i.o.c == nil { + if i.o.la == nil { // Invalid. Mixed overlap scan with non-overlap scan. i.pos = i.n.count return @@ -1068,18 +1060,18 @@ func (i *iterator) NextOverlap() { } func (i *iterator) constrainMinSearchBounds() { - k := i.o.c.span.Key + k := i.o.la.span.Key j := sort.Search(int(i.n.count), func(j int) bool { - return bytes.Compare(k, i.n.cmds[j].span.Key) <= 0 + return bytes.Compare(k, i.n.latches[j].span.Key) <= 0 }) i.o.constrMinN = i.n i.o.constrMinPos = int16(j) } func (i *iterator) constrainMaxSearchBounds() { - up := upperBound(i.o.c) + up := upperBound(i.o.la) j := sort.Search(int(i.n.count), func(j int) bool { - return !up.contains(i.n.cmds[j]) + return !up.contains(i.n.latches[j]) }) i.o.constrMaxN = i.n i.o.constrMaxPos = int16(j) @@ -1092,7 +1084,7 @@ func (i *iterator) findNextOverlap() { i.ascend() } else if !i.n.leaf { // Iterate down tree. - if i.o.constrMinReached || i.n.children[i.pos].max.contains(i.o.c) { + if i.o.constrMinReached || i.n.children[i.pos].max.contains(i.o.la) { par := i.n pos := i.pos i.descend(par, pos) @@ -1121,14 +1113,14 @@ func (i *iterator) findNextOverlap() { // Iterate across node. if i.pos < i.n.count { - // Check for overlapping cmd. + // Check for overlapping latch. if i.o.constrMinReached { // Fast-path to avoid span comparison. i.o.constrMinReached - // tells us that all cmds have end keys above our search + // tells us that all latches have end keys above our search // span's start key. return } - if upperBound(i.n.cmds[i.pos]).contains(i.o.c) { + if upperBound(i.n.latches[i.pos]).contains(i.o.la) { return } } diff --git a/pkg/storage/cmdq/interval_btree_test.go b/pkg/storage/spanlatch/interval_btree_test.go similarity index 73% rename from pkg/storage/cmdq/interval_btree_test.go rename to pkg/storage/spanlatch/interval_btree_test.go index 30fb6daa90cb..e50ac6395c29 100644 --- a/pkg/storage/cmdq/interval_btree_test.go +++ b/pkg/storage/spanlatch/interval_btree_test.go @@ -12,7 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. 
-package cmdq +package spanlatch import ( "fmt" @@ -62,14 +62,14 @@ func (t *btree) verifyCountAllowed(tt *testing.T) { func (n *node) verifyCountAllowed(t *testing.T, root bool) { if !root { - require.True(t, n.count >= minCmds, "cmd count %d must be in range [%d,%d]", n.count, minCmds, maxCmds) - require.True(t, n.count <= maxCmds, "cmd count %d must be in range [%d,%d]", n.count, minCmds, maxCmds) + require.True(t, n.count >= minLatches, "latch count %d must be in range [%d,%d]", n.count, minLatches, maxLatches) + require.True(t, n.count <= maxLatches, "latch count %d must be in range [%d,%d]", n.count, minLatches, maxLatches) } - for i, cmd := range n.cmds { + for i, la := range n.latches { if i < int(n.count) { - require.NotNil(t, cmd, "cmd below count") + require.NotNil(t, la, "latch below count") } else { - require.Nil(t, cmd, "cmd above count") + require.Nil(t, la, "latch above count") } } if !n.leaf { @@ -92,15 +92,15 @@ func (t *btree) isSorted(tt *testing.T) { func (n *node) isSorted(t *testing.T) { for i := int16(1); i < n.count; i++ { - require.True(t, cmp(n.cmds[i-1], n.cmds[i]) <= 0) + require.True(t, cmp(n.latches[i-1], n.latches[i]) <= 0) } if !n.leaf { for i := int16(0); i < n.count; i++ { prev := n.children[i] next := n.children[i+1] - require.True(t, cmp(prev.cmds[prev.count-1], n.cmds[i]) <= 0) - require.True(t, cmp(n.cmds[i], next.cmds[0]) <= 0) + require.True(t, cmp(prev.latches[prev.count-1], n.latches[i]) <= 0) + require.True(t, cmp(n.latches[i], next.latches[0]) <= 0) } } n.recurse(func(child *node, _ int16) { @@ -115,7 +115,7 @@ func (t *btree) isUpperBoundCorrect(tt *testing.T) { func (n *node) isUpperBoundCorrect(t *testing.T) { require.Equal(t, 0, n.findUpperBound().compare(n.max)) for i := int16(1); i < n.count; i++ { - require.True(t, upperBound(n.cmds[i]).compare(n.max) <= 0) + require.True(t, upperBound(n.latches[i]).compare(n.max) <= 0) } if !n.leaf { for i := int16(0); i <= n.count; i++ { @@ -170,6 +170,15 @@ func spanWithEnd(start, end int) roachpb.Span { } } +func spanWithMemo(i int, memo map[int]roachpb.Span) roachpb.Span { + if s, ok := memo[i]; ok { + return s + } + s := span(i) + memo[i] = s + return s +} + func randomSpan(rng *rand.Rand, n int) roachpb.Span { start := rng.Intn(n) end := rng.Intn(n + 1) @@ -179,17 +188,17 @@ func randomSpan(rng *rand.Rand, n int) roachpb.Span { return spanWithEnd(start, end) } -func newCmd(s roachpb.Span) *cmd { - return &cmd{span: s} +func newLatch(s roachpb.Span) *latch { + return &latch{span: s} } -func checkIter(t *testing.T, it iterator, start, end int) { +func checkIter(t *testing.T, it iterator, start, end int, spanMemo map[int]roachpb.Span) { i := start for it.First(); it.Valid(); it.Next() { - cmd := it.Cmd() - expected := span(i) - if !expected.Equal(cmd.span) { - t.Fatalf("expected %s, but found %s", expected, cmd.span) + la := it.Cur() + expected := spanWithMemo(i, spanMemo) + if !expected.Equal(la.span) { + t.Fatalf("expected %s, but found %s", expected, la.span) } i++ } @@ -199,22 +208,22 @@ func checkIter(t *testing.T, it iterator, start, end int) { for it.Last(); it.Valid(); it.Prev() { i-- - cmd := it.Cmd() - expected := span(i) - if !expected.Equal(cmd.span) { - t.Fatalf("expected %s, but found %s", expected, cmd.span) + la := it.Cur() + expected := spanWithMemo(i, spanMemo) + if !expected.Equal(la.span) { + t.Fatalf("expected %s, but found %s", expected, la.span) } } if i != start { t.Fatalf("expected %d, but at %d: %+v", start, i, it) } - all := newCmd(spanWithEnd(start, end)) + all := 
newLatch(spanWithEnd(start, end)) for it.FirstOverlap(all); it.Valid(); it.NextOverlap() { - cmd := it.Cmd() - expected := span(i) - if !expected.Equal(cmd.span) { - t.Fatalf("expected %s, but found %s", expected, cmd.span) + la := it.Cur() + expected := spanWithMemo(i, spanMemo) + if !expected.Equal(la.span) { + t.Fatalf("expected %s, but found %s", expected, la.span) } i++ } @@ -225,6 +234,7 @@ func checkIter(t *testing.T, it iterator, start, end int) { func TestBTree(t *testing.T) { var tr btree + spanMemo := make(map[int]roachpb.Span) // With degree == 16 (max-items/node == 31) we need 513 items in order for // there to be 3 levels in the tree. The count here is comfortably above @@ -233,42 +243,42 @@ func TestBTree(t *testing.T) { // Add keys in sorted order. for i := 0; i < count; i++ { - tr.Set(newCmd(span(i))) + tr.Set(newLatch(span(i))) tr.Verify(t) if e := i + 1; e != tr.Len() { t.Fatalf("expected length %d, but found %d", e, tr.Len()) } - checkIter(t, tr.MakeIter(), 0, i+1) + checkIter(t, tr.MakeIter(), 0, i+1, spanMemo) } // Delete keys in sorted order. for i := 0; i < count; i++ { - tr.Delete(newCmd(span(i))) + tr.Delete(newLatch(span(i))) tr.Verify(t) if e := count - (i + 1); e != tr.Len() { t.Fatalf("expected length %d, but found %d", e, tr.Len()) } - checkIter(t, tr.MakeIter(), i+1, count) + checkIter(t, tr.MakeIter(), i+1, count, spanMemo) } // Add keys in reverse sorted order. for i := 0; i < count; i++ { - tr.Set(newCmd(span(count - i))) + tr.Set(newLatch(span(count - i))) tr.Verify(t) if e := i + 1; e != tr.Len() { t.Fatalf("expected length %d, but found %d", e, tr.Len()) } - checkIter(t, tr.MakeIter(), count-i, count+1) + checkIter(t, tr.MakeIter(), count-i, count+1, spanMemo) } // Delete keys in reverse sorted order. for i := 0; i < count; i++ { - tr.Delete(newCmd(span(count - i))) + tr.Delete(newLatch(span(count - i))) tr.Verify(t) if e := count - (i + 1); e != tr.Len() { t.Fatalf("expected length %d, but found %d", e, tr.Len()) } - checkIter(t, tr.MakeIter(), 1, count-i) + checkIter(t, tr.MakeIter(), 1, count-i, spanMemo) } } @@ -277,38 +287,38 @@ func TestBTreeSeek(t *testing.T) { var tr btree for i := 0; i < count; i++ { - tr.Set(newCmd(span(i * 2))) + tr.Set(newLatch(span(i * 2))) } it := tr.MakeIter() for i := 0; i < 2*count-1; i++ { - it.SeekGE(newCmd(span(i))) + it.SeekGE(newLatch(span(i))) if !it.Valid() { t.Fatalf("%d: expected valid iterator", i) } - cmd := it.Cmd() + la := it.Cur() expected := span(2 * ((i + 1) / 2)) - if !expected.Equal(cmd.span) { - t.Fatalf("%d: expected %s, but found %s", i, expected, cmd.span) + if !expected.Equal(la.span) { + t.Fatalf("%d: expected %s, but found %s", i, expected, la.span) } } - it.SeekGE(newCmd(span(2*count - 1))) + it.SeekGE(newLatch(span(2*count - 1))) if it.Valid() { t.Fatalf("expected invalid iterator") } for i := 1; i < 2*count; i++ { - it.SeekLT(newCmd(span(i))) + it.SeekLT(newLatch(span(i))) if !it.Valid() { t.Fatalf("%d: expected valid iterator", i) } - cmd := it.Cmd() + la := it.Cur() expected := span(2 * ((i - 1) / 2)) - if !expected.Equal(cmd.span) { - t.Fatalf("%d: expected %s, but found %s", i, expected, cmd.span) + if !expected.Equal(la.span) { + t.Fatalf("%d: expected %s, but found %s", i, expected, la.span) } } - it.SeekLT(newCmd(span(0))) + it.SeekLT(newLatch(span(0))) if it.Valid() { t.Fatalf("expected invalid iterator") } @@ -316,17 +326,17 @@ func TestBTreeSeek(t *testing.T) { func TestBTreeSeekOverlap(t *testing.T) { const count = 513 - const size = 2 * maxCmds + const size = 2 * maxLatches var 
tr btree for i := 0; i < count; i++ { - tr.Set(newCmd(spanWithEnd(i, i+size+1))) + tr.Set(newLatch(spanWithEnd(i, i+size+1))) } // Iterate over overlaps with a point scan. it := tr.MakeIter() for i := 0; i < count+size; i++ { - it.FirstOverlap(newCmd(spanWithEnd(i, i))) + it.FirstOverlap(newLatch(spanWithEnd(i, i))) for j := 0; j < size+1; j++ { expStart := i - size + j if expStart < 0 { @@ -339,19 +349,19 @@ func TestBTreeSeekOverlap(t *testing.T) { if !it.Valid() { t.Fatalf("%d/%d: expected valid iterator", i, j) } - cmd := it.Cmd() + la := it.Cur() expected := spanWithEnd(expStart, expStart+size+1) - if !expected.Equal(cmd.span) { - t.Fatalf("%d: expected %s, but found %s", i, expected, cmd.span) + if !expected.Equal(la.span) { + t.Fatalf("%d: expected %s, but found %s", i, expected, la.span) } it.NextOverlap() } if it.Valid() { - t.Fatalf("%d: expected invalid iterator %v", i, it.Cmd()) + t.Fatalf("%d: expected invalid iterator %v", i, it.Cur()) } } - it.FirstOverlap(newCmd(span(count + size + 1))) + it.FirstOverlap(newLatch(span(count + size + 1))) if it.Valid() { t.Fatalf("expected invalid iterator") } @@ -359,7 +369,7 @@ func TestBTreeSeekOverlap(t *testing.T) { // Iterate over overlaps with a range scan. it = tr.MakeIter() for i := 0; i < count+size; i++ { - it.FirstOverlap(newCmd(spanWithEnd(i, i+size+1))) + it.FirstOverlap(newLatch(spanWithEnd(i, i+size+1))) for j := 0; j < 2*size+1; j++ { expStart := i - size + j if expStart < 0 { @@ -372,19 +382,19 @@ func TestBTreeSeekOverlap(t *testing.T) { if !it.Valid() { t.Fatalf("%d/%d: expected valid iterator", i, j) } - cmd := it.Cmd() + la := it.Cur() expected := spanWithEnd(expStart, expStart+size+1) - if !expected.Equal(cmd.span) { - t.Fatalf("%d: expected %s, but found %s", i, expected, cmd.span) + if !expected.Equal(la.span) { + t.Fatalf("%d: expected %s, but found %s", i, expected, la.span) } it.NextOverlap() } if it.Valid() { - t.Fatalf("%d: expected invalid iterator %v", i, it.Cmd()) + t.Fatalf("%d: expected invalid iterator %v", i, it.Cur()) } } - it.FirstOverlap(newCmd(span(count + size + 1))) + it.FirstOverlap(newLatch(span(count + size + 1))) if it.Valid() { t.Fatalf("expected invalid iterator") } @@ -398,55 +408,55 @@ func TestBTreeSeekOverlapRandom(t *testing.T) { var tr btree const count = 1000 - cmds := make([]*cmd, count) - cmdSpans := make([]int, count) + latches := make([]*latch, count) + latchSpans := make([]int, count) for j := 0; j < count; j++ { - var cmd *cmd + var la *latch end := rng.Intn(count + 10) if end <= j { end = j - cmd = newCmd(spanWithEnd(j, end)) + la = newLatch(spanWithEnd(j, end)) } else { - cmd = newCmd(spanWithEnd(j, end+1)) + la = newLatch(spanWithEnd(j, end+1)) } - tr.Set(cmd) - cmds[j] = cmd - cmdSpans[j] = end + tr.Set(la) + latches[j] = la + latchSpans[j] = end } const scanTrials = 100 for j := 0; j < scanTrials; j++ { - var scanCmd *cmd + var scanLa *latch scanStart := rng.Intn(count) scanEnd := rng.Intn(count + 10) if scanEnd <= scanStart { scanEnd = scanStart - scanCmd = newCmd(spanWithEnd(scanStart, scanEnd)) + scanLa = newLatch(spanWithEnd(scanStart, scanEnd)) } else { - scanCmd = newCmd(spanWithEnd(scanStart, scanEnd+1)) + scanLa = newLatch(spanWithEnd(scanStart, scanEnd+1)) } - var exp, found []*cmd - for startKey, endKey := range cmdSpans { + var exp, found []*latch + for startKey, endKey := range latchSpans { if startKey <= scanEnd && endKey >= scanStart { - exp = append(exp, cmds[startKey]) + exp = append(exp, latches[startKey]) } } it := tr.MakeIter() - it.FirstOverlap(scanCmd) + 
it.FirstOverlap(scanLa) for it.Valid() { - found = append(found, it.Cmd()) + found = append(found, it.Cur()) it.NextOverlap() } - require.Equal(t, len(exp), len(found), "search for %v", scanCmd.span) + require.Equal(t, len(exp), len(found), "search for %v", scanLa.span) } } } func TestBTreeCloneConcurrentOperations(t *testing.T) { - const cloneTestSize = 10000 + const cloneTestSize = 1000 p := perm(cloneTestSize) var trees []*btree @@ -489,14 +499,14 @@ func TestBTreeCloneConcurrentOperations(t *testing.T) { } } - t.Log("Removing half of cmds from first half") + t.Log("Removing half of latches from first half") toRemove := want[cloneTestSize/2:] for i := 0; i < len(trees)/2; i++ { tree := trees[i] wg.Add(1) go func() { - for _, cmd := range toRemove { - tree.Delete(cmd) + for _, la := range toRemove { + tree.Delete(la) } wg.Done() }() @@ -505,7 +515,7 @@ func TestBTreeCloneConcurrentOperations(t *testing.T) { t.Log("Checking all values again") for i, tree := range trees { - var wantpart []*cmd + var wantpart []*latch if i < len(trees)/2 { wantpart = want[:cloneTestSize/2] } else { @@ -520,7 +530,7 @@ func TestBTreeCloneConcurrentOperations(t *testing.T) { func TestBTreeCmp(t *testing.T) { testCases := []struct { spanA, spanB roachpb.Span - idA, idB int64 + idA, idB uint64 exp int }{ { @@ -583,9 +593,9 @@ func TestBTreeCmp(t *testing.T) { for _, tc := range testCases { name := fmt.Sprintf("cmp(%s:%d,%s:%d)", tc.spanA, tc.idA, tc.spanB, tc.idB) t.Run(name, func(t *testing.T) { - cmdA := &cmd{id: tc.idA, span: tc.spanA} - cmdB := &cmd{id: tc.idB, span: tc.spanB} - require.Equal(t, tc.exp, cmp(cmdA, cmdB)) + laA := &latch{id: tc.idA, span: tc.spanA} + laB := &latch{id: tc.idB, span: tc.spanB} + require.Equal(t, tc.exp, cmp(laA, laB)) }) } } @@ -610,28 +620,28 @@ func TestIterStack(t *testing.T) { // Benchmarks // ////////////////////////////////////////// -// perm returns a random permutation of cmds with spans in the range [0, n). -func perm(n int) (out []*cmd) { +// perm returns a random permutation of latches with spans in the range [0, n). +func perm(n int) (out []*latch) { for _, i := range rand.Perm(n) { - out = append(out, newCmd(spanWithEnd(i, i+1))) + out = append(out, newLatch(spanWithEnd(i, i+1))) } return out } -// rang returns an ordered list of cmds with spans in the range [m, n]. -func rang(m, n int) (out []*cmd) { +// rang returns an ordered list of latches with spans in the range [m, n]. +func rang(m, n int) (out []*latch) { for i := m; i <= n; i++ { - out = append(out, newCmd(spanWithEnd(i, i+1))) + out = append(out, newLatch(spanWithEnd(i, i+1))) } return out } -// all extracts all cmds from a tree in order as a slice. -func all(tr *btree) (out []*cmd) { +// all extracts all latches from a tree in order as a slice. 
+func all(tr *btree) (out []*latch) { it := tr.MakeIter() it.First() for it.Valid() { - out = append(out, it.Cmd()) + out = append(out, it.Cur()) it.Next() } return out @@ -651,8 +661,8 @@ func BenchmarkBTreeInsert(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; { var tr btree - for _, cmd := range insertP { - tr.Set(cmd) + for _, la := range insertP { + tr.Set(la) i++ if i >= b.N { return @@ -669,12 +679,12 @@ func BenchmarkBTreeDelete(b *testing.B) { for i := 0; i < b.N; { b.StopTimer() var tr btree - for _, cmd := range insertP { - tr.Set(cmd) + for _, la := range insertP { + tr.Set(la) } b.StartTimer() - for _, cmd := range removeP { - tr.Delete(cmd) + for _, la := range removeP { + tr.Delete(la) i++ if i >= b.N { return @@ -691,14 +701,14 @@ func BenchmarkBTreeDeleteInsert(b *testing.B) { forBenchmarkSizes(b, func(b *testing.B, count int) { insertP := perm(count) var tr btree - for _, cmd := range insertP { - tr.Set(cmd) + for _, la := range insertP { + tr.Set(la) } b.ResetTimer() for i := 0; i < b.N; i++ { - cmd := insertP[i%count] - tr.Delete(cmd) - tr.Set(cmd) + la := insertP[i%count] + tr.Delete(la) + tr.Set(la) } }) } @@ -707,15 +717,15 @@ func BenchmarkBTreeDeleteInsertCloneOnce(b *testing.B) { forBenchmarkSizes(b, func(b *testing.B, count int) { insertP := perm(count) var tr btree - for _, cmd := range insertP { - tr.Set(cmd) + for _, la := range insertP { + tr.Set(la) } tr = tr.Clone() b.ResetTimer() for i := 0; i < b.N; i++ { - cmd := insertP[i%count] - tr.Delete(cmd) - tr.Set(cmd) + la := insertP[i%count] + tr.Delete(la) + tr.Set(la) } }) } @@ -726,19 +736,19 @@ func BenchmarkBTreeDeleteInsertCloneEachTime(b *testing.B) { forBenchmarkSizes(b, func(b *testing.B, count int) { insertP := perm(count) var tr, trReset btree - for _, cmd := range insertP { - tr.Set(cmd) + for _, la := range insertP { + tr.Set(la) } b.ResetTimer() for i := 0; i < b.N; i++ { - cmd := insertP[i%count] + la := insertP[i%count] if reset { trReset.Reset() trReset = tr } tr = tr.Clone() - tr.Delete(cmd) - tr.Set(cmd) + tr.Delete(la) + tr.Set(la) } }) }) @@ -761,7 +771,7 @@ func BenchmarkBTreeIterSeekGE(b *testing.B) { for i := 0; i < count; i++ { s := span(i) spans = append(spans, s) - tr.Set(newCmd(s)) + tr.Set(newLatch(s)) } rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) @@ -770,13 +780,13 @@ func BenchmarkBTreeIterSeekGE(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { s := spans[rng.Intn(len(spans))] - it.SeekGE(newCmd(s)) + it.SeekGE(newLatch(s)) if testing.Verbose() { if !it.Valid() { b.Fatal("expected to find key") } - if !s.Equal(it.Cmd().span) { - b.Fatalf("expected %s, but found %s", s, it.Cmd().span) + if !s.Equal(it.Cur().span) { + b.Fatalf("expected %s, but found %s", s, it.Cur().span) } } } @@ -791,7 +801,7 @@ func BenchmarkBTreeIterSeekLT(b *testing.B) { for i := 0; i < count; i++ { s := span(i) spans = append(spans, s) - tr.Set(newCmd(s)) + tr.Set(newLatch(s)) } rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) @@ -801,7 +811,7 @@ func BenchmarkBTreeIterSeekLT(b *testing.B) { for i := 0; i < b.N; i++ { j := rng.Intn(len(spans)) s := spans[j] - it.SeekLT(newCmd(s)) + it.SeekLT(newLatch(s)) if testing.Verbose() { if j == 0 { if it.Valid() { @@ -812,8 +822,8 @@ func BenchmarkBTreeIterSeekLT(b *testing.B) { b.Fatal("expected to find key") } s := spans[j-1] - if !s.Equal(it.Cmd().span) { - b.Fatalf("expected %s, but found %s", s, it.Cmd().span) + if !s.Equal(it.Cur().span) { + b.Fatalf("expected %s, but found %s", s, it.Cur().span) } } } @@ -824,15 +834,15 @@ 
func BenchmarkBTreeIterSeekLT(b *testing.B) { func BenchmarkBTreeIterFirstOverlap(b *testing.B) { forBenchmarkSizes(b, func(b *testing.B, count int) { var spans []roachpb.Span - var cmds []*cmd + var latches []*latch var tr btree for i := 0; i < count; i++ { s := spanWithEnd(i, i+1) spans = append(spans, s) - cmd := newCmd(s) - cmds = append(cmds, cmd) - tr.Set(cmd) + la := newLatch(s) + latches = append(latches, la) + tr.Set(la) } rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) @@ -842,14 +852,14 @@ func BenchmarkBTreeIterFirstOverlap(b *testing.B) { for i := 0; i < b.N; i++ { j := rng.Intn(len(spans)) s := spans[j] - cmd := cmds[j] - it.FirstOverlap(cmd) + la := latches[j] + it.FirstOverlap(la) if testing.Verbose() { if !it.Valid() { b.Fatal("expected to find key") } - if !s.Equal(it.Cmd().span) { - b.Fatalf("expected %s, but found %s", s, it.Cmd().span) + if !s.Equal(it.Cur().span) { + b.Fatalf("expected %s, but found %s", s, it.Cur().span) } } } @@ -860,10 +870,10 @@ func BenchmarkBTreeIterNext(b *testing.B) { var tr btree const count = 8 << 10 - const size = 2 * maxCmds + const size = 2 * maxLatches for i := 0; i < count; i++ { - cmd := newCmd(spanWithEnd(i, i+size+1)) - tr.Set(cmd) + la := newLatch(spanWithEnd(i, i+size+1)) + tr.Set(la) } it := tr.MakeIter() @@ -880,10 +890,10 @@ func BenchmarkBTreeIterPrev(b *testing.B) { var tr btree const count = 8 << 10 - const size = 2 * maxCmds + const size = 2 * maxLatches for i := 0; i < count; i++ { - cmd := newCmd(spanWithEnd(i, i+size+1)) - tr.Set(cmd) + la := newLatch(spanWithEnd(i, i+size+1)) + tr.Set(la) } it := tr.MakeIter() @@ -900,13 +910,13 @@ func BenchmarkBTreeIterNextOverlap(b *testing.B) { var tr btree const count = 8 << 10 - const size = 2 * maxCmds + const size = 2 * maxLatches for i := 0; i < count; i++ { - cmd := newCmd(spanWithEnd(i, i+size+1)) - tr.Set(cmd) + la := newLatch(spanWithEnd(i, i+size+1)) + tr.Set(la) } - allCmd := newCmd(spanWithEnd(0, count+1)) + allCmd := newLatch(spanWithEnd(0, count+1)) it := tr.MakeIter() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -922,17 +932,17 @@ func BenchmarkBTreeIterOverlapScan(b *testing.B) { rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) const count = 8 << 10 - const size = 2 * maxCmds + const size = 2 * maxLatches for i := 0; i < count; i++ { - tr.Set(newCmd(spanWithEnd(i, i+size+1))) + tr.Set(newLatch(spanWithEnd(i, i+size+1))) } - cmd := new(cmd) + la := new(latch) b.ResetTimer() for i := 0; i < b.N; i++ { - cmd.span = randomSpan(rng, count) + la.span = randomSpan(rng, count) it := tr.MakeIter() - it.FirstOverlap(cmd) + it.FirstOverlap(la) for it.Valid() { it.NextOverlap() } diff --git a/pkg/storage/spanlatch/list.go b/pkg/storage/spanlatch/list.go new file mode 100644 index 000000000000..f4736a77e8d4 --- /dev/null +++ b/pkg/storage/spanlatch/list.go @@ -0,0 +1,54 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package spanlatch + +// latchList is a double-linked circular list of *latch elements. 
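A short sketch of how the list below is meant to be used; note that remove nil-ing out next and prev is what lets latch.inReadSet() in manager.go detect membership:

    var ll latchList
    la := &latch{}
    ll.pushBack(la) // links la into the circular list; la.next/la.prev become non-nil
    _ = ll.front()  // returns la while it is the only element
    ll.remove(la)   // resets la.next/la.prev to nil, so inReadSet() reports false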
+type latchList struct { + root latch + len int +} + +func (ll *latchList) front() *latch { + if ll.len == 0 { + return nil + } + return ll.root.next +} + +func (ll *latchList) lazyInit() { + if ll.root.next == nil { + ll.root.next = &ll.root + ll.root.prev = &ll.root + } +} + +func (ll *latchList) pushBack(la *latch) { + ll.lazyInit() + at := ll.root.prev + n := at.next + at.next = la + la.prev = at + la.next = n + n.prev = la + ll.len++ +} + +func (ll *latchList) remove(la *latch) { + la.prev.next = la.next + la.next.prev = la.prev + la.next = nil // avoid memory leaks + la.prev = nil // avoid memory leaks + ll.len-- +} diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go new file mode 100644 index 000000000000..4d5d071e8e5e --- /dev/null +++ b/pkg/storage/spanlatch/manager.go @@ -0,0 +1,406 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package spanlatch + +import ( + "context" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// A Manager maintains an interval tree of key and key range latches. Latch +// acquisitions affecting keys or key ranges must wait on already-acquired latches +// which overlap their key ranges to be released. +// +// Latch acquisition attempts invoke Manager.Acquire and provide details about +// the spans that they plan to touch and the timestamps they plan to touch them +// at. Acquire inserts the latch into the Manager's tree and waits on +// prerequisite latch attempts that are already tracked by the Manager. +// Manager.Acquire blocks until the latch acquisition completes, at which point +// it returns a Guard, which is scoped to the lifetime of the latch ownership. +// +// When the latches are no longer needed, they are released by invoking +// Manager.Release with the Guard returned when the latches were originally +// acquired. Doing so removes the latches from the Manager's tree and signals to +// dependent latch acquisitions that they no longer need to wait on the released +// latches. +// +// Manager is safe for concurrent use by multiple goroutines. Concurrent access +// is made efficient using a copy-on-write technique to capture immutable +// snapshots of the type's inner btree structures. Using this strategy, tasks +// requiring mutual exclusion are limited to updating the type's trees and +// grabbing snapshots. Notably, scanning for and waiting on prerequisite latches +// is performed outside of the mutual exclusion zone. This means that the work +// performed under lock is linear with respect to the number of spans that a +// latch acquisition declares but NOT linear with respect to the number of other +// latch attempts that it will wait on. +// +// Manager's zero value can be used directly.
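The contract described above translates into a simple caller pattern. A hedged sketch of typical use (the request's spans and timestamp are stand-ins supplied by the caller):

    func runRequest(
        ctx context.Context, m *spanlatch.Manager, spans *spanset.SpanSet, ts hlc.Timestamp,
    ) error {
        lg, err := m.Acquire(ctx, spans, ts)
        if err != nil {
            return err // context canceled while waiting on overlapping latches
        }
        defer m.Release(lg)
        // ... evaluate the request while its latches are held ...
        return nil
    }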
+type Manager struct {
+	mu      syncutil.Mutex
+	idAlloc uint64
+	scopes  [spanset.NumSpanScope]scopedManager
+}
+
+// scopedManager is a latch manager scoped to either local or global keys.
+// See spanset.SpanScope.
+type scopedManager struct {
+	readSet latchList
+	trees   [spanset.NumSpanAccess]btree
+}
+
+// latches are stored in the Manager's btrees. They represent the latching
+// of a single key span.
+type latch struct {
+	id         uint64
+	span       roachpb.Span
+	ts         hlc.Timestamp
+	done       *signal
+	next, prev *latch // readSet linked-list.
+}
+
+func (la *latch) inReadSet() bool {
+	return la.next != nil
+}
+
+// Guard is a handle to a set of acquired latches. It is returned by
+// Manager.Acquire and accepted by Manager.Release.
+type Guard struct {
+	done signal
+	// latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size.
+	latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer
+	latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32
+}
+
+func (lg *Guard) latches(s spanset.SpanScope, a spanset.SpanAccess) []latch {
+	len := lg.latchesLens[s][a]
+	if len == 0 {
+		return nil
+	}
+	const maxArrayLen = 1 << 31
+	return (*[maxArrayLen]latch)(lg.latchesPtrs[s][a])[:len:len]
+}
+
+func (lg *Guard) setLatches(s spanset.SpanScope, a spanset.SpanAccess, latches []latch) {
+	lg.latchesPtrs[s][a] = unsafe.Pointer(&latches[0])
+	lg.latchesLens[s][a] = int32(len(latches))
+}
+
+func allocGuardAndLatches(nLatches int) (*Guard, []latch) {
+	// Guard would be an ideal candidate for object pooling, but without
+	// reference counting its latches we can't know whether they're still
+	// referenced by other tree snapshots. The latches hold a reference to
+	// the signal living on the Guard, so the guard can't be recycled while
+	// latches still point to it.
+	if nLatches <= 1 {
+		alloc := new(struct {
+			g       Guard
+			latches [1]latch
+		})
+		return &alloc.g, alloc.latches[:nLatches]
+	} else if nLatches <= 2 {
+		alloc := new(struct {
+			g       Guard
+			latches [2]latch
+		})
+		return &alloc.g, alloc.latches[:nLatches]
+	} else if nLatches <= 4 {
+		alloc := new(struct {
+			g       Guard
+			latches [4]latch
+		})
+		return &alloc.g, alloc.latches[:nLatches]
+	} else if nLatches <= 8 {
+		alloc := new(struct {
+			g       Guard
+			latches [8]latch
+		})
+		return &alloc.g, alloc.latches[:nLatches]
+	}
+	return new(Guard), make([]latch, nLatches)
+}
+
+func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard {
+	nLatches := 0
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			nLatches += len(spans.GetSpans(a, s))
+		}
+	}
+
+	guard, latches := allocGuardAndLatches(nLatches)
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			ss := spans.GetSpans(a, s)
+			n := len(ss)
+			if n == 0 {
+				continue
+			}
+
+			ssLatches := latches[:n]
+			for i := range ssLatches {
+				latch := &latches[i]
+				latch.span = ss[i]
+				latch.ts = ifGlobal(ts, s)
+				latch.done = &guard.done
+				// latch.id is assigned in Manager.insertLocked, under lock.
+			}
+			guard.setLatches(s, a, ssLatches)
+			latches = latches[n:]
+		}
+	}
+	if len(latches) != 0 {
+		panic("alloc too large")
+	}
+	return guard
+}
+
+// Acquire acquires latches from the Manager for each of the provided spans, at
+// the specified timestamp. In doing so, it waits for latches over all
+// overlapping spans to be released before returning.
If the provided context +// is canceled before the method is done waiting for overlapping latches to +// be released, it stops waiting and releases all latches that it has already +// acquired. +// +// It returns a Guard which must be provided to Release. +func (m *Manager) Acquire( + ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, +) (*Guard, error) { + lg, snap := m.sequence(spans, ts) + defer snap.close() + + err := m.wait(ctx, lg, ts, snap) + if err != nil { + m.Release(lg) + return nil, err + } + return lg, nil +} + +// sequence locks the manager, captures an immutable snapshot, inserts latches +// for each of the specified spans into the manager's interval trees, and +// unlocks the manager. The role of the method is to sequence latch acquisition +// attempts. +func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) { + lg := newGuard(spans, ts) + + m.mu.Lock() + snap := m.snapshotLocked(spans) + m.insertLocked(lg) + m.mu.Unlock() + return lg, snap +} + +// snapshot is an immutable view into the latch manager's state. +type snapshot struct { + trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree +} + +// close closes the snapshot and releases any associated resources. +func (sn *snapshot) close() { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + sn.trees[s][a].Reset() + } + } +} + +// snapshotLocked captures an immutable snapshot of the latch manager. It takes +// a spanset to limit the amount of state captured. +func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot { + var snap snapshot + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + sm := &m.scopes[s] + reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0 + writing := len(spans.GetSpans(spanset.SpanReadWrite, s)) > 0 + + if writing { + sm.flushReadSetLocked() + snap.trees[s][spanset.SpanReadOnly] = sm.trees[spanset.SpanReadOnly].Clone() + } + if writing || reading { + snap.trees[s][spanset.SpanReadWrite] = sm.trees[spanset.SpanReadWrite].Clone() + } + } + return snap +} + +// flushReadSetLocked flushes the read set into the read interval tree. +func (sm *scopedManager) flushReadSetLocked() { + for sm.readSet.len > 0 { + latch := sm.readSet.front() + sm.readSet.remove(latch) + sm.trees[spanset.SpanReadOnly].Set(latch) + } +} + +// insertLocked inserts the latches owned by the provided Guard into the +// Manager. +func (m *Manager) insertLocked(lg *Guard) { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + sm := &m.scopes[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + latches := lg.latches(s, a) + for i := range latches { + latch := &latches[i] + latch.id = m.nextID() + switch a { + case spanset.SpanReadOnly: + // Add reads to the readSet. They only need to enter + // the read tree if they're flushed by a write capturing + // a snapshot. + sm.readSet.pushBack(latch) + case spanset.SpanReadWrite: + // Add writes directly to the write tree. + sm.trees[spanset.SpanReadWrite].Set(latch) + default: + panic("unknown access") + } + } + } + } +} + +func (m *Manager) nextID() uint64 { + m.idAlloc++ + return m.idAlloc +} + +// ignoreFn is used for non-interference of earlier reads with later writes. +// +// However, this is only desired for the global scope. Reads and writes to local +// keys are specified to always interfere, regardless of their timestamp. 
This
+// is done to avoid confusion with local keys declared as part of proposer
+// evaluated KV.
+//
+// This is also disabled in the global scope if either of the timestamps is
+// empty. In those cases, we consider the latch without a timestamp to be a
+// non-MVCC operation that affects all timestamps in the key range.
+type ignoreFn func(ts, other hlc.Timestamp) bool
+
+func ignoreLater(ts, other hlc.Timestamp) bool   { return !ts.IsEmpty() && ts.Less(other) }
+func ignoreEarlier(ts, other hlc.Timestamp) bool { return !other.IsEmpty() && other.Less(ts) }
+func ignoreNothing(ts, other hlc.Timestamp) bool { return false }
+
+func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp {
+	switch s {
+	case spanset.SpanGlobal:
+		return ts
+	case spanset.SpanLocal:
+		// All local latches interfere.
+		return hlc.Timestamp{}
+	default:
+		panic("unknown scope")
+	}
+}
+
+// wait waits for all interfering latches in the provided snapshot to complete
+// before returning.
+func (m *Manager) wait(ctx context.Context, lg *Guard, ts hlc.Timestamp, snap snapshot) error {
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		tr := &snap.trees[s]
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			latches := lg.latches(s, a)
+			for i := range latches {
+				latch := &latches[i]
+				switch a {
+				case spanset.SpanReadOnly:
+					// Wait for writes at equal or lower timestamps.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if err := iterAndWait(ctx, &it, latch, ts, ignoreLater); err != nil {
+						return err
+					}
+				case spanset.SpanReadWrite:
+					// Wait for all other writes.
+					//
+					// It is cheaper to wait on an already released latch than
+					// on an unreleased latch, so we prefer to wait first on
+					// latches that are likely to be held longer. We expect
+					// writes to take longer than reads to release their
+					// latches, so we wait on them first.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if err := iterAndWait(ctx, &it, latch, ts, ignoreNothing); err != nil {
+						return err
+					}
+					// Wait for reads at equal or higher timestamps.
+					it = tr[spanset.SpanReadOnly].MakeIter()
+					if err := iterAndWait(ctx, &it, latch, ts, ignoreEarlier); err != nil {
+						return err
+					}
+				default:
+					panic("unknown access")
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// iterAndWait uses the provided iterator to wait on all latches that overlap
+// with the search latch and which should not be ignored given their timestamp
+// and the supplied ignoreFn.
+func iterAndWait(
+	ctx context.Context, it *iterator, search *latch, ts hlc.Timestamp, ignore ignoreFn,
+) error {
+	done := ctx.Done()
+	for it.FirstOverlap(search); it.Valid(); it.NextOverlap() {
+		latch := it.Cur()
+		if latch.done.signaled() {
+			continue
+		}
+		if ignore(ts, latch.ts) {
+			continue
+		}
+		select {
+		case <-latch.done.signalChan():
+		case <-done:
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+
+// Release releases the latches held by the provided Guard. After being called,
+// dependent latch acquisition attempts can complete if not blocked on any other
+// owned latches.
+func (m *Manager) Release(lg *Guard) {
+	lg.done.signal()
+
+	m.mu.Lock()
+	m.removeLocked(lg)
+	m.mu.Unlock()
+}
+
+// removeLocked removes the latches owned by the provided Guard from the
+// Manager. Must be called with mu held.
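+//
+// Note that a read latch that was never flushed into the read tree by a
+// later write's snapshot still lives on the scope's readSet (inReadSet()
+// reports true) and is unlinked from there; flushed reads and all writes
+// live in the corresponding interval tree and are deleted from trees[a]
+// instead.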
+func (m *Manager) removeLocked(lg *Guard) {
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		sm := &m.scopes[s]
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			latches := lg.latches(s, a)
+			for i := range latches {
+				latch := &latches[i]
+				if latch.inReadSet() {
+					sm.readSet.remove(latch)
+				} else {
+					sm.trees[a].Delete(latch)
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go
new file mode 100644
index 000000000000..8641e2d3e857
--- /dev/null
+++ b/pkg/storage/spanlatch/manager_test.go
@@ -0,0 +1,533 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package spanlatch
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"math/rand"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+var read = false
+var write = true
+var zeroTS = hlc.Timestamp{}
+
+func spans(from, to string, write bool) *spanset.SpanSet {
+	var span roachpb.Span
+	if to == "" {
+		span = roachpb.Span{Key: roachpb.Key(from)}
+	} else {
+		span = roachpb.Span{Key: roachpb.Key(from), EndKey: roachpb.Key(to)}
+	}
+	if strings.HasPrefix(from, "local") {
+		span.Key = append(keys.LocalRangePrefix, span.Key...)
+		if span.EndKey != nil {
+			span.EndKey = append(keys.LocalRangePrefix, span.EndKey...)
+		}
+	}
+	var spans spanset.SpanSet
+	access := spanset.SpanReadOnly
+	if write {
+		access = spanset.SpanReadWrite
+	}
+	spans.Add(access, span)
+	return &spans
+}
+
+func testLatchSucceeds(t *testing.T, lgC <-chan *Guard) *Guard {
+	t.Helper()
+	select {
+	case lg := <-lgC:
+		return lg
+	case <-time.After(15 * time.Millisecond):
+		t.Fatal("latch acquisition should succeed")
+	}
+	return nil
+}
+
+func testLatchBlocks(t *testing.T, lgC <-chan *Guard) {
+	t.Helper()
+	select {
+	case <-lgC:
+		t.Fatal("latch acquisition should block")
+	case <-time.After(3 * time.Millisecond):
+	}
+}
+
+// MustAcquire is like Acquire, except it can't return context cancellation
+// errors.
+func (m *Manager) MustAcquire(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard {
+	lg, err := m.Acquire(context.Background(), spans, ts)
+	if err != nil {
+		panic(err)
+	}
+	return lg
+}
+
+// MustAcquireCh is like Acquire, except it only sequences the latch
+// attempt synchronously and waits on dependent latches asynchronously. It
+// returns a channel that provides the Guard when the latches are acquired (i.e.
+// after waiting). If the context expires, a nil Guard will be delivered on the
+// channel.
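+//
+// A typical use in the tests below looks roughly like (prereq being a Guard
+// acquired earlier over an overlapping span):
+//
+//	lgC := m.MustAcquireCh(spans("a", "b", write), zeroTS)
+//	testLatchBlocks(t, lgC) // still waiting on prereq
+//	m.Release(prereq)
+//	lg := testLatchSucceeds(t, lgC)
+//	m.Release(lg)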
+func (m *Manager) MustAcquireCh(spans *spanset.SpanSet, ts hlc.Timestamp) <-chan *Guard {
+	return m.MustAcquireChCtx(context.Background(), spans, ts)
+}
+
+// MustAcquireChCtx is like MustAcquireCh, except it accepts a context.
+func (m *Manager) MustAcquireChCtx(
+	ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp,
+) <-chan *Guard {
+	ch := make(chan *Guard)
+	lg, snap := m.sequence(spans, ts)
+	go func() {
+		err := m.wait(ctx, lg, ts, snap)
+		if err != nil {
+			m.Release(lg)
+			lg = nil
+		}
+		ch <- lg
+	}()
+	return ch
+}
+
+func TestLatchManager(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var m Manager
+
+	// Try a latch with no overlapping already-acquired latches.
+	lg1 := m.MustAcquire(spans("a", "", write), zeroTS)
+	m.Release(lg1)
+
+	lg2 := m.MustAcquire(spans("a", "b", write), zeroTS)
+	m.Release(lg2)
+
+	// Add a latch and verify overlapping latches wait on it.
+	lg3 := m.MustAcquire(spans("a", "b", write), zeroTS)
+	lg4C := m.MustAcquireCh(spans("a", "b", write), zeroTS)
+
+	// Second write should block.
+	testLatchBlocks(t, lg4C)
+
+	// First write completes, second grabs latch.
+	m.Release(lg3)
+	testLatchSucceeds(t, lg4C)
+}
+
+func TestLatchManagerNoWaitOnReadOnly(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var m Manager
+
+	// Acquire latch for read-only span.
+	m.MustAcquire(spans("a", "", read), zeroTS)
+
+	// Verify no wait with another read-only span.
+	m.MustAcquire(spans("a", "", read), zeroTS)
+}
+
+func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var m Manager
+
+	// Acquire latch for read-only span.
+	lg1 := m.MustAcquire(spans("a", "", read), zeroTS)
+	// Acquire another one on top.
+	lg2 := m.MustAcquire(spans("a", "", read), zeroTS)
+
+	// A write span should have to wait for both reads.
+	lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS)
+
+	// Certainly blocks now.
+	testLatchBlocks(t, lg3C)
+
+	// The second read releases its latch, but the first one remains.
+	m.Release(lg2)
+
+	// Should still block.
+	testLatchBlocks(t, lg3C)
+
+	// The first read releases its latch.
+	m.Release(lg1)
+
+	// Now it goes through.
+	testLatchSucceeds(t, lg3C)
+}
+
+func TestLatchManagerMultipleOverlappingLatches(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var m Manager
+
+	// Acquire multiple latches.
+	lg1C := m.MustAcquireCh(spans("a", "", write), zeroTS)
+	lg2C := m.MustAcquireCh(spans("b", "c", write), zeroTS)
+	lg3C := m.MustAcquireCh(spans("a", "d", write), zeroTS)
+
+	// Attempt to acquire a latch which overlaps them all.
+	lg4C := m.MustAcquireCh(spans("0", "z", write), zeroTS)
+	testLatchBlocks(t, lg4C)
+	m.Release(<-lg1C)
+	testLatchBlocks(t, lg4C)
+	m.Release(<-lg2C)
+	testLatchBlocks(t, lg4C)
+	m.Release(<-lg3C)
+	testLatchSucceeds(t, lg4C)
+}
+
+func TestLatchManagerMultipleOverlappingSpans(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	var m Manager
+
+	// Acquire multiple latches.
+	lg1 := m.MustAcquire(spans("a", "", write), zeroTS)
+	lg2 := m.MustAcquire(spans("b", "c", read), zeroTS)
+	lg3 := m.MustAcquire(spans("d", "f", write), zeroTS)
+	lg4 := m.MustAcquire(spans("g", "", write), zeroTS)
+
+	// Attempt to acquire latches overlapping each of them.
+ var spans spanset.SpanSet + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a")}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("b")}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("e")}) + lg5C := m.MustAcquireCh(&spans, zeroTS) + + // Blocks until the first three prerequisite latches release. + testLatchBlocks(t, lg5C) + m.Release(lg2) + testLatchBlocks(t, lg5C) + m.Release(lg3) + testLatchBlocks(t, lg5C) + m.Release(lg1) + lg5 := testLatchSucceeds(t, lg5C) + m.Release(lg4) + m.Release(lg5) +} + +func TestLatchManagerDependentLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + + cases := []struct { + name string + sp1 *spanset.SpanSet + ts1 hlc.Timestamp + sp2 *spanset.SpanSet + ts2 hlc.Timestamp + dependent bool + }{ + { + name: "point writes, same key", + sp1: spans("a", "", write), + sp2: spans("a", "", write), + dependent: true, + }, + { + name: "point writes, different key", + sp1: spans("a", "", write), + sp2: spans("b", "", write), + dependent: false, + }, + { + name: "range writes, overlapping span", + sp1: spans("a", "c", write), + sp2: spans("b", "d", write), + dependent: true, + }, + { + name: "range writes, non-overlapping span", + sp1: spans("a", "b", write), + sp2: spans("b", "c", write), + dependent: false, + }, + { + name: "point reads, same key", + sp1: spans("a", "", read), + sp2: spans("a", "", read), + dependent: false, + }, + { + name: "point reads, different key", + sp1: spans("a", "", read), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "range reads, overlapping span", + sp1: spans("a", "c", read), + sp2: spans("b", "d", read), + dependent: false, + }, + { + name: "range reads, non-overlapping span", + sp1: spans("a", "b", read), + sp2: spans("b", "c", read), + dependent: false, + }, + { + name: "read and write, same ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "read and write, causal ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 2}, + dependent: true, + }, + { + name: "read and write, non-causal ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 2}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: false, + }, + { + name: "read and write, zero ts read", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 0}, + dependent: true, + }, + { + name: "read and write, zero ts write", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 0}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "read and write, non-overlapping", + sp1: spans("a", "b", write), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "local range writes, overlapping span", + sp1: spans("local a", "local c", write), + sp2: spans("local b", "local d", write), + dependent: true, + }, + { + name: "local range writes, non-overlapping span", + sp1: spans("local a", "local b", write), + sp2: spans("local b", "local c", write), + dependent: false, + }, + { + name: "local range reads, overlapping span", + sp1: spans("local a", "local c", read), + sp2: spans("local b", "local d", read), + dependent: false, + }, + { + name: "local range reads, non-overlapping span", + sp1: spans("local a", "local b", read), + sp2: spans("local b", "local c", 
read), + dependent: false, + }, + { + name: "local read and write, same ts", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, causal ts", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 2}, + dependent: true, + }, + { + name: "local read and write, non-causal ts", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 2}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, zero ts read", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 0}, + dependent: true, + }, + { + name: "local read and write, zero ts write", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 0}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, non-overlapping", + sp1: spans("a", "b", write), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "local read and global write, overlapping", + sp1: spans("a", "b", write), + sp2: spans("local b", "", read), + dependent: false, + }, + { + name: "local write and global read, overlapping", + sp1: spans("local a", "local b", write), + sp2: spans("b", "", read), + dependent: false, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + testutils.RunTrueAndFalse(t, "inv", func(t *testing.T, inv bool) { + c := c + if inv { + c.sp1, c.sp2 = c.sp2, c.sp1 + c.ts1, c.ts2 = c.ts2, c.ts1 + } + + var m Manager + lg1 := m.MustAcquire(c.sp1, c.ts1) + lg2C := m.MustAcquireCh(c.sp2, c.ts2) + if c.dependent { + testLatchBlocks(t, lg2C) + m.Release(lg1) + lg2 := testLatchSucceeds(t, lg2C) + m.Release(lg2) + } else { + lg2 := testLatchSucceeds(t, lg2C) + m.Release(lg1) + m.Release(lg2) + } + }) + }) + } +} + +func TestLatchManagerContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + var m Manager + + // Attempt to acquire three latches that all block on each other. + lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + // The second one is given a cancelable context. + ctx2, cancel2 := context.WithCancel(context.Background()) + lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write), zeroTS) + lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS) + + // The second and third latch attempt block on the first. + testLatchBlocks(t, lg2C) + testLatchBlocks(t, lg3C) + + // Cancel the second acquisition's context. It should stop waiting. + cancel2() + require.Nil(t, <-lg2C) + + // The third latch attempt still blocks. + testLatchBlocks(t, lg3C) + + // Release the first latch. The third succeeds in acquiring the latch. 
+ m.Release(lg1) + testLatchSucceeds(t, lg3C) +} + +func BenchmarkLatchManagerReadOnlyMix(b *testing.B) { + for _, size := range []int{1, 4, 16, 64, 128, 256} { + b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { + var m Manager + ss := spans("a", "b", read) + for i := 0; i < size; i++ { + _ = m.MustAcquire(ss, zeroTS) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = m.MustAcquire(ss, zeroTS) + } + }) + } +} + +func BenchmarkLatchManagerReadWriteMix(b *testing.B) { + for _, readsPerWrite := range []int{0, 1, 4, 16, 64, 128, 256} { + b.Run(fmt.Sprintf("readsPerWrite=%d", readsPerWrite), func(b *testing.B) { + var m Manager + lgBuf := make(chan *Guard, 16) + + spans := make([]spanset.SpanSet, b.N) + for i := range spans { + a, b := randBytes(100), randBytes(100) + // Overwrite first byte so that we do not mix local and global ranges + a[0], b[0] = 'a', 'a' + if bytes.Compare(a, b) > 0 { + a, b = b, a + } + span := roachpb.Span{ + Key: roachpb.Key(a), + EndKey: roachpb.Key(b), + } + access := spanset.SpanReadOnly + if i%(readsPerWrite+1) == 0 { + access = spanset.SpanReadWrite + } + spans[i].Add(access, span) + } + + b.ResetTimer() + for i := range spans { + lg, snap := m.sequence(&spans[i], zeroTS) + snap.close() + if len(lgBuf) == cap(lgBuf) { + m.Release(<-lgBuf) + } + lgBuf <- lg + } + }) + } +} + +func randBytes(n int) []byte { + b := make([]byte, n) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + return b +} diff --git a/pkg/storage/cmdq/signal.go b/pkg/storage/spanlatch/signal.go similarity index 99% rename from pkg/storage/cmdq/signal.go rename to pkg/storage/spanlatch/signal.go index e71a58948554..340483b0797f 100644 --- a/pkg/storage/cmdq/signal.go +++ b/pkg/storage/spanlatch/signal.go @@ -12,7 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. -package cmdq +package spanlatch import ( "sync/atomic" diff --git a/pkg/storage/cmdq/signal_test.go b/pkg/storage/spanlatch/signal_test.go similarity index 99% rename from pkg/storage/cmdq/signal_test.go rename to pkg/storage/spanlatch/signal_test.go index 58755fabd841..1713aea8fa60 100644 --- a/pkg/storage/cmdq/signal_test.go +++ b/pkg/storage/spanlatch/signal_test.go @@ -12,7 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. -package cmdq +package spanlatch import ( "sync" diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 0b11c4e0f8c3..225cfcc89c44 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -292,6 +292,7 @@ func TestLint(t *testing.T) { `\bsync\.(RW)?Mutex`, "--", "*.go", + ":!*/doc.go", ":!util/syncutil/mutex_sync.go", ":!util/syncutil/mutex_sync_race.go", )
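
As an end-to-end illustration of the timestamp-awareness described in doc.go, the following standalone sketch (written as an Example test against the exported API added above; the package paths and function names mirror this change, everything else is illustrative) exercises the ignoreLater case in Manager.wait: a read at WallTime 1 acquires immediately even though a write latch at WallTime 2 is still held, because the pair is non-causal.

	package spanlatch_test

	import (
		"context"
		"fmt"

		"github.com/cockroachdb/cockroach/pkg/roachpb"
		"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
		"github.com/cockroachdb/cockroach/pkg/storage/spanset"
		"github.com/cockroachdb/cockroach/pkg/util/hlc"
	)

	func Example_nonCausalReadWrite() {
		var m spanlatch.Manager
		ctx := context.Background()

		// Hold a write latch on "a" at WallTime 2.
		var w spanset.SpanSet
		w.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a")})
		wg, err := m.Acquire(ctx, &w, hlc.Timestamp{WallTime: 2})
		if err != nil {
			panic(err)
		}

		// A read of "a" at WallTime 1 is non-causal with the write (the
		// write is at a higher timestamp), so Acquire returns without
		// waiting on the still-held write latch.
		var r spanset.SpanSet
		r.Add(spanset.SpanReadOnly, roachpb.Span{Key: roachpb.Key("a")})
		rg, err := m.Acquire(ctx, &r, hlc.Timestamp{WallTime: 1})
		if err != nil {
			panic(err)
		}

		m.Release(rg)
		m.Release(wg)
		fmt.Println("no deadlock")
		// Output: no deadlock
	}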