diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index e999b4bd6259..32a27ffcc704 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -1033,9 +1033,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er
 type endCmds struct {
 	repl *Replica
 	lg   *spanlatch.Guard
+	sn   *spanlatch.Snapshot
 	ba   roachpb.BatchRequest
 }
 
+// optimistic returns whether the batch should execute optimistically.
+func (ec *endCmds) optimistic() bool { return ec.sn != nil }
+
+// checkOptimisticConflicts determines whether any earlier requests that held
+// latches at the time that this request evaluated optimistically conflict
+// with the span of keys that this request interacted with. On conflict, the
+// method waits to successfully acquire all latches. After that, the request
+// must be retried because the optimistic evaluation may or may not have
+// observed the effect of the earlier requests.
+//
+// The method must only be called while ec.optimistic returns true; afterwards,
+// ec.optimistic will return false.
+func (ec *endCmds) checkOptimisticConflicts(
+	ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet,
+) (bool, error) {
+	// We only use the latch manager snapshot once.
+	defer func() {
+		ec.sn.Close()
+		ec.sn = nil
+	}()
+
+	// If the optimistic evaluation didn't run into an error, create a copy of
+	// the batch with the scan bounds of all requests constrained to the spans
+	// that they actually scanned over. If it did, check for conflicts over the
+	// entire original span set.
+	if pErr == nil {
+		baCopy := ec.ba
+		baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...)
+		for i := 0; i < len(baCopy.Requests); i++ {
+			req := baCopy.Requests[i].GetInner()
+			header := req.Header()
+
+			resp := br.Responses[i].GetInner()
+			if resp.Header().ResumeSpan == nil {
+				continue
+			}
+
+			switch t := resp.(type) {
+			case *roachpb.ScanResponse:
+				if header.Key.Equal(t.ResumeSpan.Key) {
+					// The request did not evaluate. Ignore it.
+					baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
+					i--
+					continue
+				}
+				header.EndKey = t.ResumeSpan.Key
+			case *roachpb.ReverseScanResponse:
+				if header.EndKey.Equal(t.ResumeSpan.EndKey) {
+					// The request did not evaluate. Ignore it.
+					baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
+					i--
+					continue
+				}
+				header.Key = t.ResumeSpan.EndKey
+			default:
+				continue
+			}
+			req = req.ShallowCopy()
+			req.SetHeader(header)
+			baCopy.Requests[i].MustSetInner(req)
+		}
+
+		// Collect the batch's declared spans again, this time with the
+		// constrained span bounds.
+		var err error
+		spans, _, err = ec.repl.collectSpans(&baCopy)
+		if err != nil {
+			return false, err
+		}
+	}
+
+	// Determine whether any latch that was actively held at the time of
+	// evaluation overlaps this constrained span set. If so, we have a conflict.
+	conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn)
+	if !conflict {
+		return false, nil
+	}
+
+	// If there were conflicts, wait for latches to be released, as we
+	// should have in the first place.
+	log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating")
+	return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn)
+}
+
 // done releases the latches acquired by the command and updates
 // the timestamp cache using the final timestamp of each command.
 func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
@@ -1051,10 +1136,30 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
 	if ec.lg != nil {
 		ec.repl.latchMgr.Release(ec.lg)
 	}
+
+	// Close the snapshot to release any resources that it holds.
+	if ec.sn != nil {
+		ec.sn.Close()
+		ec.sn = nil
+	}
 }
 
-func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) {
-	spans := &spanset.SpanSet{}
+// collectSpans collects all of the spans that the batch may touch into a
+// SpanSet.
+//
+// It also determines whether the batch should evaluate optimistically based on
+// the key limits in the batch header. When a batch evaluates optimistically,
+// it doesn't wait to acquire all of its latches. Instead, it begins evaluating
+// immediately and verifies after the fact that it would not have needed to
+// wait on any latch acquisitions. This is useful for requests that need to
+// declare a much larger set of keys than they expect to touch in practice
+// (e.g. limited scans).
+func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) {
+	r.mu.RLock()
+	desc := r.descRLocked()
+	liveCount := r.mu.state.Stats.LiveCount
+	r.mu.RUnlock()
+
 	// TODO(bdarnell): need to make this less global when local
 	// latches are used more heavily. For example, a split will
 	// have a large read-only span but also a write (see #10084).
@@ -1065,6 +1170,7 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 	//
 	// TODO(bdarnell): revisit as the local portion gets its appropriate
 	// use.
+	spans := &spanset.SpanSet{}
 	if ba.IsReadOnly() {
 		spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
 	} else {
@@ -1076,14 +1182,13 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 		spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess)
 	}
 
-	desc := r.Desc()
 	batcheval.DeclareKeysForBatch(desc, ba.Header, spans)
 	for _, union := range ba.Requests {
 		inner := union.GetInner()
 		if cmd, ok := batcheval.LookupCommand(inner.Method()); ok {
 			cmd.DeclareKeys(desc, ba.Header, inner, spans)
 		} else {
-			return nil, errors.Errorf("unrecognized command %s", inner.Method())
+			return nil, false, errors.Errorf("unrecognized command %s", inner.Method())
 		}
 	}
 
@@ -1094,9 +1199,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 	// If any command gave us spans that are invalid, bail out early
 	// (before passing them to the spanlatch manager, which may panic).
 	if err := spans.Validate(); err != nil {
-		return nil, err
+		return nil, false, err
 	}
-	return spans, nil
+
+	// Evaluate batches optimistically if they have a key limit which is less
+	// than the number of live keys on the Range. Ignoring write latches can be
+	// massively beneficial because it can help avoid waiting on writes to keys
+	// that the batch will never actually need to read due to the overestimate
+	// of its key bounds. Only after it is clear exactly which spans were read
+	// do we verify whether there were any conflicts with concurrent writes.
+	//
+	// This case is not uncommon; for example, consider a Scan which requests
+	// the entire range but has a limit of 1 result. We want to prevent overly
+	// broad spans from backing up the latch manager.
+	limit := ba.Header.MaxSpanRequestKeys
+	optimistic := limit > 0 && limit < liveCount
+
+	return spans, optimistic, nil
 }
 
 // beginCmds waits for any in-flight, conflicting commands to complete. This
@@ -1109,11 +1228,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
 // returns a cleanup function to be called when the commands are done and can be
 // removed from the queue, and whose returned error is to be used in place of
 // the supplied error.
+//
+// See collectSpans for a discussion on optimistic evaluation.
 func (r *Replica) beginCmds(
-	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
+	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool,
 ) (*endCmds, error) {
+	ec := endCmds{
+		repl: r,
+		ba:   *ba,
+	}
+
 	// Only acquire latches for consistent operations.
-	var lg *spanlatch.Guard
 	if ba.ReadConsistency == roachpb.CONSISTENT {
 		// Check for context cancellation before acquiring latches.
 		if err := ctx.Err(); err != nil {
@@ -1129,10 +1254,16 @@ func (r *Replica) beginCmds(
 		// Acquire latches for all the request's declared spans to ensure
 		// protected access and to avoid interacting requests from operating at
 		// the same time. The latches will be held for the duration of request.
-		var err error
-		lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
-		if err != nil {
-			return nil, err
+		if optimistic {
+			// Optimistic acquisition does not wait for existing latches to be
+			// released.
+			ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp)
+		} else {
+			var err error
+			ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
+			if err != nil {
+				return nil, err
+			}
 		}
 
 		if !beforeLatch.IsZero() {
@@ -1142,7 +1273,7 @@ func (r *Replica) beginCmds(
 
 		if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
 			if pErr := filter(*ba); pErr != nil {
-				r.latchMgr.Release(lg)
+				ec.done(nil, pErr)
 				return nil, pErr.GoError()
 			}
 		}
@@ -1191,8 +1322,9 @@ func (r *Replica) beginCmds(
 			// The store will catch that error and resubmit the request after
 			// mergeCompleteCh closes. See #27442 for the full context.
 			log.Event(ctx, "waiting on in-progress merge")
-			r.latchMgr.Release(lg)
-			return nil, &roachpb.MergeInProgressError{}
+			err := &roachpb.MergeInProgressError{}
+			ec.done(nil, roachpb.NewError(err))
+			return nil, err
 		}
 	} else {
 		log.Event(ctx, "operation accepts inconsistent results")
@@ -1208,12 +1340,7 @@ func (r *Replica) beginCmds(
 		}
 	}
 
-	ec := &endCmds{
-		repl: r,
-		lg:   lg,
-		ba:   *ba,
-	}
-	return ec, nil
+	return &ec, nil
 }
 
 // executeAdminBatch executes the command directly. There is no interaction
diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go
index 321546466304..46bcefe7b18f 100644
--- a/pkg/storage/replica_read.go
+++ b/pkg/storage/replica_read.go
@@ -42,7 +42,7 @@ func (r *Replica) executeReadOnlyBatch(
 	}
 	r.limitTxnMaxTimestamp(ctx, &ba, status)
 
-	spans, err := r.collectSpans(&ba)
+	spans, optimistic, err := r.collectSpans(&ba)
 	if err != nil {
 		return nil, roachpb.NewError(err)
 	}
@@ -50,15 +50,11 @@ func (r *Replica) executeReadOnlyBatch(
 	// Acquire latches to prevent overlapping commands from executing
 	// until this command completes.
 	log.Event(ctx, "acquire latches")
-	endCmds, err := r.beginCmds(ctx, &ba, spans)
+	endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic)
 	if err != nil {
 		return nil, roachpb.NewError(err)
 	}
 
-	log.Event(ctx, "waiting for read lock")
-	r.readOnlyCmdMu.RLock()
-	defer r.readOnlyCmdMu.RUnlock()
-
 	// Guarantee we release the latches that we just acquired. It is
 	// important that this is inside the readOnlyCmdMu lock so that the
 	// timestamp cache update is synchronized. This is wrapped to delay
@@ -67,31 +63,53 @@ func (r *Replica) executeReadOnlyBatch(
 		endCmds.done(br, pErr)
 	}()
 
-	// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
-	if _, err := r.IsDestroyed(); err != nil {
-		return nil, roachpb.NewError(err)
-	}
+	// Loop to support optimistic evaluation. We'll evaluate at most twice.
+	var result result.Result
+	for {
+		br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans)
+		if !endCmds.optimistic() {
+			// If this was not an optimistic evaluation, break.
+			break
+		}
 
-	rSpan, err := keys.Range(ba)
-	if err != nil {
-		return nil, roachpb.NewError(err)
+		// If this was an optimistic evaluation, determine whether the keys
+		// that it touched conflict with in-flight requests that were ignored.
+		if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil {
+			pErr = roachpb.NewError(err)
+		} else if conflict {
+			// If they do, retry the evaluation. This time, we will not be
+			// evaluating optimistically because checkOptimisticConflicts
+			// waited for all latch acquisitions to succeed.
+			continue
+		}
+		break
 	}
-	if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
-		return nil, roachpb.NewError(err)
-	}
+	// // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
+	// if _, err := r.IsDestroyed(); err != nil {
+	// 	return nil, roachpb.NewError(err)
+	// }
 
-	// Evaluate read-only batch command. It checks for matching key range; note
-	// that holding readOnlyCmdMu throughout is important to avoid reads from the
-	// "wrong" key range being served after the range has been split.
-	var result result.Result
-	rec := NewReplicaEvalContext(r, spans)
-	readOnly := r.store.Engine().NewReadOnly()
-	if util.RaceEnabled {
-		readOnly = spanset.NewReadWriter(readOnly, spans)
-	}
-	defer readOnly.Close()
-	br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
+	// rSpan, err := keys.Range(ba)
+	// if err != nil {
+	// 	return nil, roachpb.NewError(err)
+	// }
+
+	// if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
+	// 	return nil, roachpb.NewError(err)
+	// }
+
+	// // Evaluate read-only batch command. It checks for matching key range; note
+	// // that holding readOnlyCmdMu throughout is important to avoid reads from the
+	// // "wrong" key range being served after the range has been split.
+	// var result result.Result
+	// rec := NewReplicaEvalContext(r, spans)
+	// readOnly := r.store.Engine().NewReadOnly()
+	// if util.RaceEnabled {
+	// 	readOnly = spanset.NewReadWriter(readOnly, spans)
+	// }
+	// defer readOnly.Close()
+	// br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
 
 	// A merge is (likely) about to be carried out, and this replica
 	// needs to block all traffic until the merge either commits or
@@ -124,3 +142,37 @@ func (r *Replica) executeReadOnlyBatch(
 	}
 	return br, pErr
 }
+
+// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its
+// results. It expects that latches have already been acquired for the spans
+// that it will touch.
+func (r *Replica) evaluateReadOnlyBatch(
+	ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet,
+) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
+	log.Event(ctx, "waiting for read lock")
+	r.readOnlyCmdMu.RLock()
+	defer r.readOnlyCmdMu.RUnlock()
+
+	// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
+	if _, err := r.IsDestroyed(); err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+	rSpan, err := keys.Range(ba)
+	if err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+	if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
+		return nil, result.Result{}, roachpb.NewError(err)
+	}
+
+	// Evaluate read-only batch command. It checks for matching key range; note
+	// that holding readOnlyCmdMu throughout is important to avoid reads from the
+	// "wrong" key range being served after the range has been split.
+	rec := NewReplicaEvalContext(r, spans)
+	readOnly := r.store.Engine().NewReadOnly()
+	defer readOnly.Close()
+	if util.RaceEnabled {
+		readOnly = spanset.NewReadWriter(readOnly, spans)
+	}
+	return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
+}
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 97e421f405c2..7ed9b81f1894 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -2638,18 +2638,29 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) {
 		}
 
 		if test.interferes {
+			// Neither request should complete until the write is unblocked.
 			select {
 			case <-time.After(10 * time.Millisecond):
 				// Expected.
 			case pErr := <-errCh:
 				t.Fatalf("expected interference: got error %s", pErr)
 			}
-		}
-		// Verify no errors on waiting read and write.
-		blockCh <- struct{}{}
-		for j := 0; j < 2; j++ {
+			// Verify no errors on waiting read and write.
+			blockCh <- struct{}{}
+			for j := 0; j < 2; j++ {
+				if pErr := <-errCh; pErr != nil {
+					t.Errorf("error %d: unexpected error: %s", j, pErr)
+				}
+			}
+		} else {
+			// The read should complete first.
 			if pErr := <-errCh; pErr != nil {
-				t.Errorf("error %d: unexpected error: %s", j, pErr)
+				t.Errorf("unexpected error: %s", pErr)
+			}
+			// The write should complete next, after it is unblocked.
+			blockCh <- struct{}{}
+			if pErr := <-errCh; pErr != nil {
+				t.Errorf("unexpected error: %s", pErr)
 			}
 		}
 	})
@@ -2690,6 +2701,122 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) {
 	}
 }
 
+// TestReplicaLatchingOptimisticEvaluation verifies that limited scans evaluate
+// optimistically without waiting for latches to be acquired. In some cases,
+// this allows them to avoid waiting on writes that their over-estimated
+// declared spans overlapped with.
+func TestReplicaLatchingOptimisticEvaluation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sArgs1 := scanArgs([]byte("a"), []byte("c"))
+	sArgs2 := scanArgs([]byte("c"), []byte("e"))
+	baScan := roachpb.BatchRequest{}
+	baScan.Add(&sArgs1, &sArgs2)
+
+	var blockKey, blockWriter atomic.Value
+	blockKey.Store(roachpb.Key("a"))
+	blockWriter.Store(false)
+	blockCh := make(chan struct{}, 1)
+	blockedCh := make(chan struct{}, 1)
+
+	tc := testContext{}
+	tsc := TestStoreConfig(nil)
+	tsc.TestingKnobs.EvalKnobs.TestingEvalFilter =
+		func(filterArgs storagebase.FilterArgs) *roachpb.Error {
+			// Make sure the direct GC path doesn't interfere with this test.
+			if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) {
+				return nil
+			}
+			if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) {
+				blockedCh <- struct{}{}
+				<-blockCh
+			}
+			return nil
+		}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	tc.StartWithStoreConfig(t, stopper, tsc)
+
+	// Write initial keys.
+	for _, k := range []string{"a", "b", "c", "d"} {
+		pArgs := putArgs([]byte(k), []byte("value"))
+		if _, pErr := tc.SendWrapped(&pArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	testCases := []struct {
+		writeKey   string
+		limit      int64
+		interferes bool
+	}{
+		// No limit.
+		{"a", 0, true},
+		{"b", 0, true},
+		{"c", 0, true},
+		{"d", 0, true},
+		{"e", 0, false}, // Only scanning from [a,e)
+		// Limited.
+		{"a", 1, true},
+		{"b", 1, false},
+		{"b", 2, true},
+		{"c", 2, false},
+		{"c", 3, true},
+		{"d", 3, false},
+		{"d", 4, true},
+		{"e", 4, false}, // Only scanning from [a,e)
+		{"e", 5, false},
+	}
+	for _, test := range testCases {
+		t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
+			errCh := make(chan *roachpb.Error, 2)
+			pArgs := putArgs([]byte(test.writeKey), []byte("value"))
+			blockKey.Store(roachpb.Key(test.writeKey))
+			blockWriter.Store(true)
+			go func() {
+				_, pErr := tc.SendWrapped(&pArgs)
+				errCh <- pErr
+			}()
+			<-blockedCh
+			blockWriter.Store(false)
+
+			baScanCopy := baScan
+			baScanCopy.MaxSpanRequestKeys = test.limit
+			go func() {
+				_, pErr := tc.Sender().Send(context.Background(), baScanCopy)
+				errCh <- pErr
+			}()
+
+			if test.interferes {
+				// Neither request should complete until the write is unblocked.
+				select {
+				case <-time.After(10 * time.Millisecond):
+					// Expected.
+				case pErr := <-errCh:
+					t.Fatalf("expected interference: got error %s", pErr)
+				}
+				// Verify no errors on waiting read and write.
+				blockCh <- struct{}{}
+				for j := 0; j < 2; j++ {
+					if pErr := <-errCh; pErr != nil {
+						t.Errorf("error %d: unexpected error: %s", j, pErr)
+					}
+				}
+			} else {
+				// The read should complete first.
+				if pErr := <-errCh; pErr != nil {
+					t.Errorf("unexpected error: %s", pErr)
+				}
+				// The write should complete next, after it is unblocked.
+				blockCh <- struct{}{}
+				if pErr := <-errCh; pErr != nil {
+					t.Errorf("unexpected error: %s", pErr)
+				}
+			}
+		})
+	}
+}
+
 // TestReplicaUseTSCache verifies that write timestamps are upgraded
 // based on the read timestamp cache.
 func TestReplicaUseTSCache(t *testing.T) {
diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go
index 13df5e878395..d59f22c87806 100644
--- a/pkg/storage/replica_tscache.go
+++ b/pkg/storage/replica_tscache.go
@@ -149,6 +149,12 @@ func (r *Replica) updateTimestampCache(
 		case *roachpb.ScanRequest:
 			resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
 			if resp.ResumeSpan != nil {
+				if start.Equal(resp.ResumeSpan.Key) {
+					// If the forward scan was evaluated with a key limit of zero
+					// then it will return a resume span equal to its request
+					// span. In this case, don't update the timestamp cache.
+					continue
+				}
 				// Note that for forward scan, the resume span will start at
 				// the (last key read).Next(), which is actually the correct
 				// end key for the span to update the timestamp cache.
@@ -158,6 +164,12 @@ func (r *Replica) updateTimestampCache(
 		case *roachpb.ReverseScanRequest:
 			resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
 			if resp.ResumeSpan != nil {
+				if end.Equal(resp.ResumeSpan.EndKey) {
+					// If the reverse scan was evaluated with a key limit of zero
+					// then it will return a resume span equal to its request
+					// span. In this case, don't update the timestamp cache.
+					continue
+				}
 				// Note that for reverse scans, the resume span's end key is
 				// an open interval. That means it was read as part of this op
 				// and won't be read on resume. It is the correct start key for
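The two guards above encode the same rule for both scan directions: the portion of the request span that was actually read ends at the resume span's start key for forward scans and begins at the resume span's end key for reverse scans, and a zero-limit scan reads nothing at all. A minimal sketch of that rule, using a hypothetical readSpan helper that is not part of this patch:

func readSpan(reverse bool, start, end roachpb.Key, resume *roachpb.Span) (roachpb.Key, roachpb.Key, bool) {
	if resume == nil {
		// No resume span: the entire request span was read.
		return start, end, true
	}
	if reverse {
		if end.Equal(resume.EndKey) {
			// Zero-limit reverse scan: nothing was read.
			return nil, nil, false
		}
		// resume.EndKey is an open interval on resume, so it is the
		// correct start key for the span that was read.
		return resume.EndKey, end, true
	}
	if start.Equal(resume.Key) {
		// Zero-limit forward scan: nothing was read.
		return nil, nil, false
	}
	// resume.Key is (last key read).Next(), the correct end key.
	return start, resume.Key, true
}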
diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go
index ac6a1a0482a3..a1f2dea80f00 100644
--- a/pkg/storage/replica_write.go
+++ b/pkg/storage/replica_write.go
@@ -74,7 +74,7 @@ func (r *Replica) executeWriteBatch(
 		return nil, roachpb.NewError(err)
 	}
 
-	spans, err := r.collectSpans(&ba)
+	spans, _, err := r.collectSpans(&ba)
 	if err != nil {
 		return nil, roachpb.NewError(err)
 	}
@@ -87,7 +87,7 @@ func (r *Replica) executeWriteBatch(
 		// after preceding commands have been run to successful completion.
 		log.Event(ctx, "acquire latches")
 		var err error
-		endCmds, err = r.beginCmds(ctx, &ba, spans)
+		endCmds, err = r.beginCmds(ctx, &ba, spans, false /* optimistic */)
 		if err != nil {
 			return nil, roachpb.NewError(err)
 		}
diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go
index e72f36534cf4..0771ee2f450e 100644
--- a/pkg/storage/spanlatch/manager.go
+++ b/pkg/storage/spanlatch/manager.go
@@ -197,14 +197,14 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard {
 // be released, it stops waiting and releases all latches that it has already
 // acquired.
 //
-// It returns a Guard which must be provided to Release.
+// The method returns a Guard which must be provided to Release.
 func (m *Manager) Acquire(
 	ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp,
 ) (*Guard, error) {
 	lg, snap := m.sequence(spans, ts)
-	defer snap.close()
+	defer snap.Close()
 
-	err := m.wait(ctx, lg, snap)
+	err := m.Wait(ctx, lg, &snap)
 	if err != nil {
 		m.Release(lg)
 		return nil, err
@@ -212,11 +212,28 @@ func (m *Manager) Acquire(
 	return lg, nil
 }
 
+// AcquireOptimistic is like Acquire, except it does not wait for latches over
+// overlapping spans to be released before returning. Instead, it optimistically
+// assumes that there are no currently held latches that need to be waited on.
+// This can be verified after the fact by passing the returned Snapshot to
+// Overlaps.
+//
+// Regardless of whether existing latches are ignored by this method, future
+// calls to Acquire will observe the latches acquired here and will wait for
+// them to be Released, as usual.
+//
+// The method returns a Guard which must be provided to Release. It also returns
+// a Snapshot which must be Closed when no longer in use.
+func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, *Snapshot) {
+	lg, snap := m.sequence(spans, ts)
+	return lg, &snap
+}
+
 // sequence locks the manager, captures an immutable snapshot, inserts latches
 // for each of the specified spans into the manager's interval trees, and
 // unlocks the manager. The role of the method is to sequence latch acquisition
 // attempts.
-func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) {
+func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, Snapshot) {
 	lg := newGuard(spans, ts)
 
 	m.mu.Lock()
@@ -226,13 +243,14 @@ func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, sn
 	return lg, snap
 }
 
-// snapshot is an immutable view into the latch manager's state.
-type snapshot struct {
+// Snapshot is an immutable view into the latch manager's state.
+type Snapshot struct {
 	trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree
 }
 
-// close closes the snapshot and releases any associated resources.
-func (sn *snapshot) close() {
+// Close closes the Snapshot and releases any associated resources. The method
+// can be called multiple times.
+func (sn *Snapshot) Close() {
 	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
 		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
 			sn.trees[s][a].Reset()
@@ -242,8 +260,8 @@ func (sn *snapshot) close() {
 
 // snapshotLocked captures an immutable snapshot of the latch manager. It takes
 // a spanset to limit the amount of state captured.
-func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot {
-	var snap snapshot
+func (m *Manager) snapshotLocked(spans *spanset.SpanSet) Snapshot {
+	var snap Snapshot
 	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
 		sm := &m.scopes[s]
 		reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0
@@ -329,9 +347,9 @@ func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp {
 	}
 }
 
-// wait waits for all interfering latches in the provided snapshot to complete
+// Wait waits for all interfering latches in the provided snapshot to complete
 // before returning.
-func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error {
+func (m *Manager) Wait(ctx context.Context, lg *Guard, snap *Snapshot) error {
 	timer := timeutil.NewTimer()
 	timer.Reset(base.SlowRequestThreshold)
 	defer timer.Stop()
@@ -422,6 +440,58 @@ func (m *Manager) waitForSignal(ctx context.Context, t *timeutil.Timer, wait, he
 	}
 }
 
+// Overlaps determines if any of the spans in the provided spanset, at the
+// specified timestamp, overlap with any of the latches in the snapshot.
+func (m *Manager) Overlaps(spans *spanset.SpanSet, ts hlc.Timestamp, sn *Snapshot) bool {
+	// NB: the function has a similar structure to Manager.Wait, but any attempt
+	// at code deduplication causes allocations and obscures the code. Avoiding
+	// these allocations would require further obfuscation.
+	var search latch
+	for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ {
+		tr := &sn.trees[s]
+		search.ts = ifGlobal(ts, s)
+		for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ {
+			ss := spans.GetSpans(a, s)
+			for _, sp := range ss {
+				search.span = sp
+				switch a {
+				case spanset.SpanReadOnly:
+					// Search for writes at equal or lower timestamps.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if overlaps(&it, &search, ignoreLater) {
+						return true
+					}
+				case spanset.SpanReadWrite:
+					// Search for all other writes.
+					it := tr[spanset.SpanReadWrite].MakeIter()
+					if overlaps(&it, &search, ignoreNothing) {
+						return true
+					}
+					// Search for reads at equal or higher timestamps.
+					it = tr[spanset.SpanReadOnly].MakeIter()
+					if overlaps(&it, &search, ignoreEarlier) {
+						return true
+					}
+				default:
+					panic("unknown access")
+				}
+			}
+		}
+	}
+	return false
+}
+
+func overlaps(it *iterator, search *latch, ignore ignoreFn) bool {
+	for it.FirstOverlap(search); it.Valid(); it.NextOverlap() {
+		held := it.Cur()
+		if ignore(search.ts, held.ts) {
+			continue
+		}
+		return true
+	}
+	return false
+}
+
 // Release releases the latches held by the provided Guard. After being called,
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
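Taken together, AcquireOptimistic, Overlaps, Wait, and Release give callers an evaluate-then-check protocol. The following is a minimal sketch of the intended lifecycle, assuming hypothetical evaluate and touched callbacks in place of the Replica-level logic in replica_read.go; the helper itself is illustrative and not part of this patch:

// optimisticEval sketches the intended use of the new Manager API. The
// evaluate and touched functions stand in for batch evaluation and for
// deriving the spans the batch actually read (e.g. from resume spans).
func optimisticEval(
	ctx context.Context,
	m *spanlatch.Manager,
	spans *spanset.SpanSet,
	ts hlc.Timestamp,
	evaluate func() (*roachpb.BatchResponse, *roachpb.Error),
	touched func(*roachpb.BatchResponse) *spanset.SpanSet,
) (*roachpb.BatchResponse, *roachpb.Error, error) {
	// Insert latches but do not wait on existing overlapping latches.
	lg, snap := m.AcquireOptimistic(spans, ts)
	defer snap.Close()
	defer m.Release(lg)

	// Evaluate immediately.
	br, pErr := evaluate()

	// Verify after the fact that no latch held at sequencing time overlaps
	// the spans the request actually touched.
	if m.Overlaps(touched(br), ts, snap) {
		// Conflict: wait as a pessimistic acquisition would have, then retry.
		if err := m.Wait(ctx, lg, snap); err != nil {
			return nil, nil, err
		}
		br, pErr = evaluate()
	}
	return br, pErr, nil
}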
diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go
index dca176643d35..7d2be60721b5 100644
--- a/pkg/storage/spanlatch/manager_test.go
+++ b/pkg/storage/spanlatch/manager_test.go
@@ -104,7 +104,8 @@ func (m *Manager) MustAcquireChCtx(
 	ch := make(chan *Guard)
 	lg, snap := m.sequence(spans, ts)
 	go func() {
-		err := m.wait(ctx, lg, snap)
+		defer snap.Close()
+		err := m.Wait(ctx, lg, &snap)
 		if err != nil {
 			m.Release(lg)
 			lg = nil
@@ -424,14 +425,23 @@ func TestLatchManagerDependentLatches(t *testing.T) {
 
 			var m Manager
 			lg1 := m.MustAcquire(c.sp1, c.ts1)
-			lg2C := m.MustAcquireCh(c.sp2, c.ts2)
+			lg2, snap2 := m.sequence(c.sp2, c.ts2)
+			lg2C := make(chan *Guard)
+			defer snap2.Close()
+			go func() {
+				err := m.Wait(context.Background(), lg2, &snap2)
+				require.Nil(t, err)
+				lg2C <- lg2
+			}()
+
+			require.Equal(t, c.dependent, m.Overlaps(c.sp2, c.ts2, &snap2))
 			if c.dependent {
 				testLatchBlocks(t, lg2C)
 				m.Release(lg1)
-				lg2 := testLatchSucceeds(t, lg2C)
+				testLatchSucceeds(t, lg2C)
 				m.Release(lg2)
 			} else {
-				lg2 := testLatchSucceeds(t, lg2C)
+				testLatchSucceeds(t, lg2C)
 				m.Release(lg1)
 				m.Release(lg2)
 			}
@@ -512,7 +522,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) {
 	b.ResetTimer()
 	for i := range spans {
 		lg, snap := m.sequence(&spans[i], zeroTS)
-		snap.close()
+		snap.Close()
 		if len(lgBuf) == cap(lgBuf) {
 			m.Release(<-lgBuf)
 		}
diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go
index 25031070644e..48639f99144f 100644
--- a/pkg/workload/kv/kv.go
+++ b/pkg/workload/kv/kv.go
@@ -50,6 +50,7 @@ type kv struct {
 	cycleLength int64
 	readPercent int
 	spanPercent int
+	spanLimit   int
 	seed        int64
 	writeSeq    string
 	sequential  bool
@@ -97,6 +98,8 @@ var kvMeta = workload.Meta{
 			`Percent (0-100) of operations that are reads of existing keys.`)
 		g.flags.IntVar(&g.spanPercent, `span-percent`, 0,
 			`Percent (0-100) of operations that are spanning queries of all ranges.`)
+		g.flags.IntVar(&g.spanLimit, `span-limit`, 0,
+			`LIMIT count for each spanning query, or 0 for no limit`)
 		g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`)
 		g.flags.BoolVar(&g.zipfian, `zipfian`, false,
 			`Pick keys in a zipfian distribution instead of randomly.`)
@@ -239,7 +242,13 @@ func (w *kv) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, er
 	writeStmtStr := buf.String()
 
 	// Span statement
-	spanStmtStr := "SELECT count(v) FROM kv"
+	buf.Reset()
+	buf.WriteString(`SELECT count(v) FROM [SELECT v FROM kv`)
+	if w.spanLimit > 0 {
+		fmt.Fprintf(&buf, ` WHERE k >= $1 LIMIT %d`, w.spanLimit)
+	}
+	buf.WriteString(`]`)
+	spanStmtStr := buf.String()
 
 	ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
 	seq := &sequence{config: w, val: int64(writeSeq)}
@@ -308,10 +317,18 @@ func (o *kvOp) run(ctx context.Context) error {
 	statementProbability -= o.config.readPercent
 	if statementProbability < o.config.spanPercent {
 		start := timeutil.Now()
-		_, err := o.spanStmt.Exec(ctx)
+		var args []interface{}
+		if o.config.spanLimit > 0 {
+			args = append(args, o.g.readKey())
+		}
+		rows, err := o.spanStmt.Query(ctx, args...)
+		if err != nil {
+			return err
+		}
+		rows.Close()
 		elapsed := timeutil.Since(start)
 		o.hists.Get(`span`).Record(elapsed)
-		return err
+		return rows.Err()
 	}
 	const argCount = 2
 	args := make([]interface{}, argCount*o.config.batchSize)
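For reference, with a nonzero limit the builder above produces a bounded span statement; for example, --span-limit=100 yields (reconstructed from the code above):

SELECT count(v) FROM [SELECT v FROM kv WHERE k >= $1 LIMIT 100]

The optimistic-evaluation fast path can then be exercised with an invocation along the lines of `workload run kv --span-percent=10 --span-limit=100` (flag names as registered above; the exact entry point depends on how the workload binary is run).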