From 681ae972dccb211e1619fa3446054c4ad113a532 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 21 Nov 2018 18:07:40 -0500
Subject: [PATCH] storage/spanlatch: create spanlatch.Manager using immutable
 btrees
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Informs #4768.
Informs #31904.

This change was inspired by #31904 and is a progression of the thinking
started in #4768 (comment).

The change introduces `spanlatch.Manager`, which will replace the
`CommandQueue` **in a future PR**. The new type isn't hooked up yet because
doing so will require a lot of plumbing changes in the storage package that
are best kept in a separate PR. The structure uses a new strategy that
reduces lock contention, simplifies the code, avoids allocations, and makes
#31904 easier to implement.

The primary objective, reducing lock contention, is addressed by minimizing
the amount of work we perform under the exclusive "sequencing" mutex while
locking the structure. This is made possible by employing a copy-on-write
strategy. Before this change, commands would lock the queue, create a large
slice of prerequisites, insert into the queue, and unlock. After the change,
commands lock the manager, grab an immutable snapshot of the manager's trees
in O(1) time, insert into the manager, and unlock. They can then iterate over
the immutable tree snapshot outside of the lock. Effectively, this means that
the work performed under lock is linear with respect to the number of spans
that a command declares but NO LONGER linear with respect to the number of
other commands that it will wait on. This is important because
`Replica.beginCmds` repeatedly comes up as the largest source of mutex
contention in our system, especially on hot ranges.

The use of immutable snapshots also simplifies the code significantly. We're
no longer copying our prereqs into a slice, so we no longer need to carefully
determine which transitive dependencies we do or don't need to wait on
explicitly. This also makes latch cancellation trivial because we no longer
explicitly hold on to our prereqs at all. Instead, we simply iterate through
the snapshot outside of the lock.

While rewriting the structure, I also spent some time optimizing its
allocations. Under normal operation, acquiring a latch now incurs only a
single allocation - that being for the `spanlatch.Guard`. All other
allocations are avoided through object pooling where appropriate. The
overhead of using a copy-on-write technique is almost entirely avoided by
atomically reference counting btree nodes, which allows us to release them
back into the btree node pools when they're no longer referenced by any btree
snapshots. This means that we don't expect any allocations when inserting
into the internal trees, even with the COW policy.

Finally, this will make the approach taken in #31904 much more natural.
Instead of tracking dependents and prerequisites for speculative reads and
then iterating through them to find overlaps afterward, we can use the
immutable snapshots directly! We can grab a snapshot and sequence ourselves
as usual, but avoid waiting for prereqs. We then execute optimistically
before finally checking whether we overlapped any of our prereqs. The great
thing about this is that we already have the prereqs in an interval tree
structure, so we get an efficient validation check for free.
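To make the copy-on-write sequencing described above concrete, here is a minimal, self-contained Go sketch of the pattern — lock, snapshot, insert, unlock, then wait outside the lock. The types and names (`manager`, `tree`, `item`) are illustrative assumptions and not the actual `spanlatch` API; a real COW btree also clones in O(1) by sharing reference-counted nodes rather than copying a slice.

```go
package main

import (
	"fmt"
	"sync"
)

// item is a simplified stand-in for a latch: a single key plus a done
// channel that is closed when the latch is released.
type item struct {
	key  string
	done chan struct{}
}

// tree is a toy immutable container. Each mutation builds a new value, so an
// old *tree keeps working as a consistent snapshot. (A real COW btree shares
// reference-counted nodes instead of copying a slice.)
type tree struct{ items []*item }

func (t *tree) with(it *item) *tree {
	n := &tree{items: make([]*item, len(t.items), len(t.items)+1)}
	copy(n.items, t.items)
	n.items = append(n.items, it)
	return n
}

type manager struct {
	mu   sync.Mutex
	tree *tree
}

// acquire holds the mutex only long enough to snapshot the current tree and
// insert the new latch. Waiting on conflicting latches happens outside the
// lock, against the snapshot, so the critical section stays small.
func (m *manager) acquire(key string) *item {
	it := &item{key: key, done: make(chan struct{})}

	m.mu.Lock()
	snap := m.tree // immutable: safe to read after unlock
	m.tree = m.tree.with(it)
	m.mu.Unlock()

	// Wait only on latches that were present before we were inserted.
	for _, prev := range snap.items {
		if prev.key == it.key {
			<-prev.done
		}
	}
	return it
}

func (m *manager) release(it *item) {
	close(it.done) // a real implementation would also remove it from the tree
}

func main() {
	m := &manager{tree: &tree{}}
	a := m.acquire("k")
	go m.release(a)
	b := m.acquire("k") // blocks until a is released
	fmt.Println("second acquisition of", b.key, "succeeded")
	m.release(b)
}
```

The real Manager additionally scopes latches by access mode (read/write) and by local/global keys, but the locking discipline is the same.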
### Naming changes

| Before                     | After                             |
|----------------------------|-----------------------------------|
| `CommandQueue`             | `spanlatch.Manager`               |
| "enter the command queue"  | "acquire span latches"            |
| "exit the command queue"   | "release span latches"            |
| "wait for prereq commands" | "wait for latches to be released" |

The use of the word "latch" is based on the definition of latches presented by Goetz Graefe in https://15721.courses.cs.cmu.edu/spring2016/papers/a16-graefe.pdf (see https://i.stack.imgur.com/fSRzd.png). An important reason for avoiding the word "lock" here is to make sure that we don't confuse the operational locking performed by the CommandQueue/spanlatch.Manager with the transaction-scoped locking enforced by intents and our transactional concurrency control model.

### Microbenchmarks

NOTE: these are single-threaded benchmarks that don't benefit at all from the concurrency improvements enabled by this new structure.

```
name                              cmdq time/op  spanlatch time/op  delta
ReadOnlyMix/size=1-4               897ns ±21%    917ns ±18%      ~     (p=0.897 n=8+10)
ReadOnlyMix/size=4-4               827ns ±22%    772ns ±15%      ~     (p=0.448 n=10+10)
ReadOnlyMix/size=16-4              905ns ±19%    770ns ±10%   -14.90%  (p=0.004 n=10+10)
ReadOnlyMix/size=64-4              907ns ±20%    730ns ±15%   -19.51%  (p=0.001 n=10+10)
ReadOnlyMix/size=128-4             926ns ±17%    731ns ±11%   -21.04%  (p=0.000 n=9+10)
ReadOnlyMix/size=256-4             977ns ±19%    726ns ± 9%   -25.65%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=0-4    12.5µs ± 4%    0.7µs ±17%   -94.70%  (p=0.000 n=8+9)
ReadWriteMix/readsPerWrite=1-4    8.18µs ± 5%   0.63µs ± 6%   -92.24%  (p=0.000 n=10+9)
ReadWriteMix/readsPerWrite=4-4    3.80µs ± 2%   0.66µs ± 5%   -82.58%  (p=0.000 n=8+10)
ReadWriteMix/readsPerWrite=16-4   1.82µs ± 2%   0.70µs ± 5%   -61.43%  (p=0.000 n=9+10)
ReadWriteMix/readsPerWrite=64-4    894ns ±12%    514ns ± 6%   -42.48%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=128-4   717ns ± 5%    472ns ± 1%   -34.21%  (p=0.000 n=10+8)
ReadWriteMix/readsPerWrite=256-4   607ns ± 5%    453ns ± 3%   -25.35%  (p=0.000 n=7+10)

name                              cmdq alloc/op  spanlatch alloc/op  delta
ReadOnlyMix/size=1-4                223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadOnlyMix/size=4-4                223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadOnlyMix/size=16-4               223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadOnlyMix/size=64-4               223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadOnlyMix/size=128-4              223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadOnlyMix/size=256-4              223B ± 0%     191B ± 0%   -14.35%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=0-4      915B ± 0%     144B ± 0%   -84.26%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=1-4      730B ± 0%     144B ± 0%   -80.29%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=4-4      486B ± 0%     144B ± 0%   -70.35%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=16-4     350B ± 0%     144B ± 0%   -58.86%  (p=0.000 n=9+10)
ReadWriteMix/readsPerWrite=64-4     222B ± 0%     144B ± 0%   -35.14%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=128-4    199B ± 0%     144B ± 0%   -27.64%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=256-4    188B ± 0%     144B ± 0%   -23.40%  (p=0.000 n=10+10)

name                              cmdq allocs/op  spanlatch allocs/op  delta
ReadOnlyMix/size=1-4                1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadOnlyMix/size=4-4                1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadOnlyMix/size=16-4               1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadOnlyMix/size=64-4               1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadOnlyMix/size=128-4              1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadOnlyMix/size=256-4              1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadWriteMix/readsPerWrite=0-4      34.0 ± 0%       1.0 ± 0%   -97.06%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=1-4      22.0 ± 0%       1.0 ± 0%   -95.45%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=4-4      10.0 ± 0%       1.0 ± 0%   -90.00%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=16-4     4.00 ± 0%      1.00 ± 0%   -75.00%  (p=0.000 n=10+10)
ReadWriteMix/readsPerWrite=64-4     1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadWriteMix/readsPerWrite=128-4    1.00 ± 0%      1.00 ± 0%      ~     (all equal)
ReadWriteMix/readsPerWrite=256-4    1.00 ± 0%      1.00 ± 0%      ~     (all equal)
```

Release note: None
---
 pkg/storage/command_queue_test.go            |  65 ++-
 pkg/storage/spanlatch/doc.go                 |  42 ++
 pkg/storage/spanlatch/interval_btree.go      |  10 +-
 pkg/storage/spanlatch/interval_btree_test.go |   6 +-
 pkg/storage/spanlatch/manager.go             | 439 +++++++++++++++
 pkg/storage/spanlatch/manager_test.go        | 533 +++++++++++++++++++
 pkg/testutils/lint/lint_test.go              |   1 +
 7 files changed, 1059 insertions(+), 37 deletions(-)
 create mode 100644 pkg/storage/spanlatch/doc.go
 create mode 100644 pkg/storage/spanlatch/manager.go
 create mode 100644 pkg/storage/spanlatch/manager_test.go

diff --git a/pkg/storage/command_queue_test.go b/pkg/storage/command_queue_test.go index 77e8fb1cf4a5..0ff1e1d2208a 100644 --- a/pkg/storage/command_queue_test.go +++ b/pkg/storage/command_queue_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) var zeroTS = hlc.Timestamp{} @@ -805,13 +806,14 @@ func assertExpectedPrereqs( } } -func BenchmarkCommandQueueGetPrereqsAllReadOnly(b *testing.B) { +func BenchmarkCommandQueueReadOnlyMix(b *testing.B) { // Test read-only getPrereqs performance for various number of command queue // entries. See #13627 where a previous implementation of // CommandQueue.getOverlaps had O(n) performance in this setup. Since reads // do not wait on other reads, expected performance is O(1). for _, size := range []int{1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { + var mu syncutil.Mutex cq := NewCommandQueue(true) spans := []roachpb.Span{{ Key: roachpb.Key("aaaaaaaaaa"), @@ -823,7 +825,10 @@ func BenchmarkCommandQueueGetPrereqsAllReadOnly(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _ = cq.getPrereqs(true, zeroTS, spans) + mu.Lock() + prereqs := cq.getPrereqs(true, zeroTS, spans) + cq.add(true, zeroTS, prereqs, spans) + mu.Unlock() } }) } @@ -833,32 +838,40 @@ func BenchmarkCommandQueueReadWriteMix(b *testing.B) { // Test performance with a mixture of reads and writes with a high number // of reads per write. // See #15544. - for _, readsPerWrite := range []int{1, 4, 16, 64, 128, 256} { + for _, readsPerWrite := range []int{0, 1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("readsPerWrite=%d", readsPerWrite), func(b *testing.B) { - for i := 0; i < b.N; i++ { - totalCmds := 1 << 10 - liveCmdQueue := make(chan *cmd, 16) - cq := NewCommandQueue(true /* coveringOptimization */) - for j := 0; j < totalCmds; j++ { - a, b := randBytes(100), randBytes(100) - // Overwrite first byte so that we do not mix local and global ranges - a[0], b[0] = 'a', 'a' - if bytes.Compare(a, b) > 0 { - a, b = b, a - } - spans := []roachpb.Span{{ - Key: roachpb.Key(a), - EndKey: roachpb.Key(b), - }} - var cmd *cmd - readOnly := j%(readsPerWrite+1) != 0 - prereqs := cq.getPrereqs(readOnly, zeroTS, spans) - cmd = cq.add(readOnly, zeroTS, prereqs, spans) - if len(liveCmdQueue) == cap(liveCmdQueue) { - cq.remove(<-liveCmdQueue) - } - liveCmdQueue <- cmd + var mu syncutil.Mutex + cq := NewCommandQueue(true /* coveringOptimization */) + liveCmdQueue := make(chan *cmd, 16) + + spans := make([][]roachpb.Span, b.N) + for i := range spans { + a, b := randBytes(100), randBytes(100) + // Overwrite first byte so that we do not mix local and global ranges + a[0], b[0] = 'a', 'a' + if bytes.Compare(a, b) > 0 { + a, b = b, a + } + spans[i] = []roachpb.Span{{ + Key: roachpb.Key(a), + EndKey: roachpb.Key(b), + }} + } + + b.ResetTimer() + for i := range spans { + mu.Lock() + readOnly := i%(readsPerWrite+1) != 0 + prereqs := cq.getPrereqs(readOnly, zeroTS, spans[i]) + cmd := cq.add(readOnly, zeroTS, prereqs, spans[i]) + mu.Unlock() + + if len(liveCmdQueue) == cap(liveCmdQueue) { + mu.Lock() + cq.remove(<-liveCmdQueue) + mu.Unlock() } + liveCmdQueue <- cmd } }) }
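The restructured benchmark above pre-generates its random spans ahead of `b.ResetTimer()` and serializes each queue operation under a mutex, so the timed loop measures only the hot path. As a minimal sketch of that Go benchmark pattern, with `process` as a hypothetical stand-in for the `getPrereqs`/`add` pair:

```go
package bench

import (
	"sync"
	"testing"
)

// process is a hypothetical stand-in for the operation under test.
func process(in int) { _ = in }

func BenchmarkPattern(b *testing.B) {
	var mu sync.Mutex
	// Pre-generate one input per iteration so setup cost is excluded
	// from the measurement.
	inputs := make([]int, b.N)
	for i := range inputs {
		inputs[i] = i * i
	}

	b.ResetTimer() // everything above is excluded by the timer
	for i := 0; i < b.N; i++ {
		mu.Lock()
		process(inputs[i])
		mu.Unlock()
	}
}
```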
diff --git a/pkg/storage/spanlatch/doc.go b/pkg/storage/spanlatch/doc.go new file mode 100644 index 000000000000..2100fb42e1cb --- /dev/null +++ b/pkg/storage/spanlatch/doc.go @@ -0,0 +1,42 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +/* +Package spanlatch provides a latch management structure for serializing access +to keys and key ranges. Latch acquisitions affecting keys or key ranges must wait +on already-acquired latches which overlap their key range to be released. + +Manager's complexity can best be understood as a series of incremental changes, +each in the name of increased lock granularity to reduce contention and enable +more concurrency between requests. The structure can trace its lineage back to a +simple sync.Mutex. From there, the structure evolved through the following +progression: + + * The structure began by enforcing strict mutual exclusion for access to any + keys. Conceptually, it was a sync.Mutex. + * Concurrent read-only access to keys and key ranges was permitted. Reads and + writes were serialized with each other, writes were serialized with each other, + but no ordering was enforced between reads. Conceptually, the structure became + a sync.RWMutex. + * The structure became key range-aware and concurrent access to non-overlapping + key ranges was permitted. Conceptually, the structure became an interval + tree of sync.RWMutexes. + * The structure became timestamp-aware and concurrent access of non-causal + read and write pairs was permitted. The effect of this was that reads no + longer waited for writes at higher timestamps and writes no longer waited + for reads at lower timestamps. Conceptually, the structure became an interval + tree of timestamp-aware sync.RWMutexes. + +*/ +package spanlatch
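The final, timestamp-aware step of this progression is what the `ignoreLater`/`ignoreEarlier` policies in `manager.go` below implement. As a standalone sketch of the rule with a simplified integer timestamp (an assumption for brevity; 0 stands in for an empty timestamp, which always interferes):

```go
package main

import "fmt"

// ts is a simplified wall-clock timestamp; 0 stands in for an empty
// timestamp, i.e. a non-MVCC operation that affects all timestamps.
type ts int64

// readIgnoresWrite reports whether a read at rts may skip waiting on a
// write latch at wts: only when the write is at a strictly higher timestamp.
func readIgnoresWrite(rts, wts ts) bool { return rts != 0 && rts < wts }

// writeIgnoresRead reports whether a write at wts may skip waiting on a
// read latch at rts: only when the read is at a strictly lower timestamp.
func writeIgnoresRead(wts, rts ts) bool { return rts != 0 && rts < wts }

func main() {
	fmt.Println(readIgnoresWrite(1, 2)) // true: write is above the read
	fmt.Println(readIgnoresWrite(2, 1)) // false: read must wait
	fmt.Println(writeIgnoresRead(2, 1)) // true: read is below the write
	fmt.Println(writeIgnoresRead(1, 2)) // false: write must wait
}
```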
diff --git a/pkg/storage/spanlatch/interval_btree.go b/pkg/storage/spanlatch/interval_btree.go index 714f6237697b..f5da9f5e0eb2 100644 --- a/pkg/storage/spanlatch/interval_btree.go +++ b/pkg/storage/spanlatch/interval_btree.go @@ -31,12 +31,6 @@ const ( minLatches = degree - 1 ) -// TODO(nvanbenschoten): remove. -type latch struct { - id int64 - span roachpb.Span -} - // cmp returns a value indicating the sort order relationship between // a and b. The comparison is performed lexicographically on // (a.span.Key, a.span.EndKey, a.id) @@ -59,9 +53,9 @@ func cmp(a, b *latch) int { if c != 0 { return c } - if a.id < b.id { + if a.id() < b.id() { return -1 - } else if a.id > b.id { + } else if a.id() > b.id() { return 1 } else { return 0

diff --git a/pkg/storage/spanlatch/interval_btree_test.go b/pkg/storage/spanlatch/interval_btree_test.go index c7e2b55292f8..7a7b17e99b69 100644 --- a/pkg/storage/spanlatch/interval_btree_test.go +++ b/pkg/storage/spanlatch/interval_btree_test.go @@ -530,7 +530,7 @@ func TestBTreeCloneConcurrentOperations(t *testing.T) { func TestBTreeCmp(t *testing.T) { testCases := []struct { spanA, spanB roachpb.Span - idA, idB int64 + idA, idB uint64 exp int }{ { @@ -593,8 +593,8 @@ for _, tc := range testCases { name := fmt.Sprintf("cmp(%s:%d,%s:%d)", tc.spanA, tc.idA, tc.spanB, tc.idB) t.Run(name, func(t *testing.T) { - laA := &latch{id: tc.idA, span: tc.spanA} - laB := &latch{id: tc.idB, span: tc.spanB} + laA := &latch{meta: tc.idA, span: tc.spanA} + laB := &latch{meta: tc.idB, span: tc.spanB} require.Equal(t, tc.exp, cmp(laA, laB)) }) } }
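The comparator above now goes through the `id()` accessor because the latch id is moving into the bit-packed `meta` field in `manager.go` below. To see why the id tie-breaker matters — two latches over identical spans must still be distinct keys in the tree — here is a runnable sketch of the same lexicographic comparison over a simplified latch type (field names assumed, not the package's actual types):

```go
package main

import (
	"bytes"
	"fmt"
)

// span and latch are simplified stand-ins for the types in this package.
type span struct{ key, endKey []byte }
type latch struct {
	id   uint64
	span span
}

// cmp sorts latches lexicographically on (key, endKey, id). The id
// tie-breaker lets multiple latches over the same span coexist in the tree.
func cmp(a, b *latch) int {
	if c := bytes.Compare(a.span.key, b.span.key); c != 0 {
		return c
	}
	if c := bytes.Compare(a.span.endKey, b.span.endKey); c != 0 {
		return c
	}
	switch {
	case a.id < b.id:
		return -1
	case a.id > b.id:
		return 1
	default:
		return 0
	}
}

func main() {
	a := &latch{id: 1, span: span{key: []byte("a"), endKey: []byte("c")}}
	b := &latch{id: 2, span: span{key: []byte("a"), endKey: []byte("c")}}
	fmt.Println(cmp(a, b), cmp(b, a), cmp(a, a)) // -1 1 0
}
```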
diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go new file mode 100644 index 000000000000..113a66a31feb --- /dev/null +++ b/pkg/storage/spanlatch/manager.go @@ -0,0 +1,439 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package spanlatch + +import ( + "context" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// A Manager maintains an interval tree of key and key range latches. Latch +// acquisitions affecting keys or key ranges must wait on already-acquired latches +// which overlap their key ranges to be released. +// +// Latch acquisition attempts invoke Manager.Acquire and provide details about +// the spans that they plan to touch and the timestamps they plan to touch them +// at. Acquire inserts the latch into the Manager's tree and waits on +// prerequisite latch attempts that are already tracked by the Manager. +// Manager.Acquire blocks until the latch acquisition completes, at which point +// it returns a Guard, which is scoped to the lifetime of the latch ownership. +// +// When the latches are no longer needed, they are released by invoking +// Manager.Release with the Guard returned when the latches were originally +// acquired. Doing so removes the latches from the Manager's tree and signals to +// dependent latch acquisitions that they no longer need to wait on the released +// latches. +// +// Manager is safe for concurrent use by multiple goroutines. Concurrent access +// is made efficient using a copy-on-write technique to capture immutable +// snapshots of the type's inner btree structures. Using this strategy, tasks +// requiring mutual exclusion are limited to updating the type's trees and +// grabbing snapshots. Notably, scanning for and waiting on prerequisite latches +// is performed outside of the mutual exclusion zone. This means that the work +// performed under lock is linear with respect to the number of spans that a +// latch acquisition declares but NOT linear with respect to the number of other +// latch attempts that it will wait on. +type Manager struct { + mu syncutil.Mutex + idAlloc uint64 + scopes [spanset.NumSpanScope]scopedManager +} + +// scopedManager is a latch manager scoped to either local or global keys. +// See spanset.SpanScope. +type scopedManager struct { + rSet map[*latch]struct{} + trees [spanset.NumSpanAccess]btree +} + +// New creates a new Manager. +func New() *Manager { + m := new(Manager) + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + m.scopes[s] = scopedManager{ + rSet: make(map[*latch]struct{}), + } + } + return m +} + +// latches are stored in the Manager's btrees. They represent the latching of a +// single key span. +type latch struct { + meta uint64 // high bit: inRSet; lower 63 bits: id + span roachpb.Span + ts hlc.Timestamp + done *signal +} + +func (la *latch) inRSet() bool { + return la.meta>>63 == 1 +} + +func (la *latch) setInRSet(b bool) { + if b { + la.meta |= (1 << 63) + } else { + la.meta &^= (1 << 63) + } +} + +func (la *latch) id() uint64 { + return la.meta &^ (1 << 63) +} + +func (la *latch) setID(id uint64) { + la.meta = id &^ (1 << 63) +} + +// Guard is a handle to a set of acquired latches. It is returned by +// Manager.Acquire and accepted by Manager.Release. +type Guard struct { + done signal + // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size.
+ latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer + latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 +} + +func (lg *Guard) latches(s spanset.SpanScope, a spanset.SpanAccess) []latch { + len := lg.latchesLens[s][a] + if len == 0 { + return nil + } + const maxArrayLen = 1 << 31 + return (*[maxArrayLen]latch)(lg.latchesPtrs[s][a])[:len:len] +} + +func (lg *Guard) setLatches(s spanset.SpanScope, a spanset.SpanAccess, latches []latch) { + lg.latchesPtrs[s][a] = unsafe.Pointer(&latches[0]) + lg.latchesLens[s][a] = int32(len(latches)) +} + +func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { + nLatches := 0 + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + nLatches += len(spans.GetSpans(a, s)) + } + } + + // Guard would be an ideal candidate for object pooling, but without + // reference counting its latches we can't know whether they're still + // referenced by other tree snapshots. The latches hold a reference to + // the signal living on the Guard, so the guard can't be recycled while + // latches still point to it. + var guard *Guard + var latches []latch + if nLatches <= 1 { + alloc := new(struct { + g Guard + latches [1]latch + }) + guard = &alloc.g + latches = alloc.latches[:nLatches] + } else if nLatches <= 2 { + alloc := new(struct { + g Guard + latches [2]latch + }) + guard = &alloc.g + latches = alloc.latches[:nLatches] + } else if nLatches <= 4 { + alloc := new(struct { + g Guard + latches [4]latch + }) + guard = &alloc.g + latches = alloc.latches[:nLatches] + } else if nLatches <= 8 { + alloc := new(struct { + g Guard + latches [8]latch + }) + guard = &alloc.g + latches = alloc.latches[:nLatches] + } else { + guard = new(Guard) + latches = make([]latch, nLatches) + } + + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + ss := spans.GetSpans(a, s) + n := len(ss) + if n == 0 { + continue + } + + ssLatches := latches[:n] + for i := range ssLatches { + latch := &latches[i] + latch.span = ss[i] + latch.ts = ifGlobal(ts, s) + latch.done = &guard.done + // latch.setID() in Manager.insert, under lock. + } + guard.setLatches(s, a, ssLatches) + latches = latches[n:] + } + } + if len(latches) != 0 { + panic("alloc too large") + } + return guard +} + +// Acquire acquires latches from the Manager for each of the provided spans, at +// the specified timestamp. In doing so, it waits for latches over all +// overlapping spans to be released before returning. If the provided context +// is canceled before the method is done waiting for overlapping latches to +// be released, it stops waiting and releases all latches that it has already +// acquired. +// +// It returns a Guard which must be provided to Release. +func (m *Manager) Acquire( + ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, +) (*Guard, error) { + lg, snap := m.sequence(spans, ts) + defer snap.close() + + err := m.wait(ctx, lg, ts, snap) + if err != nil { + m.Release(lg) + return nil, err + } + return lg, nil +} + +// sequence locks the manager, captures an immutable snapshot, inserts latches +// for each of the specified spans into the manager's interval trees, and +// unlocks the manager. The role of the method is to sequence latch acquisition +// attempts. 
+func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) { + lg := newGuard(spans, ts) + + m.mu.Lock() + snap := m.snapshot(spans) + m.insert(lg) + m.mu.Unlock() + return lg, snap +} + +// snapshot is an immutable view into the latch manager's state. +type snapshot struct { + trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree +} + +// close closes the snapshot and releases any associated resources. +func (sn *snapshot) close() { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + sn.trees[s][a].Reset() + } + } +} + +// snapshot captures an immutable snapshot of the latch manager. It takes a +// spanset to limit the amount of state captured. Must be called with mu held. +func (m *Manager) snapshot(spans *spanset.SpanSet) snapshot { + var snap snapshot + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + sm := &m.scopes[s] + reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0 + writing := len(spans.GetSpans(spanset.SpanReadWrite, s)) > 0 + + if writing { + if len(sm.rSet) > 0 { + sm.flushReadSet() + } + snap.trees[s][spanset.SpanReadOnly] = sm.trees[spanset.SpanReadOnly].Clone() + } + if writing || reading { + snap.trees[s][spanset.SpanReadWrite] = sm.trees[spanset.SpanReadWrite].Clone() + } + } + return snap +} + +// flushReadSet flushes the read set into the read interval tree. +func (sm *scopedManager) flushReadSet() { + // TODO(nvanbenschoten): never re-alloc in go1.11. + realloc := len(sm.rSet) > 16 + for latch := range sm.rSet { + latch.setInRSet(false) + sm.trees[spanset.SpanReadOnly].Set(latch) + if !realloc { + delete(sm.rSet, latch) + } + } + if realloc { + sm.rSet = make(map[*latch]struct{}) + } +} + +// insert inserts the latches owned by the provided Guard into the Manager. +// Must be called with mu held. +func (m *Manager) insert(lg *Guard) { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + sm := &m.scopes[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + latches := lg.latches(s, a) + for i := range latches { + latch := &latches[i] + latch.setID(m.nextID()) + switch a { + case spanset.SpanReadOnly: + // Add reads to the rSet. They only need to enter the read + // tree if they're flushed by a write capturing a snapshot. + latch.setInRSet(true) + sm.rSet[latch] = struct{}{} + case spanset.SpanReadWrite: + // Add writes directly to the write tree. + sm.trees[spanset.SpanReadWrite].Set(latch) + default: + panic("unknown access") + } + } + } + } +} + +func (m *Manager) nextID() uint64 { + m.idAlloc++ + return m.idAlloc +} + +// ignoreFn is used for non-interference of earlier reads with later writes. +// +// However, this is only desired for the global scope. Reads and writes to local +// keys are specified to always interfere, regardless of their timestamp. This +// is done to avoid confusion with local keys declared as part of proposer +// evaluated KV. +// +// This is also disabled in the global scope if either of the timestamps is +// empty. In those cases, we consider the latch without a timestamp to be a +// non-MVCC operation that affects all timestamps in the key range.
+type ignoreFn func(ts, other hlc.Timestamp) bool + +func ignoreLater(ts, other hlc.Timestamp) bool { return !ts.IsEmpty() && ts.Less(other) } +func ignoreEarlier(ts, other hlc.Timestamp) bool { return !other.IsEmpty() && other.Less(ts) } +func ignoreNothing(ts, other hlc.Timestamp) bool { return false } + +func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp { + switch s { + case spanset.SpanGlobal: + return ts + case spanset.SpanLocal: + // All local latches interfere. + return hlc.Timestamp{} + default: + panic("unknown scope") + } +} + +// wait waits for all interfering latches in the provided snapshot to complete +// before returning. +func (m *Manager) wait(ctx context.Context, lg *Guard, ts hlc.Timestamp, snap snapshot) error { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + tr := &snap.trees[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + latches := lg.latches(s, a) + for i := range latches { + latch := &latches[i] + switch a { + case spanset.SpanReadOnly: + // Wait for writes at equal or lower timestamps. + it := tr[spanset.SpanReadWrite].MakeIter() + if err := iterAndWait(ctx, &it, latch, ts, ignoreLater); err != nil { + return err + } + case spanset.SpanReadWrite: + // Wait for reads at equal or higher timestamps. + it := tr[spanset.SpanReadOnly].MakeIter() + if err := iterAndWait(ctx, &it, latch, ts, ignoreEarlier); err != nil { + return err + } + // Wait for all other writes. + it = tr[spanset.SpanReadWrite].MakeIter() + if err := iterAndWait(ctx, &it, latch, ts, ignoreNothing); err != nil { + return err + } + default: + panic("unknown access") + } + } + } + } + return nil +} + +// iterAndWait uses the provided iterator to wait on all latches that overlap +// with the search latch and which should not be ignored given their timestamp +// and the supplied ignoreFn. +func iterAndWait( + ctx context.Context, it *iterator, search *latch, ts hlc.Timestamp, ignore ignoreFn, +) error { + done := ctx.Done() + for it.FirstOverlap(search); it.Valid(); it.NextOverlap() { + latch := it.Cur() + if latch.done.signaled() { + continue + } + if ignore(ts, latch.ts) { + continue + } + select { + case <-latch.done.signalChan(): + case <-done: + return ctx.Err() + } + } + return nil +} + +// Release releases the latches held by the provided Guard. After being called, +// dependent latch acquisition attempts can complete if not blocked on any other +// owned latches. +func (m *Manager) Release(lg *Guard) { + lg.done.signal() + + m.mu.Lock() + m.remove(lg) + m.mu.Unlock() +} + +// remove removes the latches owned by the provided Guard from the Manager. +// Must be called with mu held. +func (m *Manager) remove(lg *Guard) { + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + sm := &m.scopes[s] + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + latches := lg.latches(s, a) + for i := range latches { + latch := &latches[i] + if latch.inRSet() { + delete(sm.rSet, latch) + } else { + sm.trees[a].Delete(latch) + } + } + } + } +} diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go new file mode 100644 index 000000000000..031a269b2a71 --- /dev/null +++ b/pkg/storage/spanlatch/manager_test.go @@ -0,0 +1,533 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package spanlatch + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +var read = false +var write = true +var zeroTS = hlc.Timestamp{} + +func spans(from, to string, write bool) *spanset.SpanSet { + var span roachpb.Span + if to == "" { + span = roachpb.Span{Key: roachpb.Key(from)} + } else { + span = roachpb.Span{Key: roachpb.Key(from), EndKey: roachpb.Key(to)} + } + if strings.HasPrefix(from, "local") { + span.Key = append(keys.LocalRangePrefix, span.Key...) + if span.EndKey != nil { + span.EndKey = append(keys.LocalRangePrefix, span.EndKey...) + } + } + var spans spanset.SpanSet + access := spanset.SpanReadOnly + if write { + access = spanset.SpanReadWrite + } + spans.Add(access, span) + return &spans +} + +func testLatchSucceeds(t *testing.T, lgC <-chan *Guard) *Guard { + t.Helper() + select { + case lg := <-lgC: + return lg + case <-time.After(15 * time.Millisecond): + t.Fatal("latch acquisition should succeed") + } + return nil +} + +func testLatchBlocks(t *testing.T, lgC <-chan *Guard) { + t.Helper() + select { + case <-lgC: + t.Fatal("latch acquisition should block") + case <-time.After(3 * time.Millisecond): + } +} + +// MustAcquire is like Acquire, except it can't return context cancellation +// errors. +func (m *Manager) MustAcquire(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { + lg, err := m.Acquire(context.Background(), spans, ts) + if err != nil { + panic(err) + } + return lg +} + +// MustAcquireCh is like Acquire, except it only sequences the latch +// attempt synchronously and waits on dependent latches asynchronously. It +// returns a channel that provides the Guard when the latches are acquired (i.e. +// after waiting). If the context expires, a nil Guard will be delivered on the +// channel. +func (m *Manager) MustAcquireCh(spans *spanset.SpanSet, ts hlc.Timestamp) <-chan *Guard { + return m.MustAcquireChCtx(context.Background(), spans, ts) +} + +// MustAcquireChCtx is like MustAcquireCh, except it accepts a context. +func (m *Manager) MustAcquireChCtx( + ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, +) <-chan *Guard { + ch := make(chan *Guard) + lg, snap := m.sequence(spans, ts) + go func() { + err := m.wait(ctx, lg, ts, snap) + if err != nil { + m.Release(lg) + lg = nil + } + ch <- lg + }() + return ch +} + +func TestLatchManager(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Try latch with no overlapping already-acquired latches. + lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + m.Release(lg1) + + lg2 := m.MustAcquire(spans("a", "b", write), zeroTS) + m.Release(lg2) + + // Add a latch and verify overlapping latches wait on it.
+ lg3 := m.MustAcquire(spans("a", "b", write), zeroTS) + lg4C := m.MustAcquireCh(spans("a", "b", write), zeroTS) + + // Second write should block. + testLatchBlocks(t, lg4C) + + // First write completes, second grabs latch. + m.Release(lg3) + testLatchSucceeds(t, lg4C) +} + +func TestLatchManagerNoWaitOnReadOnly(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Acquire latch for read-only span. + m.MustAcquire(spans("a", "", read), zeroTS) + + // Verify no wait with another read-only span. + m.MustAcquire(spans("a", "", read), zeroTS) +} + +func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Acquire latch for read-only span. + lg1 := m.MustAcquire(spans("a", "", read), zeroTS) + // Acquire another one on top. + lg2 := m.MustAcquire(spans("a", "", read), zeroTS) + + // A write span should have to wait for **both** reads. + lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS) + + // Certainly blocks now. + testLatchBlocks(t, lg3C) + + // The second read releases latch, but the first one remains. + m.Release(lg2) + + // Should still block. + testLatchBlocks(t, lg3C) + + // First read releases latch. + m.Release(lg1) + + // Now it goes through. + testLatchSucceeds(t, lg3C) +} + +func TestLatchManagerMultipleOverlappingLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Acquire multiple latches. + lg1C := m.MustAcquireCh(spans("a", "", write), zeroTS) + lg2C := m.MustAcquireCh(spans("b", "c", write), zeroTS) + lg3C := m.MustAcquireCh(spans("a", "d", write), zeroTS) + + // Attempt to acquire latch which overlaps them all. + lg4C := m.MustAcquireCh(spans("0", "z", write), zeroTS) + testLatchBlocks(t, lg4C) + m.Release(<-lg1C) + testLatchBlocks(t, lg4C) + m.Release(<-lg2C) + testLatchBlocks(t, lg4C) + m.Release(<-lg3C) + testLatchSucceeds(t, lg4C) +} + +func TestLatchManagerMultipleOverlappingSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Acquire multiple latches. + lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + lg2 := m.MustAcquire(spans("b", "c", read), zeroTS) + lg3 := m.MustAcquire(spans("d", "f", write), zeroTS) + lg4 := m.MustAcquire(spans("g", "", write), zeroTS) + + // Attempt to acquire latches overlapping each of them. + var spans spanset.SpanSet + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a")}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("b")}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("e")}) + lg5C := m.MustAcquireCh(&spans, zeroTS) + + // Blocks until the first three prerequisite latches release. 
+ testLatchBlocks(t, lg5C) + m.Release(lg2) + testLatchBlocks(t, lg5C) + m.Release(lg3) + testLatchBlocks(t, lg5C) + m.Release(lg1) + lg5 := testLatchSucceeds(t, lg5C) + m.Release(lg4) + m.Release(lg5) +} + +func TestLatchManagerDependentLatches(t *testing.T) { + defer leaktest.AfterTest(t)() + + cases := []struct { + name string + sp1 *spanset.SpanSet + ts1 hlc.Timestamp + sp2 *spanset.SpanSet + ts2 hlc.Timestamp + dependent bool + }{ + { + name: "point writes, same key", + sp1: spans("a", "", write), + sp2: spans("a", "", write), + dependent: true, + }, + { + name: "point writes, different key", + sp1: spans("a", "", write), + sp2: spans("b", "", write), + dependent: false, + }, + { + name: "range writes, overlapping span", + sp1: spans("a", "c", write), + sp2: spans("b", "d", write), + dependent: true, + }, + { + name: "range writes, non-overlapping span", + sp1: spans("a", "b", write), + sp2: spans("b", "c", write), + dependent: false, + }, + { + name: "point reads, same key", + sp1: spans("a", "", read), + sp2: spans("a", "", read), + dependent: false, + }, + { + name: "point reads, different key", + sp1: spans("a", "", read), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "range reads, overlapping span", + sp1: spans("a", "c", read), + sp2: spans("b", "d", read), + dependent: false, + }, + { + name: "range reads, non-overlapping span", + sp1: spans("a", "b", read), + sp2: spans("b", "c", read), + dependent: false, + }, + { + name: "read and write, same ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "read and write, causal ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 2}, + dependent: true, + }, + { + name: "read and write, non-causal ts", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 2}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: false, + }, + { + name: "read and write, zero ts read", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 0}, + dependent: true, + }, + { + name: "read and write, zero ts write", + sp1: spans("a", "", write), + ts1: hlc.Timestamp{WallTime: 0}, + sp2: spans("a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "read and write, non-overlapping", + sp1: spans("a", "b", write), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "local range writes, overlapping span", + sp1: spans("local a", "local c", write), + sp2: spans("local b", "local d", write), + dependent: true, + }, + { + name: "local range writes, non-overlapping span", + sp1: spans("local a", "local b", write), + sp2: spans("local b", "local c", write), + dependent: false, + }, + { + name: "local range reads, overlapping span", + sp1: spans("local a", "local c", read), + sp2: spans("local b", "local d", read), + dependent: false, + }, + { + name: "local range reads, non-overlapping span", + sp1: spans("local a", "local b", read), + sp2: spans("local b", "local c", read), + dependent: false, + }, + { + name: "local read and write, same ts", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, causal ts", + sp1: spans("local a", "", write), + ts1: 
hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 2}, + dependent: true, + }, + { + name: "local read and write, non-causal ts", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 2}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, zero ts read", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 1}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 0}, + dependent: true, + }, + { + name: "local read and write, zero ts write", + sp1: spans("local a", "", write), + ts1: hlc.Timestamp{WallTime: 0}, + sp2: spans("local a", "", read), + ts2: hlc.Timestamp{WallTime: 1}, + dependent: true, + }, + { + name: "local read and write, non-overlapping", + sp1: spans("a", "b", write), + sp2: spans("b", "", read), + dependent: false, + }, + { + name: "local read and global write, overlapping", + sp1: spans("a", "b", write), + sp2: spans("local b", "", read), + dependent: false, + }, + { + name: "local write and global read, overlapping", + sp1: spans("local a", "local b", write), + sp2: spans("b", "", read), + dependent: false, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + testutils.RunTrueAndFalse(t, "inv", func(t *testing.T, inv bool) { + c := c + if inv { + c.sp1, c.sp2 = c.sp2, c.sp1 + c.ts1, c.ts2 = c.ts2, c.ts1 + } + + m := New() + lg1 := m.MustAcquire(c.sp1, c.ts1) + lg2C := m.MustAcquireCh(c.sp2, c.ts2) + if c.dependent { + testLatchBlocks(t, lg2C) + m.Release(lg1) + lg2 := testLatchSucceeds(t, lg2C) + m.Release(lg2) + } else { + lg2 := testLatchSucceeds(t, lg2C) + m.Release(lg1) + m.Release(lg2) + } + }) + }) + } +} + +func TestLatchManagerContextCancellation(t *testing.T) { + defer leaktest.AfterTest(t)() + m := New() + + // Attempt to acquire three latches that all block on each other. + lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + // The second one is given a cancelable context. + ctx2, cancel2 := context.WithCancel(context.Background()) + lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write), zeroTS) + lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS) + + // The second and third latch attempt block on the first. + testLatchBlocks(t, lg2C) + testLatchBlocks(t, lg3C) + + // Cancel the second acquisition's context. It should stop waiting. + cancel2() + require.Nil(t, <-lg2C) + + // The third latch attempt still blocks. + testLatchBlocks(t, lg3C) + + // Release the first latch. The third succeeds in acquiring the latch. 
+ m.Release(lg1) + testLatchSucceeds(t, lg3C) +} + +func BenchmarkLatchManagerReadOnlyMix(b *testing.B) { + for _, size := range []int{1, 4, 16, 64, 128, 256} { + b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { + m := New() + ss := spans("a", "b", read) + for i := 0; i < size; i++ { + _ = m.MustAcquire(ss, zeroTS) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = m.MustAcquire(ss, zeroTS) + } + }) + } +} + +func BenchmarkLatchManagerReadWriteMix(b *testing.B) { + for _, readsPerWrite := range []int{0, 1, 4, 16, 64, 128, 256} { + b.Run(fmt.Sprintf("readsPerWrite=%d", readsPerWrite), func(b *testing.B) { + m := New() + lgBuf := make(chan *Guard, 16) + + spans := make([]spanset.SpanSet, b.N) + for i := range spans { + a, b := randBytes(100), randBytes(100) + // Overwrite first byte so that we do not mix local and global ranges + a[0], b[0] = 'a', 'a' + if bytes.Compare(a, b) > 0 { + a, b = b, a + } + span := roachpb.Span{ + Key: roachpb.Key(a), + EndKey: roachpb.Key(b), + } + access := spanset.SpanReadOnly + if i%(readsPerWrite+1) == 0 { + access = spanset.SpanReadWrite + } + spans[i].Add(access, span) + } + + b.ResetTimer() + for i := range spans { + lg, snap := m.sequence(&spans[i], zeroTS) + snap.close() + if len(lgBuf) == cap(lgBuf) { + m.Release(<-lgBuf) + } + lgBuf <- lg + } + }) + } +} + +func randBytes(n int) []byte { + b := make([]byte, n) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + return b +} diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 0b11c4e0f8c3..225cfcc89c44 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -292,6 +292,7 @@ func TestLint(t *testing.T) { `\bsync\.(RW)?Mutex`, "--", "*.go", + ":!*/doc.go", ":!util/syncutil/mutex_sync.go", ":!util/syncutil/mutex_sync_race.go", )
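Finally, here is a sketch of how a caller would be expected to use the new API once it is hooked up, based on the `New`, `Acquire`, and `Release` signatures introduced in this patch (import paths as in the CockroachDB tree; error handling abbreviated):

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	m := spanlatch.New()

	// Declare the spans the command will touch, then acquire latches over
	// them at the command's timestamp.
	var spans spanset.SpanSet
	spans.Add(spanset.SpanReadWrite, roachpb.Span{
		Key:    roachpb.Key("a"),
		EndKey: roachpb.Key("b"),
	})

	lg, err := m.Acquire(context.Background(), &spans, hlc.Timestamp{WallTime: 1})
	if err != nil {
		// The context was canceled while waiting on overlapping latches.
		fmt.Println(err)
		return
	}

	// ... evaluate the command while holding the latches ...

	// Release signals dependent latch acquisitions to stop waiting.
	m.Release(lg)
}
```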