storage: evaluate limited scans optimistically without latching #33373

Closed
169 changes: 148 additions & 21 deletions pkg/storage/replica.go
@@ -1033,9 +1033,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) error
type endCmds struct {
repl *Replica
lg *spanlatch.Guard
sn *spanlatch.Snapshot
ba roachpb.BatchRequest
}

// optimistic returns whether the batch should execute optimistically.
func (ec *endCmds) optimistic() bool { return ec.sn != nil }

// checkOptimisticConflicts determines whether any earlier requests that held
// latches at the time that this request evaluated optimistically conflict with
// the spans of keys that this request actually interacted with. On conflict,
// the method waits until it has successfully acquired all of the request's
// latches. The request must then be retried, because the optimistic evaluation
// may or may not have observed the effects of the earlier requests.
//
// ec.optimistic must return true when this method is called. After the method
// returns, ec.optimistic will return false.
func (ec *endCmds) checkOptimisticConflicts(
ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet,
) (bool, error) {
// We only use the latch manager snapshot once.
defer func() {
ec.sn.Close()
ec.sn = nil
}()

// If the optimistic evaluation didn't run into an error, create a copy of
// the batch with the scan bounds of all requests constrained to the spans
// that they actually scanned over. If it did, check for conflicts over the
// entire original span set.
if pErr == nil {
baCopy := ec.ba
baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...)
for i := 0; i < len(baCopy.Requests); i++ {
req := baCopy.Requests[i].GetInner()
header := req.Header()

resp := br.Responses[i].GetInner()
if resp.Header().ResumeSpan == nil {
continue
}

switch t := resp.(type) {
case *roachpb.ScanResponse:
if header.Key.Equal(t.ResumeSpan.Key) {
// The request did not evaluate. Ignore it.
baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
i--
continue
}
header.EndKey = t.ResumeSpan.Key
case *roachpb.ReverseScanResponse:
if header.EndKey.Equal(t.ResumeSpan.EndKey) {
// The request did not evaluate. Ignore it.
baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
i--
continue
}
header.Key = t.ResumeSpan.EndKey
default:
continue
}
req = req.ShallowCopy()
req.SetHeader(header)
baCopy.Requests[i].MustSetInner(req)
}

// Collect the batch's declared spans again, this time with the
// constrained span bounds.
var err error
spans, _, err = ec.repl.collectSpans(&baCopy)
if err != nil {
return false, err
}
}

// Determine whether any latch that was actively held at the time of
// evaluation overlaps this constrained span set. If so, we have a conflict.
conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn)
if !conflict {
return false, nil
}

// If there were conflicts, wait for latches to be released, like
// we should have in the first place.
log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating")
return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn)
}
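
For a concrete feel for the constraining step above, here is a minimal, self-contained sketch of how a forward scan's ResumeSpan narrows its declared span. This is an editor's illustration rather than part of the diff, and the keys and limit are hypothetical:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// A Scan declared over [a, z) with MaxSpanRequestKeys = 1 that returns one
// result and a ResumeSpan of [c, z) never read anything at or beyond "c".
// Mirroring the ScanResponse case above, the span checked for latch conflicts
// is therefore [a, c) rather than the full declared span.
func main() {
	declared := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")}
	resume := roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("z")}

	constrained := roachpb.Span{Key: declared.Key, EndKey: resume.Key}
	fmt.Println(constrained) // prints the narrowed span [a, c)
}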

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
@@ -1051,10 +1136,30 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
if ec.lg != nil {
ec.repl.latchMgr.Release(ec.lg)
}

// Close the snapshot to release any resources that it holds.
if ec.sn != nil {
ec.sn.Close()
ec.sn = nil
}
}

func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) {
spans := &spanset.SpanSet{}
// collectSpans collects all of the spans that the batch may touch into a
// SpanSet.
//
// It also determines whether the batch should evaluate optimistically based on
// the key limits on the batch header. When a batch evaluates optimistically, it
// doesn't wait to acquire all of its latches. Instead, it begins evaluating
// immediately and verifies that it would not have needed to wait on any latch
// acquisitions after-the-fact. This is useful for requests that need to declare
// a much larger set of keys than they expect to touch in practice (e.g. limited
// scans).
func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) {
r.mu.RLock()
desc := r.descRLocked()
liveCount := r.mu.state.Stats.LiveCount
r.mu.RUnlock()

// TODO(bdarnell): need to make this less global when local
// latches are used more heavily. For example, a split will
// have a large read-only span but also a write (see #10084).
@@ -1065,6 +1170,7 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error
//
// TODO(bdarnell): revisit as the local portion gets its appropriate
// use.
spans := &spanset.SpanSet{}
if ba.IsReadOnly() {
spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
} else {
@@ -1076,14 +1182,13 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error
spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess)
}

desc := r.Desc()
batcheval.DeclareKeysForBatch(desc, ba.Header, spans)
for _, union := range ba.Requests {
inner := union.GetInner()
if cmd, ok := batcheval.LookupCommand(inner.Method()); ok {
cmd.DeclareKeys(desc, ba.Header, inner, spans)
} else {
return nil, errors.Errorf("unrecognized command %s", inner.Method())
return nil, false, errors.Errorf("unrecognized command %s", inner.Method())
}
}

@@ -1094,9 +1199,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error
// If any command gave us spans that are invalid, bail out early
// (before passing them to the spanlatch manager, which may panic).
if err := spans.Validate(); err != nil {
return nil, err
return nil, false, err
}
return spans, nil

// Evaluate batches optimistically if they have a key limit which is less
// than the number of live keys on the Range. Ignoring write latches can be
// massively beneficial because it can help avoid waiting on writes to keys
// that the batch will never actually need to read due to the overestimate
// of its key bounds. Only after it is clear exactly what spans were read do
// we verify whether there were any conflicts with concurrent writes.
//
// This case is not uncommon; for example, a Scan which requests the entire
// range but has a limit of 1 result. We want to prevent such overly broad
// spans from backing up the latch manager.
limit := ba.Header.MaxSpanRequestKeys
optimistic := limit > 0 && limit < liveCount

return spans, optimistic, nil
}
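
To illustrate the predicate above with hypothetical numbers (an editor's sketch, not part of the diff): a scan declared over an entire range but limited to a single result qualifies for optimistic evaluation whenever the range holds more than one live key.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// Sketch of a batch that takes the optimistic path: a whole-range scan
// limited to one result. With 10,000 live keys on the range, both
// limit > 0 and limit < liveCount hold.
func main() {
	var ba roachpb.BatchRequest
	ba.MaxSpanRequestKeys = 1
	ba.Add(roachpb.NewScan(roachpb.Key("a"), roachpb.Key("z")))

	liveCount := int64(10000) // stands in for r.mu.state.Stats.LiveCount
	limit := ba.MaxSpanRequestKeys
	fmt.Println(limit > 0 && limit < liveCount) // true: evaluate optimistically
}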

// beginCmds waits for any in-flight, conflicting commands to complete. This
@@ -1109,11 +1228,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error
// returns a cleanup function to be called when the commands are done and can be
// removed from the queue, and whose returned error is to be used in place of
// the supplied error.
//
// See collectSpans for a discussion on optimistic evaluation.
func (r *Replica) beginCmds(
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool,
) (*endCmds, error) {
ec := endCmds{
repl: r,
ba: *ba,
}

// Only acquire latches for consistent operations.
var lg *spanlatch.Guard
if ba.ReadConsistency == roachpb.CONSISTENT {
// Check for context cancellation before acquiring latches.
if err := ctx.Err(); err != nil {
@@ -1129,10 +1254,16 @@ func (r *Replica) beginCmds(
// Acquire latches for all the request's declared spans to ensure
// protected access and to prevent interacting requests from operating at
// the same time. The latches will be held for the duration of the request.
var err error
lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
if err != nil {
return nil, err
if optimistic {
// Optimistic acquisition does not wait for existing latches to be
// released.
ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp)
} else {
var err error
ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
if err != nil {
return nil, err
}
}

if !beforeLatch.IsZero() {
Expand All @@ -1142,7 +1273,7 @@ func (r *Replica) beginCmds(

if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
if pErr := filter(*ba); pErr != nil {
r.latchMgr.Release(lg)
ec.done(nil, pErr)
return nil, pErr.GoError()
}
}
@@ -1191,8 +1322,9 @@ func (r *Replica) beginCmds(
// The store will catch that error and resubmit the request after
// mergeCompleteCh closes. See #27442 for the full context.
log.Event(ctx, "waiting on in-progress merge")
r.latchMgr.Release(lg)
return nil, &roachpb.MergeInProgressError{}
err := &roachpb.MergeInProgressError{}
ec.done(nil, roachpb.NewError(err))
return nil, err
}
} else {
log.Event(ctx, "operation accepts inconsistent results")
Expand All @@ -1208,12 +1340,7 @@ func (r *Replica) beginCmds(
}
}

ec := &endCmds{
repl: r,
lg: lg,
ba: *ba,
}
return ec, nil
return &ec, nil
}
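
The call sites above pin down the latch manager surface that this change relies on. The interface below is an inferred sketch of that surface; the authoritative declarations live in pkg/storage/spanlatch and are not shown in this diff:

package sketch // hypothetical package; an editor's inference, not PR code

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// latchManager is the subset of the spanlatch manager exercised by beginCmds
// and endCmds in this diff. Signatures are inferred from usage, not copied
// from the spanlatch package.
type latchManager interface {
	// Acquire waits for all conflicting latches to be released and then
	// holds new latches over the given spans.
	Acquire(ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp) (*spanlatch.Guard, error)

	// AcquireOptimistic inserts latches without waiting on conflicts and
	// returns a snapshot of the latches held at insertion time. The snapshot
	// feeds later Overlaps checks and must eventually be Close()d.
	AcquireOptimistic(spans *spanset.SpanSet, ts hlc.Timestamp) (*spanlatch.Guard, *spanlatch.Snapshot)

	// Overlaps reports whether any latch captured in the snapshot conflicts
	// with the (constrained) span set at the given timestamp.
	Overlaps(spans *spanset.SpanSet, ts hlc.Timestamp, sn *spanlatch.Snapshot) bool

	// Wait blocks until the guard's latches would have been acquired by a
	// pessimistic Acquire, i.e. until all conflicting predecessors finish.
	Wait(ctx context.Context, lg *spanlatch.Guard, sn *spanlatch.Snapshot) error

	// Release drops the latches held by the guard.
	Release(lg *spanlatch.Guard)
}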

// executeAdminBatch executes the command directly. There is no interaction
106 changes: 79 additions & 27 deletions pkg/storage/replica_read.go
@@ -42,23 +42,19 @@ func (r *Replica) executeReadOnlyBatch(
}
r.limitTxnMaxTimestamp(ctx, &ba, status)

spans, err := r.collectSpans(&ba)
spans, optimistic, err := r.collectSpans(&ba)
if err != nil {
return nil, roachpb.NewError(err)
}

// Acquire latches to prevent overlapping commands from executing
// until this command completes.
log.Event(ctx, "acquire latches")
endCmds, err := r.beginCmds(ctx, &ba, spans)
endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic)
if err != nil {
return nil, roachpb.NewError(err)
}

log.Event(ctx, "waiting for read lock")
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()

// Guarantee we release the latches that we just acquired. It is
// important that this is inside the readOnlyCmdMu lock so that the
// timestamp cache update is synchronized. This is wrapped to delay
@@ -67,31 +63,53 @@ func (r *Replica) executeReadOnlyBatch(
endCmds.done(br, pErr)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
if _, err := r.IsDestroyed(); err != nil {
return nil, roachpb.NewError(err)
}
// Loop to support optimistic evaluation. We'll evaluate at most twice.
var result result.Result
for {
br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans)
if !endCmds.optimistic() {
// If this was not an optimistic evaluation, break.
break
}

rSpan, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
// If this was an optimistic evaluation, determine whether the keys
// that it touched conflict with in-flight requests that were ignored.
if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil {
pErr = roachpb.NewError(err)
} else if conflict {
// If they do, retry the evaluation. This time, we will not be
// evaluating optimistically because checkOptimisticConflicts
// waited for all latch acquisitions to succeed.
continue
}
break
}

if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}

// Evaluate read-only batch command. It checks for matching key range; note
// that holding readOnlyCmdMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
var result result.Result
rec := NewReplicaEvalContext(r, spans)
readOnly := r.store.Engine().NewReadOnly()
if util.RaceEnabled {
readOnly = spanset.NewReadWriter(readOnly, spans)
}
defer readOnly.Close()
br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)

// A merge is (likely) about to be carried out, and this replica
// needs to block all traffic until the merge either commits or
@@ -124,3 +142,37 @@
}
return br, pErr
}

// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its
// results. It expects that latches have already been acquired (or, in the case
// of optimistic evaluation, inserted without waiting) for the spans that it
// will touch.
func (r *Replica) evaluateReadOnlyBatch(
ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet,
) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
log.Event(ctx, "waiting for read lock")
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
if _, err := r.IsDestroyed(); err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}
rSpan, err := keys.Range(ba)
if err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}
if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}

// Evaluate read-only batch command. It checks for matching key range; note
// that holding readOnlyCmdMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
rec := NewReplicaEvalContext(r, spans)
readOnly := r.store.Engine().NewReadOnly()
defer readOnly.Close()
if util.RaceEnabled {
readOnly = spanset.NewReadWriter(readOnly, spans)
}
return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
}
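
Putting the two files together, the optimistic read path composes as follows. This is a condensed restatement of the behavior in this diff under a hypothetical conflicting writer, not new logic:

// Condensed trace of the optimistic read path, assuming a writer currently
// holds a latch on key "b" inside a range with many live keys:
//
//	collectSpans:   limit = 1 < liveCount           -> optimistic = true
//	beginCmds:      AcquireOptimistic(spans, ts)    -> (lg, sn), no waiting
//	evaluate:       Scan([a, z), limit 1) reads only [a, c)
//	check:          Overlaps([a, c), ts, sn)        -> true ("b" is latched)
//	                Wait(ctx, lg, sn); sn is closed -> block until writer done
//	re-evaluate:    now fully latched               -> return the response
//	done:           update timestamp cache, Release(lg)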