diff --git a/pkg/storage/batcheval/command.go b/pkg/storage/batcheval/command.go
index 43f8b01b0dc0..61c430af0380 100644
--- a/pkg/storage/batcheval/command.go
+++ b/pkg/storage/batcheval/command.go
@@ -26,7 +26,7 @@ import (
 
 // A Command is the implementation of a single request within a BatchRequest.
 type Command struct {
-	// DeclareKeys adds all keys this command touches to the given spanSet.
+	// DeclareKeys adds all keys this command touches to the given SpanSet.
 	DeclareKeys func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet)
 
 	// Eval evaluates a command on the given engine. It should populate
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index e18eca34c6b4..10ebcf979191 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -2205,9 +2205,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er
 type endCmds struct {
 	repl *Replica
 	lg   *spanlatch.Guard
+	sn   *spanlatch.Snapshot
 	ba   roachpb.BatchRequest
 }
 
+// optimistic returns whether the batch should execute optimistically.
+func (ec *endCmds) optimistic() bool { return ec.sn != nil }
+
+// checkOptimisticConflicts determines whether any earlier requests that held
+// latches at the time that this request optimistically evaluated conflict with
+// the span of keys that this request interacted with. On conflict, the method
+// waits until all of the request's latches have been acquired. After that, the
+// request needs to be retried because the optimistic evaluation may or may not
+// have observed the effect of the earlier requests.
+//
+// The method must only be called when ec.optimistic returns true. After the
+// method is called, ec.optimistic will return false.
+func (ec *endCmds) checkOptimisticConflicts(
+	ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet,
+) (bool, error) {
+	// We only use the latch manager snapshot once.
+	defer func() {
+		ec.sn.Close()
+		ec.sn = nil
+	}()
+
+	// If the optimistic evaluation didn't run into an error, create a copy of
+	// the batch with the scan bounds of all requests constrained to the spans
+	// that they actually scanned over. If it did, check for conflicts over the
+	// entire original span set.
+	if pErr == nil {
+		baCopy := ec.ba
+		baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...)
+		for i := 0; i < len(baCopy.Requests); i++ {
+			req := baCopy.Requests[i].GetInner()
+			header := req.Header()
+
+			resp := br.Responses[i].GetInner()
+			if resp.Header().ResumeSpan == nil {
+				continue
+			}
+
+			switch t := resp.(type) {
+			case *roachpb.ScanResponse:
+				if header.Key.Equal(t.ResumeSpan.Key) {
+					// The request did not evaluate. Ignore it.
+					baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
+					i--
+					continue
+				}
+				header.EndKey = t.ResumeSpan.Key
+			case *roachpb.ReverseScanResponse:
+				if header.EndKey.Equal(t.ResumeSpan.EndKey) {
+					// The request did not evaluate. Ignore it.
+					baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
+					i--
+					continue
+				}
+				header.Key = t.ResumeSpan.EndKey
+			default:
+				continue
+			}
+			req = req.ShallowCopy()
+			req.SetHeader(header)
+			baCopy.Requests[i].MustSetInner(req)
+		}
+
+		// Collect the batch's declared spans again, this time with the
+		// constrained span bounds.
+		var err error
+		spans, _, err = ec.repl.collectSpans(&baCopy)
+		if err != nil {
+			return false, err
+		}
+	}
+
+	// Determine whether any actively held latch at the time of evaluation
+	// overlaps this constrained span set. If so, we have a conflict.
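+	// For example (hypothetical keys): if a limited scan over [a, e) stopped
+	// early at its key limit, its declared span was constrained above to just
+	// the prefix it actually scanned, so an ignored write latch on a later
+	// key such as "d" no longer registers as a conflict.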
+	conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn)
+	if !conflict {
+		return false, nil
+	}
+
+	// If there were conflicts, wait for the latches to be released, as we
+	// would have in the first place had we not evaluated optimistically.
+	log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating")
+	return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn)
+}
+
 // done releases the latches acquired by the command and updates
 // the timestamp cache using the final timestamp of each command.
 func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalRetryReason) {
@@ -2224,6 +2309,12 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry pr
 	if ec.lg != nil {
 		ec.repl.latchMgr.Release(ec.lg)
 	}
+
+	// Close the snapshot to release any resources that it holds.
+	if ec.sn != nil {
+		ec.sn.Close()
+		ec.sn = nil
+	}
 }
 
 // updateTimestampCache updates the timestamp cache in order to set a low water
@@ -2339,8 +2430,22 @@
 	}
 }
 
-func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) {
-	spans := &spanset.SpanSet{}
+// collectSpans collects all of the spans that the batch may touch into a
+// SpanSet.
+//
+// It also determines whether the batch should evaluate optimistically based on
+// the key limit in the batch header. When a batch evaluates optimistically, it
+// doesn't wait to acquire all of its latches. Instead, it begins evaluating
+// immediately and verifies that it would not have needed to wait on any latch
+// acquisitions after-the-fact. This is useful for requests that need to declare
+// a much larger set of keys than they expect to touch in practice (e.g. limited
+// scans).
+func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) {
+	r.mu.RLock()
+	desc := r.descRLocked()
+	liveCount := r.mu.state.Stats.LiveCount
+	r.mu.RUnlock()
+
 	// TODO(bdarnell): need to make this less global when local
 	// latches are used more heavily. For example, a split will
 	// have a large read-only span but also a write (see #10084).
@@ -2351,19 +2456,19 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 	//
 	// TODO(bdarnell): revisit as the local portion gets its appropriate
 	// use.
+	spans := &spanset.SpanSet{}
 	if ba.IsReadOnly() {
 		spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
 	} else {
 		spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, len(ba.Requests))
 	}
 
-	desc := r.Desc()
 	for _, union := range ba.Requests {
 		inner := union.GetInner()
 		if cmd, ok := batcheval.LookupCommand(inner.Method()); ok {
 			cmd.DeclareKeys(*desc, ba.Header, inner, spans)
 		} else {
-			return nil, errors.Errorf("unrecognized command %s", inner.Method())
+			return nil, false, errors.Errorf("unrecognized command %s", inner.Method())
 		}
 	}
 
@@ -2374,9 +2479,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 	// If any command gave us spans that are invalid, bail out early
 	// (before passing them to the spanlatch manager, which may panic).
 	if err := spans.Validate(); err != nil {
-		return nil, err
+		return nil, false, err
 	}
-	return spans, nil
+
+	// Evaluate batches optimistically if they have a key limit which is less
+	// than the number of live keys on the Range. Ignoring write latches can be
+	// massively beneficial because it can help avoid waiting on writes to keys
+	// that the batch will never actually need to read due to the overestimate
+	// of its key bounds. Only after it is clear exactly what spans were read do
+	// we verify whether there were any conflicts with concurrent writes.
+	//
+	// This case is not uncommon; for example, a Scan which requests the entire
+	// range but has a limit of 1 result. We want to prevent overly broad spans
+	// from backing up the latch manager.
+	limit := ba.Header.MaxSpanRequestKeys
+	optimistic := limit > 0 && limit < liveCount
+
+	return spans, optimistic, nil
 }
 
 // beginCmds waits for any in-flight, conflicting commands to complete. This
@@ -2389,11 +2508,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 // returns a cleanup function to be called when the commands are done and can be
 // removed from the queue, and whose returned error is to be used in place of
 // the supplied error.
+//
+// See collectSpans for a discussion of optimistic evaluation.
 func (r *Replica) beginCmds(
-	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
+	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool,
 ) (*endCmds, error) {
+	ec := endCmds{
+		repl: r,
+		ba:   *ba,
+	}
+
 	// Only acquire latches for consistent operations.
-	var lg *spanlatch.Guard
 	if ba.ReadConsistency == roachpb.CONSISTENT {
 		// Check for context cancellation before acquiring latches.
 		if err := ctx.Err(); err != nil {
@@ -2409,10 +2534,16 @@ func (r *Replica) beginCmds(
 		// Acquire latches for all the request's declared spans to ensure
 		// protected access and to prevent interacting requests from operating
 		// at the same time. The latches will be held for the duration of the
 		// request.
-		var err error
-		lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
-		if err != nil {
-			return nil, err
+		if optimistic {
+			// Optimistic acquisition does not wait for existing latches to be
+			// released.
+			ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp)
+		} else {
+			var err error
+			ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
+			if err != nil {
+				return nil, err
+			}
 		}
 
 		if !beforeLatch.IsZero() {
@@ -2422,7 +2553,7 @@ func (r *Replica) beginCmds(
 
 		if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
 			if pErr := filter(*ba); pErr != nil {
-				r.latchMgr.Release(lg)
+				ec.done(nil, pErr, proposalNoRetry)
 				return nil, pErr.GoError()
 			}
 		}
@@ -2471,8 +2602,9 @@ func (r *Replica) beginCmds(
 			// The store will catch that error and resubmit the request after
 			// mergeCompleteCh closes. See #27442 for the full context.
 			log.Event(ctx, "waiting on in-progress merge")
-			r.latchMgr.Release(lg)
-			return nil, &roachpb.MergeInProgressError{}
+			err := &roachpb.MergeInProgressError{}
+			ec.done(nil, roachpb.NewError(err), proposalNoRetry)
+			return nil, err
 		}
 	} else {
 		log.Event(ctx, "operation accepts inconsistent results")
@@ -2495,12 +2627,7 @@ func (r *Replica) beginCmds(
 		}
 	}
 
-	ec := &endCmds{
-		repl: r,
-		lg:   lg,
-		ba:   *ba,
-	}
-	return ec, nil
+	return &ec, nil
 }
 
 // applyTimestampCache moves the batch timestamp forward depending on
@@ -2990,7 +3117,7 @@ func (r *Replica) executeReadOnlyBatch(
 	}
 	r.limitTxnMaxTimestamp(ctx, &ba, status)
 
-	spans, err := r.collectSpans(&ba)
+	spans, optimistic, err := r.collectSpans(&ba)
 	if err != nil {
 		return nil, roachpb.NewError(err)
 	}
 
 	// Acquire latches to prevent overlapping commands from executing
 	// until this command completes.
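+	// When evaluating optimistically, beginCmds returns without waiting on
+	// existing latches and captures a snapshot of the latch manager instead;
+	// conflicts are checked after evaluation in checkOptimisticConflicts.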
 	log.Event(ctx, "acquire latches")
-	endCmds, err := r.beginCmds(ctx, &ba, spans)
+	endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic)
 	if err != nil {
 		return nil, roachpb.NewError(err)
 	}
 
-	log.Event(ctx, "waiting for read lock")
-	r.readOnlyCmdMu.RLock()
-	defer r.readOnlyCmdMu.RUnlock()
-
 	// Guarantee we release the latches that we just acquired. It is
 	// important that this is inside the readOnlyCmdMu lock so that the
 	// timestamp cache update is synchronized. This is wrapped to delay
@@ -3015,31 +3138,27 @@ func (r *Replica) executeReadOnlyBatch(
 		endCmds.done(br, pErr, proposalNoRetry)
 	}()
 
-	// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
-	if _, err := r.IsDestroyed(); err != nil {
-		return nil, roachpb.NewError(err)
-	}
-
-	rSpan, err := keys.Range(ba)
-	if err != nil {
-		return nil, roachpb.NewError(err)
-	}
-
-	if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
-		return nil, roachpb.NewError(err)
-	}
-
-	// Evaluate read-only batch command. It checks for matching key range; note
-	// that holding readOnlyCmdMu throughout is important to avoid reads from the
-	// "wrong" key range being served after the range has been split.
+	// Loop to support optimistic evaluation. We'll evaluate at most twice.
 	var result result.Result
-	rec := NewReplicaEvalContext(r, spans)
-	readOnly := r.store.Engine().NewReadOnly()
-	if util.RaceEnabled {
-		readOnly = spanset.NewReadWriter(readOnly, spans)
+	for {
+		br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans)
+		if !endCmds.optimistic() {
+			// If this was not an optimistic evaluation, break.
+			break
+		}
+
+		// If this was an optimistic evaluation, determine whether the keys
+		// that it touched conflict with in-flight requests that were ignored.
+		if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil {
+			pErr = roachpb.NewError(err)
+		} else if conflict {
+			// If they do, retry the evaluation. This time, we will not be
+			// evaluating optimistically because checkOptimisticConflicts
+			// waited for all latch acquisitions to succeed.
+			continue
+		}
+		break
 	}
-	defer readOnly.Close()
-	br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba)
 
 	if result.Local.DetachMaybeWatchForMerge() {
 		if err := r.maybeWatchForMerge(ctx); err != nil {
@@ -3070,6 +3189,40 @@
 	return br, pErr
 }
 
+// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its
+// results. It expects that latches have already been acquired for the spans
+// that it will touch.
+func (r *Replica) evaluateReadOnlyBatch(
+	ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet,
+) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
+	log.Event(ctx, "waiting for read lock")
+	r.readOnlyCmdMu.RLock()
+	defer r.readOnlyCmdMu.RUnlock()
+
+	// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
+	if _, err := r.IsDestroyed(); err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+	rSpan, err := keys.Range(ba)
+	if err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+	if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+
+	// Evaluate read-only batch command. It checks for matching key range; note
+	// that holding readOnlyCmdMu throughout is important to avoid reads from the
+	// "wrong" key range being served after the range has been split.
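+	// (Under race builds, the engine handle is additionally wrapped below so
+	// that any access outside the declared spans is caught.)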
+	rec := NewReplicaEvalContext(r, spans)
+	readOnly := r.store.Engine().NewReadOnly()
+	if util.RaceEnabled {
+		readOnly = spanset.NewReadWriter(readOnly, spans)
+	}
+	defer readOnly.Close()
+	return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba)
+}
+
 // executeWriteBatch is the entry point for client requests which may mutate the
 // range's replicated state. Requests taking this path are ultimately
 // serialized through Raft, but pass through additional machinery whose goal is
@@ -3167,7 +3320,7 @@ func (r *Replica) tryExecuteWriteBatch(
 		return nil, roachpb.NewError(err), proposalNoRetry
 	}
 
-	spans, err := r.collectSpans(&ba)
+	spans, _, err := r.collectSpans(&ba)
 	if err != nil {
 		return nil, roachpb.NewError(err), proposalNoRetry
 	}
@@ -3180,7 +3333,7 @@ func (r *Replica) tryExecuteWriteBatch(
 		// after preceding commands have been run to successful completion.
 		log.Event(ctx, "acquire latches")
 		var err error
-		endCmds, err = r.beginCmds(ctx, &ba, spans)
+		endCmds, err = r.beginCmds(ctx, &ba, spans, false /* optimistic */)
 		if err != nil {
 			return nil, roachpb.NewError(err), proposalNoRetry
 		}
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index be61ad2d5868..e12afa1210b1 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -2598,6 +2598,122 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) {
 	}
 }
 
+// TestReplicaLatchingOptimisticEvaluation verifies that limited scans evaluate
+// optimistically without waiting for latches to be acquired. In some cases,
+// this allows them to avoid waiting on writes that their over-estimated
+// declared spans overlapped with.
+func TestReplicaLatchingOptimisticEvaluation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sArgs1 := scanArgs([]byte("a"), []byte("c"))
+	sArgs2 := scanArgs([]byte("c"), []byte("e"))
+	baScan := roachpb.BatchRequest{}
+	baScan.Add(&sArgs1, &sArgs2)
+
+	var blockKey, blockWriter atomic.Value
+	blockKey.Store(roachpb.Key("a"))
+	blockWriter.Store(false)
+	blockCh := make(chan struct{}, 1)
+	blockedCh := make(chan struct{}, 1)
+
+	tc := testContext{}
+	tsc := TestStoreConfig(nil)
+	tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
+		func(filterArgs storagebase.FilterArgs) *roachpb.Error {
+			// Make sure the direct GC path doesn't interfere with this test.
+			if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) {
+				return nil
+			}
+			if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) {
+				blockedCh <- struct{}{}
+				<-blockCh
+			}
+			return nil
+		}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.StartWithStoreConfig(t, stopper, tsc)
+
+	// Write initial keys.
+	for _, k := range []string{"a", "b", "c", "d"} {
+		pArgs := putArgs([]byte(k), []byte("value"))
+		if _, pErr := tc.SendWrapped(&pArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	testCases := []struct {
+		writeKey   string
+		limit      int64
+		interferes bool
+	}{
+		// No limit.
+		{"a", 0, true},
+		{"b", 0, true},
+		{"c", 0, true},
+		{"d", 0, true},
+		{"e", 0, false}, // Only scanning [a,e)
+		// Limited.
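+		// With a limit of n, the scan only touches the first n keys in
+		// [a,e), so only writes to those keys interfere with it.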
+		{"a", 1, true},
+		{"b", 1, false},
+		{"b", 2, true},
+		{"c", 2, false},
+		{"c", 3, true},
+		{"d", 3, false},
+		{"d", 4, true},
+		{"e", 4, false}, // Only scanning [a,e)
+		{"e", 5, false},
+	}
+	for _, test := range testCases {
+		t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
+			errCh := make(chan *roachpb.Error, 2)
+			pArgs := putArgs([]byte(test.writeKey), []byte("value"))
+			blockKey.Store(roachpb.Key(test.writeKey))
+			blockWriter.Store(true)
+			go func() {
+				_, pErr := tc.SendWrapped(&pArgs)
+				errCh <- pErr
+			}()
+			<-blockedCh
+			blockWriter.Store(false)
+
+			baScanCopy := baScan
+			baScanCopy.MaxSpanRequestKeys = test.limit
+			go func() {
+				_, pErr := tc.Sender().Send(context.Background(), baScanCopy)
+				errCh <- pErr
+			}()
+
+			if test.interferes {
+				// Neither request should complete until the write is unblocked.
+				select {
+				case <-time.After(10 * time.Millisecond):
+					// Expected.
+				case pErr := <-errCh:
+					t.Fatalf("expected interference: got error %s", pErr)
+				}
+				// Verify no errors on waiting read and write.
+				blockCh <- struct{}{}
+				for j := 0; j < 2; j++ {
+					if pErr := <-errCh; pErr != nil {
+						t.Errorf("error %d: unexpected error: %s", j, pErr)
+					}
+				}
+			} else {
+				// The read should complete first.
+				if pErr := <-errCh; pErr != nil {
+					t.Errorf("unexpected error: %s", pErr)
+				}
+				// The write should complete next, after it is unblocked.
+				blockCh <- struct{}{}
+				if pErr := <-errCh; pErr != nil {
+					t.Errorf("unexpected error: %s", pErr)
+				}
+			}
+		})
+	}
+}
+
 // TestReplicaUseTSCache verifies that write timestamps are upgraded
 // based on the read timestamp cache.
 func TestReplicaUseTSCache(t *testing.T) {
diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go
index b43a2f82114f..2f6f9e79d3e0 100644
--- a/pkg/storage/spanlatch/manager.go
+++ b/pkg/storage/spanlatch/manager.go
@@ -201,14 +201,14 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard {
 // be released, it stops waiting and releases all latches that it has already
 // acquired.
 //
-// It returns a Guard which must be provided to Release.
+// The method returns a Guard which must be provided to Release.
 func (m *Manager) Acquire(
 	ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp,
 ) (*Guard, error) {
 	lg, snap := m.sequence(spans, ts)
-	defer snap.close()
+	defer snap.Close()
 
-	err := m.wait(ctx, lg, &snap)
+	err := m.Wait(ctx, lg, &snap)
 	if err != nil {
 		m.Release(lg)
 		return nil, err
@@ -216,11 +216,28 @@ func (m *Manager) Acquire(
 	return lg, nil
 }
 
+// AcquireOptimistic is like Acquire, except it does not wait for latches over
+// overlapping spans to be released before returning. Instead, it optimistically
+// assumes that there are no currently held latches that need to be waited on.
+// This can be verified after the fact by passing the returned Snapshot to
+// Overlaps.
+//
+// Even though this method ignores existing latches, future calls to Acquire
+// will observe the latches acquired here and will wait for them to be
+// released, as usual.
+//
+// The method returns a Guard which must be provided to Release. It also returns
+// a Snapshot which must be Closed when no longer in use.
+func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, *Snapshot) {
+	lg, snap := m.sequence(spans, ts)
+	return lg, &snap
+}
+
 // sequence locks the manager, captures an immutable snapshot, inserts latches
 // for each of the specified spans into the manager's interval trees, and
 // unlocks the manager. The role of the method is to sequence latch acquisition
 // attempts.
-func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) {
+func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, Snapshot) {
 	lg := newGuard(spans, ts)
 
 	m.mu.Lock()
@@ -230,13 +247,14 @@ func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, sn
 	return lg, snap
 }
 
-// snapshot is an immutable view into the latch manager's state.
-type snapshot struct {
+// Snapshot is an immutable view into the latch manager's state.
+type Snapshot struct {
 	trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree
 }
 
-// close closes the snapshot and releases any associated resources.
-func (sn *snapshot) close() {
+// Close closes the Snapshot and releases any associated resources. The method
+// can be called multiple times.
+func (sn *Snapshot) Close() {
 	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
 		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
 			sn.trees[s][a].Reset()
@@ -246,8 +264,8 @@
 
 // snapshotLocked captures an immutable snapshot of the latch manager. It takes
 // a spanset to limit the amount of state captured.
-func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot {
-	var snap snapshot
+func (m *Manager) snapshotLocked(spans *spanset.SpanSet) Snapshot {
+	var snap Snapshot
 	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
 		sm := &m.scopes[s]
 		reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0
@@ -333,9 +351,9 @@ func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp {
 	}
 }
 
-// wait waits for all interfering latches in the provided snapshot to complete
+// Wait waits for all interfering latches in the provided snapshot to complete
 // before returning.
-func (m *Manager) wait(ctx context.Context, lg *Guard, snap *snapshot) error {
+func (m *Manager) Wait(ctx context.Context, lg *Guard, snap *Snapshot) error {
 	timer := timeutil.NewTimer()
 	timer.Reset(base.SlowRequestThreshold)
 	defer timer.Stop()
@@ -426,6 +444,58 @@ func (m *Manager) waitForSignal(ctx context.Context, t *timeutil.Timer, wait, he
 	}
 }
 
+// Overlaps determines whether any of the spans in the provided spanset, at the
+// specified timestamp, overlap with any of the latches in the snapshot.
+func (m *Manager) Overlaps(spans *spanset.SpanSet, ts hlc.Timestamp, sn *Snapshot) bool {
+	// NB: the function has a similar structure to Manager.Wait, but any attempt
+	// at code deduplication causes allocations and obscures the code. Avoiding
+	// these allocations would require further obfuscation.
+	var search latch
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		tr := &sn.trees[s]
+		search.ts = ifGlobal(ts, s)
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			ss := spans.GetSpans(a, s)
+			for _, sp := range ss {
+				search.span = sp
+				switch a {
+				case spanset.SpanReadOnly:
+					// Search for writes at equal or lower timestamps.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if overlaps(&it, &search, ignoreLater) {
+						return true
+					}
+				case spanset.SpanReadWrite:
+					// Search for all other writes.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if overlaps(&it, &search, ignoreNothing) {
+						return true
+					}
+					// Search for reads at equal or higher timestamps.
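+					// (Reads at lower timestamps than the write can be
+					// ignored: they cannot observe the write's effect.)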
+					it = tr[spanset.SpanReadOnly].MakeIter()
+					if overlaps(&it, &search, ignoreEarlier) {
+						return true
+					}
+				default:
+					panic("unknown access")
+				}
+			}
+		}
+	}
+	return false
+}
+
+func overlaps(it *iterator, search *latch, ignore ignoreFn) bool {
+	for it.FirstOverlap(search); it.Valid(); it.NextOverlap() {
+		held := it.Cur()
+		if ignore(search.ts, held.ts) {
+			continue
+		}
+		return true
+	}
+	return false
+}
+
 // Release releases the latches held by the provided Guard. After being called,
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go
index 29c16a1a8a11..86fb5e05b495 100644
--- a/pkg/storage/spanlatch/manager_test.go
+++ b/pkg/storage/spanlatch/manager_test.go
@@ -104,7 +104,8 @@ func (m *Manager) MustAcquireChCtx(
 	ch := make(chan *Guard)
 	lg, snap := m.sequence(spans, ts)
 	go func() {
-		err := m.wait(ctx, lg, &snap)
+		defer snap.Close()
+		err := m.Wait(ctx, lg, &snap)
 		if err != nil {
 			m.Release(lg)
 			lg = nil
@@ -424,14 +425,23 @@ func TestLatchManagerDependentLatches(t *testing.T) {
 			var m Manager
 			lg1 := m.MustAcquire(c.sp1, c.ts1)
 
-			lg2C := m.MustAcquireCh(c.sp2, c.ts2)
+			lg2, snap2 := m.sequence(c.sp2, c.ts2)
+			lg2C := make(chan *Guard)
+			defer snap2.Close()
+			go func() {
+				err := m.Wait(context.Background(), lg2, &snap2)
+				require.Nil(t, err)
+				lg2C <- lg2
+			}()
+
+			require.Equal(t, c.dependent, m.Overlaps(c.sp2, c.ts2, &snap2))
 			if c.dependent {
 				testLatchBlocks(t, lg2C)
 				m.Release(lg1)
-				lg2 := testLatchSucceeds(t, lg2C)
+				testLatchSucceeds(t, lg2C)
 				m.Release(lg2)
 			} else {
-				lg2 := testLatchSucceeds(t, lg2C)
+				testLatchSucceeds(t, lg2C)
 				m.Release(lg1)
 				m.Release(lg2)
 			}
@@ -512,7 +522,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) {
 	b.ResetTimer()
 	for i := range spans {
 		lg, snap := m.sequence(&spans[i], zeroTS)
-		snap.close()
+		snap.Close()
 		if len(lgBuf) == cap(lgBuf) {
 			m.Release(<-lgBuf)
 		}