From 91c2060d6a17af857f9051e0914c55d7f3a9ad96 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 26 Dec 2018 16:45:15 -0500
Subject: [PATCH] storage: evaluate limited scans optimistically without latching
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes #9521.
Supersedes #31904.

SQL has a tendency to create scans that cover a range's entire key span
even though they only need to return a limited number of results. These
requests end up blocking on writes that hold latches over keys that the
scan will never actually touch. In reality, when a scan carries a limit,
the key span it actually affects ends up being a small subset of the
range's key span.

This change creates a new "optimistic evaluation" path for read-only
requests. When evaluating optimistically, a batch sequences itself with
the latch manager but does not wait to acquire all of its latches.
Instead, it begins evaluating immediately and verifies after the fact
that it would not have needed to wait on any latch acquisitions. When
performing this verification, it uses knowledge of the limited set of
keys that the scan actually looked at. If there are no conflicts, the
scan succeeds. If there are conflicts, the request waits for all of its
latch acquisition attempts to finish and then re-evaluates.

This PR replaces #31904. The major difference between the two is that
this PR exploits the structure of the latch manager to efficiently
perform optimistic latch acquisition and after-the-fact verification of
conflicts. Doing so requires no extra state because it reuses the
immutable snapshots that the latch manager now captures during
sequencing. The other major difference is that this PR does not release
latches after a failed optimistic evaluation.

NOTE: a prevalent theory about the pathological case with this behavior
was that over-estimated read latches would serialize with write latches,
causing all requests on a range to serialize. I wasn't seeing this in
practice. It turns out that the "timestamp awareness" in the latch
manager should avoid this behavior in most cases, because later writes
will have higher timestamps than earlier reads, so the latch manager
won't consider them to interfere. Still, large clusters with a large
amount of clock skew could see a bounded variant of this situation.
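To make the flow concrete, here is a minimal, self-contained sketch of the
idea. The `span`, `manager`, and `optimisticLimitedScan` names below are
invented for illustration and are not the CockroachDB APIs; the real
implementation lives in `Replica.executeReadOnlyBatch` and the `spanlatch`
package in the diff below.

```
// Toy model of optimistic evaluation for a limited scan: declare the full
// span, evaluate immediately, then verify only the keys actually touched.
package main

import "fmt"

type span struct{ start, end string } // half-open interval [start, end)

func overlaps(a, b span) bool { return a.start < b.end && b.start < a.end }

// manager stands in for the latch manager: it only remembers the write
// latches that were already held when the optimistic read was sequenced.
type manager struct{ heldWrites []span }

// optimisticLimitedScan evaluates over the sorted keys of the range and then
// checks whether any previously held write latch overlaps the span it
// actually read. On conflict it "waits" and re-evaluates from scratch.
func optimisticLimitedScan(m *manager, declared span, limit int, keys []string) []string {
	var results []string
	touched := span{declared.start, declared.start} // empty so far
	for _, k := range keys {
		if k >= declared.start && k < declared.end {
			results = append(results, k)
			touched.end = k + "\x00" // smallest key greater than k
			if len(results) == limit {
				break
			}
		}
	}
	for _, w := range m.heldWrites {
		if overlaps(w, touched) {
			// Conflict: wait for the latches (not modeled) and re-evaluate.
			fmt.Println("conflict, waiting and re-evaluating")
			return optimisticLimitedScan(&manager{}, declared, limit, keys)
		}
	}
	return results
}

func main() {
	// A write to "d" holds a latch while a limited scan over [a, e) runs.
	m := &manager{heldWrites: []span{{"d", "d\x00"}}}
	// The scan needs only 1 result, so it touches [a, a\x00) and never
	// conflicts with the write latch on "d".
	fmt.Println(optimisticLimitedScan(m, span{"a", "e"}, 1, []string{"a", "b", "c", "d"}))
}
```

In the common case the scan's limit is reached long before the declared span
ends, so the after-the-fact check passes and the request never waits on the
write latch at all.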
### Benchmark Results

```
name                                   old ops/sec  new ops/sec  delta
kv95/cores=16/nodes=3/splits=3           51.9k ± 0%   51.7k ± 1%     ~     (p=0.400 n=3+3)
kvS70-L1/cores=16/nodes=3/splits=3       24.1k ± 4%   27.7k ± 1%  +14.75%  (p=0.100 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3       24.5k ± 1%   27.5k ± 1%  +12.08%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3    16.0k ± 1%   16.6k ± 2%   +3.79%  (p=0.100 n=3+3)

name                                   old p50(ms)  new p50(ms)  delta
kv95/cores=16/nodes=3/splits=3            0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kvS70-L1/cores=16/nodes=3/splits=3        1.07 ± 6%    0.90 ± 0%  -15.62%  (p=0.100 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3        1.10 ± 0%    0.90 ± 0%  -18.18%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3     1.80 ± 0%    1.67 ± 4%   -7.41%  (p=0.100 n=3+3)

name                                   old p99(ms)  new p99(ms)  delta
kv95/cores=16/nodes=3/splits=3            1.80 ± 0%    1.80 ± 0%     ~     (all equal)
kvS70-L1/cores=16/nodes=3/splits=3        5.77 ±32%    4.70 ± 0%     ~     (p=0.400 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3        5.00 ± 0%    4.70 ± 0%   -6.00%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3     6.90 ± 3%    7.33 ± 8%     ~     (p=0.400 n=3+3)
```

_S = --span-percent=, L = --span-limit=_

Release note (performance improvement): improved performance on workloads
which mix OLAP queries with inserts and updates.
---
 pkg/storage/batcheval/command.go      |   2 +-
 pkg/storage/replica.go                | 257 ++++++++++++++++++++------
 pkg/storage/replica_test.go           | 116 ++++++++++++
 pkg/storage/spanlatch/manager.go      |  94 ++++++++--
 pkg/storage/spanlatch/manager_test.go |  20 +-
 5 files changed, 419 insertions(+), 70 deletions(-)

diff --git a/pkg/storage/batcheval/command.go b/pkg/storage/batcheval/command.go
index 43f8b01b0dc0..61c430af0380 100644
--- a/pkg/storage/batcheval/command.go
+++ b/pkg/storage/batcheval/command.go
@@ -26,7 +26,7 @@ import (
 
 // A Command is the implementation of a single request within a BatchRequest.
 type Command struct {
-	// DeclareKeys adds all keys this command touches to the given spanSet.
+	// DeclareKeys adds all keys this command touches to the given SpanSet.
 	DeclareKeys func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet)
 
 	// Eval evaluates a command on the given engine. It should populate
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index e18eca34c6b4..10ebcf979191 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -2205,9 +2205,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er
 type endCmds struct {
 	repl *Replica
 	lg   *spanlatch.Guard
+	sn   *spanlatch.Snapshot
 	ba   roachpb.BatchRequest
 }
 
+// optimistic returns whether the batch should execute optimistically.
+func (ec *endCmds) optimistic() bool { return ec.sn != nil }
+
+// checkOptimisticConflicts determines whether any earlier requests that held
+// latches at the time that this request optimistically evaluated conflict with
+// the span of keys that this request interacted with. On conflict, the method
+// waits to successfully acquire all latches. After that, the request needs to
+// be retried because the optimistic evaluation may or may not have observed the
+// effect of the earlier requests.
+//
+// To call, ec.optimistic must be true. After the method is called, ec.optimistic
+// will return false.
+func (ec *endCmds) checkOptimisticConflicts(
+	ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet,
+) (bool, error) {
+	// We only use the latch manager snapshot once.
+ defer func() { + ec.sn.Close() + ec.sn = nil + }() + + // If the optimistic evaluation didn't run into an error, create a copy of + // the batch with the scan bounds of all requests constrained to the spans + // that they actually scanned over. If it did, check for conflicts over the + // entire original span set. + if pErr == nil { + baCopy := ec.ba + baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...) + for i := 0; i < len(baCopy.Requests); i++ { + req := baCopy.Requests[i].GetInner() + header := req.Header() + + resp := br.Responses[i].GetInner() + if resp.Header().ResumeSpan == nil { + continue + } + + switch t := resp.(type) { + case *roachpb.ScanResponse: + if header.Key.Equal(t.ResumeSpan.Key) { + // The request did not evaluate. Ignore it. + baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...) + i-- + continue + } + header.EndKey = t.ResumeSpan.Key + case *roachpb.ReverseScanResponse: + if header.EndKey.Equal(t.ResumeSpan.EndKey) { + // The request did not evaluate. Ignore it. + baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...) + i-- + continue + } + header.Key = t.ResumeSpan.EndKey + default: + continue + } + req = req.ShallowCopy() + req.SetHeader(header) + baCopy.Requests[i].MustSetInner(req) + } + + // Collect the batch's declared spans again, this time with the + // constrained span bounds. + var err error + spans, _, err = ec.repl.collectSpans(&baCopy) + if err != nil { + return false, err + } + } + + // Determine whether any actively held latch at the time of evaluation + // overlaps this constraint span set. If so, we have a conflict. + conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn) + if !conflict { + return false, nil + } + + // If there were conflicts, wait for latches to be released, like + // we should have in the first place. + log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating") + return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn) +} + // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalRetryReason) { @@ -2224,6 +2309,12 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry pr if ec.lg != nil { ec.repl.latchMgr.Release(ec.lg) } + + // Close the snapshot to release any resources that it holds. + if ec.sn != nil { + ec.sn.Close() + ec.sn = nil + } } // updateTimestampCache updates the timestamp cache in order to set a low water @@ -2339,8 +2430,22 @@ func (r *Replica) updateTimestampCache( } } -func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) { - spans := &spanset.SpanSet{} +// collectSpans collects all of the spans that the batch may touch into a +// SpanSet. +// +// It also determines whether the batch should evaluate optimistically based on +// the key limits on the batch header. When a batch evaluates optimistically, it +// doesn't wait to acquire all of its latches. Instead, if begins evaluating +// immediately and verifies that it would not have needed to wait on any latch +// acquisitions after-the-fact. This is useful for requests that need to declare +// a much larger set of keys than they expect to touch in practice (e.g. limited +// scans). 
+func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) { + r.mu.RLock() + desc := r.descRLocked() + liveCount := r.mu.state.Stats.LiveCount + r.mu.RUnlock() + // TODO(bdarnell): need to make this less global when local // latches are used more heavily. For example, a split will // have a large read-only span but also a write (see #10084). @@ -2351,19 +2456,19 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // // TODO(bdarnell): revisit as the local portion gets its appropriate // use. + spans := &spanset.SpanSet{} if ba.IsReadOnly() { spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) } else { spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, len(ba.Requests)) } - desc := r.Desc() for _, union := range ba.Requests { inner := union.GetInner() if cmd, ok := batcheval.LookupCommand(inner.Method()); ok { cmd.DeclareKeys(*desc, ba.Header, inner, spans) } else { - return nil, errors.Errorf("unrecognized command %s", inner.Method()) + return nil, false, errors.Errorf("unrecognized command %s", inner.Method()) } } @@ -2374,9 +2479,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // If any command gave us spans that are invalid, bail out early // (before passing them to the spanlatch manager, which may panic). if err := spans.Validate(); err != nil { - return nil, err + return nil, false, err } - return spans, nil + + // Evaluate batches optimistically if they have a key limit which is less + // than the number of live keys on the Range. Ignoring write latches can be + // massively beneficial because it can help avoid waiting on writes to keys + // that the batch will never actually need to read due to the overestimate + // of its key bounds. Only after it is clear exactly what spans were read do + // we verify whether there were any conflicts with concurrent writes. + // + // This case is not uncommon; for example, a Scan which requests the entire + // range but has a limit of 1 result. We want to avoid allowing overly broad + // spans from backing up the latch manager. + limit := ba.Header.MaxSpanRequestKeys + optimistic := limit > 0 && limit < liveCount + + return spans, optimistic, nil } // beginCmds waits for any in-flight, conflicting commands to complete. This @@ -2389,11 +2508,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // returns a cleanup function to be called when the commands are done and can be // removed from the queue, and whose returned error is to be used in place of // the supplied error. +// +// See collectSpans for a discussion on optimistic evaluation. func (r *Replica) beginCmds( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool, ) (*endCmds, error) { + ec := endCmds{ + repl: r, + ba: *ba, + } + // Only acquire latches for consistent operations. - var lg *spanlatch.Guard if ba.ReadConsistency == roachpb.CONSISTENT { // Check for context cancellation before acquiring latches. if err := ctx.Err(); err != nil { @@ -2409,10 +2534,16 @@ func (r *Replica) beginCmds( // Acquire latches for all the request's declared spans to ensure // protected access and to avoid interacting requests from operating at // the same time. The latches will be held for the duration of request. 
- var err error - lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) - if err != nil { - return nil, err + if optimistic { + // Optimistic acquisition does not wait for existing latches to be + // released. + ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp) + } else { + var err error + ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) + if err != nil { + return nil, err + } } if !beforeLatch.IsZero() { @@ -2422,7 +2553,7 @@ func (r *Replica) beginCmds( if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { if pErr := filter(*ba); pErr != nil { - r.latchMgr.Release(lg) + ec.done(nil, pErr, proposalNoRetry) return nil, pErr.GoError() } } @@ -2471,8 +2602,9 @@ func (r *Replica) beginCmds( // The store will catch that error and resubmit the request after // mergeCompleteCh closes. See #27442 for the full context. log.Event(ctx, "waiting on in-progress merge") - r.latchMgr.Release(lg) - return nil, &roachpb.MergeInProgressError{} + err := &roachpb.MergeInProgressError{} + ec.done(nil, roachpb.NewError(err), proposalNoRetry) + return nil, err } } else { log.Event(ctx, "operation accepts inconsistent results") @@ -2495,12 +2627,7 @@ func (r *Replica) beginCmds( } } - ec := &endCmds{ - repl: r, - lg: lg, - ba: *ba, - } - return ec, nil + return &ec, nil } // applyTimestampCache moves the batch timestamp forward depending on @@ -2990,7 +3117,7 @@ func (r *Replica) executeReadOnlyBatch( } r.limitTxnMaxTimestamp(ctx, &ba, status) - spans, err := r.collectSpans(&ba) + spans, optimistic, err := r.collectSpans(&ba) if err != nil { return nil, roachpb.NewError(err) } @@ -2998,15 +3125,11 @@ func (r *Replica) executeReadOnlyBatch( // Acquire latches to prevent overlapping commands from executing // until this command completes. log.Event(ctx, "acquire latches") - endCmds, err := r.beginCmds(ctx, &ba, spans) + endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic) if err != nil { return nil, roachpb.NewError(err) } - log.Event(ctx, "waiting for read lock") - r.readOnlyCmdMu.RLock() - defer r.readOnlyCmdMu.RUnlock() - // Guarantee we release the latches that we just acquired. It is // important that this is inside the readOnlyCmdMu lock so that the // timestamp cache update is synchronized. This is wrapped to delay @@ -3015,31 +3138,27 @@ func (r *Replica) executeReadOnlyBatch( endCmds.done(br, pErr, proposalNoRetry) }() - // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? - if _, err := r.IsDestroyed(); err != nil { - return nil, roachpb.NewError(err) - } - - rSpan, err := keys.Range(ba) - if err != nil { - return nil, roachpb.NewError(err) - } - - if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil { - return nil, roachpb.NewError(err) - } - - // Evaluate read-only batch command. It checks for matching key range; note - // that holding readOnlyCmdMu throughout is important to avoid reads from the - // "wrong" key range being served after the range has been split. + // Loop to support optimistic evaluation. We'll only evalute up to twice. var result result.Result - rec := NewReplicaEvalContext(r, spans) - readOnly := r.store.Engine().NewReadOnly() - if util.RaceEnabled { - readOnly = spanset.NewReadWriter(readOnly, spans) + for { + br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans) + if !endCmds.optimistic() { + // If this was not an optimistic evaluation, break. 
+ break + } + + // If this was an optimistic evaluation, determine whether the keys + // that it touched conflict with in-flight requests that were ignored. + if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil { + pErr = roachpb.NewError(err) + } else if conflict { + // If they do, retry the evaluation. This time, we will not be + // evaluating optimistically because checkOptimisticConflicts + // waited for all latch acquisitions to succeed. + continue + } + break } - defer readOnly.Close() - br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba) if result.Local.DetachMaybeWatchForMerge() { if err := r.maybeWatchForMerge(ctx); err != nil { @@ -3070,6 +3189,40 @@ func (r *Replica) executeReadOnlyBatch( return br, pErr } +// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its +// results. It expects that latches have already been acquired for the spans +// that it will touch. +func (r *Replica) evaluateReadOnlyBatch( + ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet, +) (*roachpb.BatchResponse, result.Result, *roachpb.Error) { + log.Event(ctx, "waiting for read lock") + r.readOnlyCmdMu.RLock() + defer r.readOnlyCmdMu.RUnlock() + + // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? + if _, err := r.IsDestroyed(); err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + rSpan, err := keys.Range(ba) + if err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + + // Evaluate read-only batch command. It checks for matching key range; note + // that holding readOnlyCmdMu throughout is important to avoid reads from the + // "wrong" key range being served after the range has been split. + rec := NewReplicaEvalContext(r, spans) + readOnly := r.store.Engine().NewReadOnly() + defer readOnly.Close() + if util.RaceEnabled { + readOnly = spanset.NewReadWriter(readOnly, spans) + } + return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba) +} + // executeWriteBatch is the entry point for client requests which may mutate the // range's replicated state. Requests taking this path are ultimately // serialized through Raft, but pass through additional machinery whose goal is @@ -3167,7 +3320,7 @@ func (r *Replica) tryExecuteWriteBatch( return nil, roachpb.NewError(err), proposalNoRetry } - spans, err := r.collectSpans(&ba) + spans, _, err := r.collectSpans(&ba) if err != nil { return nil, roachpb.NewError(err), proposalNoRetry } @@ -3180,7 +3333,7 @@ func (r *Replica) tryExecuteWriteBatch( // after preceding commands have been run to successful completion. log.Event(ctx, "acquire latches") var err error - endCmds, err = r.beginCmds(ctx, &ba, spans) + endCmds, err = r.beginCmds(ctx, &ba, spans, false /* optimistic */) if err != nil { return nil, roachpb.NewError(err), proposalNoRetry } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index be61ad2d5868..e12afa1210b1 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -2598,6 +2598,122 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) { } } +// TestReplicaLatchingOptimisticEvaluation verifies that limited scans evaluate +// optimistically without waiting for latches to be acquired. 
In some cases, +// this allows them to avoid waiting on writes that their over-estimated +// declared spans overlapped with. +func TestReplicaLatchingOptimisticEvaluation(t *testing.T) { + defer leaktest.AfterTest(t)() + + sArgs1 := scanArgs([]byte("a"), []byte("c")) + sArgs2 := scanArgs([]byte("c"), []byte("e")) + baScan := roachpb.BatchRequest{} + baScan.Add(&sArgs1, &sArgs2) + + var blockKey, blockWriter atomic.Value + blockKey.Store(roachpb.Key("a")) + blockWriter.Store(false) + blockCh := make(chan struct{}, 1) + blockedCh := make(chan struct{}, 1) + + tc := testContext{} + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = + func(filterArgs storagebase.FilterArgs) *roachpb.Error { + // Make sure the direct GC path doesn't interfere with this test. + if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) { + return nil + } + if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { + blockedCh <- struct{}{} + <-blockCh + } + return nil + } + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + tc.StartWithStoreConfig(t, stopper, tsc) + + // Write initial keys. + for _, k := range []string{"a", "b", "c", "d"} { + pArgs := putArgs([]byte(k), []byte("value")) + if _, pErr := tc.SendWrapped(&pArgs); pErr != nil { + t.Fatal(pErr) + } + } + + testCases := []struct { + writeKey string + limit int64 + interferes bool + }{ + // No limit. + {"a", 0, true}, + {"b", 0, true}, + {"c", 0, true}, + {"d", 0, true}, + {"e", 0, false}, // Only scanning from [a,e) + // Limited. + {"a", 1, true}, + {"b", 1, false}, + {"b", 2, true}, + {"c", 2, false}, + {"c", 3, true}, + {"d", 3, false}, + {"d", 4, true}, + {"e", 4, false}, // Only scanning from [a,e) + {"e", 5, false}, + } + for _, test := range testCases { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + errCh := make(chan *roachpb.Error, 2) + pArgs := putArgs([]byte(test.writeKey), []byte("value")) + blockKey.Store(roachpb.Key(test.writeKey)) + blockWriter.Store(true) + go func() { + _, pErr := tc.SendWrapped(&pArgs) + errCh <- pErr + }() + <-blockedCh + blockWriter.Store(false) + + baScanCopy := baScan + baScanCopy.MaxSpanRequestKeys = test.limit + go func() { + _, pErr := tc.Sender().Send(context.Background(), baScanCopy) + errCh <- pErr + }() + + if test.interferes { + // Neither request should complete until the write is unblocked. + select { + case <-time.After(10 * time.Millisecond): + // Expected. + case pErr := <-errCh: + t.Fatalf("expected interference: got error %s", pErr) + } + // Verify no errors on waiting read and write. + blockCh <- struct{}{} + for j := 0; j < 2; j++ { + if pErr := <-errCh; pErr != nil { + t.Errorf("error %d: unexpected error: %s", j, pErr) + } + } + } else { + // The read should complete first. + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + // The write should complete next, after it is unblocked. + blockCh <- struct{}{} + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + } + }) + } +} + // TestReplicaUseTSCache verifies that write timestamps are upgraded // based on the read timestamp cache. 
func TestReplicaUseTSCache(t *testing.T) { diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go index b43a2f82114f..2f6f9e79d3e0 100644 --- a/pkg/storage/spanlatch/manager.go +++ b/pkg/storage/spanlatch/manager.go @@ -201,14 +201,14 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { // be released, it stops waiting and releases all latches that it has already // acquired. // -// It returns a Guard which must be provided to Release. +// The method returns a Guard which must be provided to Release. func (m *Manager) Acquire( ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, ) (*Guard, error) { lg, snap := m.sequence(spans, ts) - defer snap.close() + defer snap.Close() - err := m.wait(ctx, lg, &snap) + err := m.Wait(ctx, lg, &snap) if err != nil { m.Release(lg) return nil, err @@ -216,11 +216,28 @@ func (m *Manager) Acquire( return lg, nil } +// AcquireOptimistic is like Acquire, except it does not wait for latches over +// overlapping spans to be released before returning. Instead, it optimistically +// assumes that there are no currently held latches that need to be waited on. +// This can be verified after the fact by passing the returned Snapshot to +// Overlaps. +// +// Regardless of whether existing latches are ignored or not by this method, +// future calls to Acquire will observe the latches acquired here and will wait +// for them to be Released, as usual. +// +// The method returns a Guard which must be provided to Release. It also returns +// a Snapshot which must be Closed when no longer in use. +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, *Snapshot) { + lg, snap := m.sequence(spans, ts) + return lg, &snap +} + // sequence locks the manager, captures an immutable snapshot, inserts latches // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) { +func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, Snapshot) { lg := newGuard(spans, ts) m.mu.Lock() @@ -230,13 +247,14 @@ func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, sn return lg, snap } -// snapshot is an immutable view into the latch manager's state. -type snapshot struct { +// Snapshot is an immutable view into the latch manager's state. +type Snapshot struct { trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree } -// close closes the snapshot and releases any associated resources. -func (sn *snapshot) close() { +// Close closes the Snapshot and releases any associated resources. The method +// can be called multiple times. +func (sn *Snapshot) Close() { for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { sn.trees[s][a].Reset() @@ -246,8 +264,8 @@ func (sn *snapshot) close() { // snapshotLocked captures an immutable snapshot of the latch manager. It takes // a spanset to limit the amount of state captured. 
-func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot { - var snap snapshot +func (m *Manager) snapshotLocked(spans *spanset.SpanSet) Snapshot { + var snap Snapshot for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { sm := &m.scopes[s] reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0 @@ -333,9 +351,9 @@ func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp { } } -// wait waits for all interfering latches in the provided snapshot to complete +// Wait waits for all interfering latches in the provided snapshot to complete // before returning. -func (m *Manager) wait(ctx context.Context, lg *Guard, snap *snapshot) error { +func (m *Manager) Wait(ctx context.Context, lg *Guard, snap *Snapshot) error { timer := timeutil.NewTimer() timer.Reset(base.SlowRequestThreshold) defer timer.Stop() @@ -426,6 +444,58 @@ func (m *Manager) waitForSignal(ctx context.Context, t *timeutil.Timer, wait, he } } +// Overlaps determines if any of the spans in the provided spanset, at the +// specified timestamp, overlap with any of the latches in the snapshot. +func (m *Manager) Overlaps(spans *spanset.SpanSet, ts hlc.Timestamp, sn *Snapshot) bool { + // NB: the function has a similar structure to Manager.wait, but any attempt + // at code deduplication causes allocations and obscures the code. Avoiding + // these allocations would require further obfuscation. + var search latch + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + tr := &sn.trees[s] + search.ts = ifGlobal(ts, s) + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + ss := spans.GetSpans(a, s) + for _, sp := range ss { + search.span = sp + switch a { + case spanset.SpanReadOnly: + // Search for writes at equal or lower timestamps. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreLater) { + return true + } + case spanset.SpanReadWrite: + // Search for all other writes. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreNothing) { + return true + } + // Search for reads at equal or higher timestamps. + it = tr[spanset.SpanReadOnly].MakeIter() + if overlaps(&it, &search, ignoreEarlier) { + return true + } + default: + panic("unknown access") + } + } + } + } + return false +} + +func overlaps(it *iterator, search *latch, ignore ignoreFn) bool { + for it.FirstOverlap(search); it.Valid(); it.NextOverlap() { + held := it.Cur() + if ignore(search.ts, held.ts) { + continue + } + return true + } + return false +} + // Release releases the latches held by the provided Guard. After being called, // dependent latch acquisition attempts can complete if not blocked on any other // owned latches. 
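The exported surface introduced above (AcquireOptimistic, Overlaps, Wait,
Release, and Snapshot.Close) is meant to be used together. The sketch below
shows one way a caller might combine these methods, using the signatures from
this diff; the package name, the evaluate callback, and the simplified error
handling are illustrative assumptions, not code from this PR.

```
// Illustrative only: one possible way to stitch the new Manager methods
// together for an optimistic read.
package spanlatch_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// evalOptimistically sequences latches without waiting, evaluates, and then
// uses the Snapshot to decide whether the evaluation must be redone.
func evalOptimistically(
	ctx context.Context,
	m *spanlatch.Manager,
	declared *spanset.SpanSet,
	ts hlc.Timestamp,
	evaluate func() (touched *spanset.SpanSet),
) error {
	lg, snap := m.AcquireOptimistic(declared, ts)
	defer snap.Close()
	defer m.Release(lg)

	// Evaluate immediately, before knowing whether any held latch conflicts.
	touched := evaluate()

	// If nothing that was latched at sequencing time overlaps the keys the
	// request actually touched, the optimistic result stands.
	if !m.Overlaps(touched, ts, snap) {
		return nil
	}

	// Otherwise wait, as a pessimistic request would have, and re-evaluate.
	if err := m.Wait(ctx, lg, snap); err != nil {
		return err
	}
	evaluate()
	return nil
}
```

The key property is that the Snapshot is consulted only after evaluation, so
the conflict check reflects exactly the latches that were held when the
request sequenced itself.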
diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go index 29c16a1a8a11..86fb5e05b495 100644 --- a/pkg/storage/spanlatch/manager_test.go +++ b/pkg/storage/spanlatch/manager_test.go @@ -104,7 +104,8 @@ func (m *Manager) MustAcquireChCtx( ch := make(chan *Guard) lg, snap := m.sequence(spans, ts) go func() { - err := m.wait(ctx, lg, &snap) + defer snap.Close() + err := m.Wait(ctx, lg, &snap) if err != nil { m.Release(lg) lg = nil @@ -424,14 +425,23 @@ func TestLatchManagerDependentLatches(t *testing.T) { var m Manager lg1 := m.MustAcquire(c.sp1, c.ts1) - lg2C := m.MustAcquireCh(c.sp2, c.ts2) + lg2, snap2 := m.sequence(c.sp2, c.ts2) + lg2C := make(chan *Guard) + defer snap2.Close() + go func() { + err := m.Wait(context.Background(), lg2, &snap2) + require.Nil(t, err) + lg2C <- lg2 + }() + + require.Equal(t, c.dependent, m.Overlaps(c.sp2, c.ts2, &snap2)) if c.dependent { testLatchBlocks(t, lg2C) m.Release(lg1) - lg2 := testLatchSucceeds(t, lg2C) + testLatchSucceeds(t, lg2C) m.Release(lg2) } else { - lg2 := testLatchSucceeds(t, lg2C) + testLatchSucceeds(t, lg2C) m.Release(lg1) m.Release(lg2) } @@ -512,7 +522,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { lg, snap := m.sequence(&spans[i], zeroTS) - snap.close() + snap.Close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) }
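As a footnote, the timestamp-aware interference rules that Overlaps applies
can be restated in isolation. The snippet below is a standalone illustration
of those rules; the helper name and signature are invented for this example
and do not use the spanlatch package.

```
// A self-contained restatement of the interference rules: reads conflict
// only with writes at or below their timestamp, writes conflict with all
// writes and with reads at or above their timestamp.
package main

import "fmt"

// conflicts reports whether a request interferes with an already held latch,
// given each side's access type and timestamp.
func conflicts(reqIsRead, heldIsRead bool, reqTS, heldTS int64) bool {
	switch {
	case reqIsRead && heldIsRead:
		return false // reads never conflict with reads
	case reqIsRead && !heldIsRead:
		return heldTS <= reqTS // a read must wait for writes at or below its timestamp
	case !reqIsRead && heldIsRead:
		return heldTS >= reqTS // a write conflicts with reads at or above its timestamp
	default:
		return true // writes always conflict with writes
	}
}

func main() {
	// The scenario from the commit message's NOTE: a later write (higher
	// timestamp) does not interfere with an earlier read's over-declared latch.
	fmt.Println(conflicts(false /* write request */, true /* held read */, 20, 10)) // false
	// But a read must still wait for a write latched at or below its own
	// timestamp.
	fmt.Println(conflicts(true /* read request */, false /* held write */, 10, 5)) // true
}
```

This is the property the commit message relies on to argue that over-declared
read latches do not serialize all traffic on a range.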