storage: evaluate limited scans optimistically without latching
Fixes cockroachdb#9521.
Supersedes cockroachdb#31904.

SQL has a tendency to create scans which cover a range's entire key
span but which are only looking to return a limited number of results.
These requests end up blocking on writes that are holding latches over
keys that the scan will not actually touch. In reality, when there is a
scan with a limit, the actual affected key span ends up being a small
subset of the range's key span.

This change creates a new "optimistic evaluation" path for read-only
requests. When evaluating optimistically, the batch will sequence itself
with the latch manager, but will not wait to acquire all of its latches.
Instead, it begins evaluating immediately and verifies that it would not
have needed to wait on any latch acquisitions after-the-fact. When
performing this verification, it uses knowledge about the limited set
of keys that the scan actually looked at. If there are no conflicts, the
scan succeeds. If there are conflicts, the request waits for all of its
latch acquisition attempts to finish and re-evaluates.
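
As a rough illustration of the evaluate-then-verify flow described above,
here is a minimal, self-contained sketch. It is not the code from this
change: `span`, `evalLimitedScan`, and `optimisticScan` are hypothetical
names, keys are plain strings, and "waiting" is modeled as a callback.

```go
package main

import "fmt"

// span is a half-open key interval [start, end). Hypothetical type.
type span struct{ start, end string }

func (s span) overlaps(o span) bool { return s.start < o.end && o.start < s.end }

// evalLimitedScan scans sorted keys inside declared and stops after limit
// results. It returns the results and the span that was actually read,
// which ends at the first key that was not returned (the resume key).
func evalLimitedScan(keys []string, declared span, limit int) ([]string, span) {
	var out []string
	touched := declared
	for _, k := range keys {
		if k < declared.start || k >= declared.end {
			continue
		}
		if len(out) == limit {
			touched.end = k
			break
		}
		out = append(out, k)
	}
	return out, touched
}

// optimisticScan evaluates immediately, then checks the touched span
// against write latches that were held at evaluation time. On conflict it
// waits and evaluates a second time.
func optimisticScan(
	keys []string, declared span, limit int, heldWrites []span, wait func(),
) ([]string, bool) {
	res, touched := evalLimitedScan(keys, declared, limit)
	for _, w := range heldWrites {
		if touched.overlaps(w) {
			wait() // stand-in for waiting on all latch acquisitions
			res, _ = evalLimitedScan(keys, declared, limit)
			return res, true
		}
	}
	return res, false
}

func main() {
	keys := []string{"a", "b", "c", "x", "y"}
	declared := span{"a", "z"}    // the scan declares the whole range
	writes := []span{{"x", "y"}} // a concurrent write holds a latch on "x"

	_, retried := optimisticScan(keys, declared, 2, writes, func() {})
	fmt.Println(retried) // false: only [a, c) was read

	_, retried = optimisticScan(keys, declared, 4, writes, func() {})
	fmt.Println(retried) // true: the scan reached "x"
}
```

With a limit of 2 the scan never reaches the latched key, so the
optimistic result stands. With a limit of 4 it does, so it falls back to
waiting and re-evaluating.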

This PR replaces cockroachdb#31904. The major difference between the two is that
this PR exploits the structure of the latch manager to efficiently
perform optimistic latch acquisition and after-the-fact verification
of conflicts. Doing this requires keeping no extra state because it
uses the immutable snapshots that the latch manager now captures during
sequencing. The other major difference is that this PR does not release
latches after a failed optimistic evaluation.
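
To make the snapshot idea concrete, here is a toy sketch under heavy
assumptions. The `manager` type and its methods are hypothetical, and a
copied slice stands in for whatever immutable structure the real latch
manager captures. The point is only that the set of latches seen at
sequencing time stays available for the later conflict check, even if
those latches are released in the meantime.

```go
package main

import (
	"fmt"
	"sync"
)

// span is a half-open key interval [start, end). Hypothetical type.
type span struct{ start, end string }

func (s span) overlaps(o span) bool { return s.start < o.end && o.start < s.end }

// manager is a toy latch manager. held is never mutated in place, so any
// slice handed out as a snapshot stays valid and unchanged.
type manager struct {
	mu   sync.Mutex
	held []span
}

// acquireOptimistic inserts sp without waiting and returns a snapshot of
// the latches that were held before sp was inserted.
func (m *manager) acquireOptimistic(sp span) []span {
	m.mu.Lock()
	defer m.mu.Unlock()
	snap := m.held
	m.held = append(append([]span(nil), m.held...), sp)
	return snap
}

// release removes one latch equal to sp by building a fresh slice.
func (m *manager) release(sp span) {
	m.mu.Lock()
	defer m.mu.Unlock()
	next := make([]span, 0, len(m.held))
	removed := false
	for _, h := range m.held {
		if !removed && h == sp {
			removed = true
			continue
		}
		next = append(next, h)
	}
	m.held = next
}

// overlapsSnapshot reports whether touched overlaps any latch in snap.
func overlapsSnapshot(snap []span, touched span) bool {
	for _, h := range snap {
		if h.overlaps(touched) {
			return true
		}
	}
	return false
}

func main() {
	m := &manager{}
	m.acquireOptimistic(span{"x", "y"})         // a writer latches key "x"
	snap := m.acquireOptimistic(span{"a", "z"}) // the scan sequences itself
	m.release(span{"x", "y"})                   // the writer finishes meanwhile

	fmt.Println(overlapsSnapshot(snap, span{"a", "c"})) // false
	fmt.Println(overlapsSnapshot(snap, span{"a", "y"})) // true
}
```

No per-request bookkeeping is needed beyond holding on to the snapshot
itself, which is the property the paragraph above relies on.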

NOTE: a prevalent theory about the pathological case here was that
overestimated read latches would serialize with write latches, causing
all requests on a range to serialize. I wasn't seeing this in practice.
It turns out that the "timestamp awareness" in the latch manager should
avoid this behavior in most cases: later writes will have higher
timestamps than earlier reads, so the latch manager does not consider
them to interfere. Still, large clusters with a high amount of clock
skew could see a bounded variant of this situation.
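
A simplified model of that timestamp-aware interference rule, as I
understand it from the description above. The `latch` type, the integer
timestamps, and `interferes` are assumptions made for illustration; the
real latch manager handles more cases (for example, requests without a
timestamp).

```go
package main

import "fmt"

// latch is a hypothetical, simplified latch over some overlapping span.
type latch struct {
	write bool
	ts    int // simplified logical timestamp
}

// interferes reports whether an incoming request has to wait on a latch
// that is already held over an overlapping span.
func interferes(incoming, held latch) bool {
	switch {
	case !incoming.write && !held.write:
		// Reads never block other reads.
		return false
	case incoming.write && held.write:
		// Writes always wait on other writes.
		return true
	case incoming.write:
		// A write only waits on reads at equal or higher timestamps.
		return held.ts >= incoming.ts
	default:
		// A read only waits on writes at equal or lower timestamps.
		return held.ts <= incoming.ts
	}
}

func main() {
	earlierRead := latch{write: false, ts: 10}
	laterWrite := latch{write: true, ts: 20}

	// The common case: a later write arrives behind an earlier read.
	fmt.Println(interferes(laterWrite, earlierRead)) // false

	// With skewed clocks the write may carry a lower timestamp.
	skewedWrite := latch{write: true, ts: 5}
	fmt.Println(interferes(skewedWrite, earlierRead)) // true
}
```

The second case is the bounded variant mentioned above: it only arises
when a write's timestamp is not ahead of the reads it overlaps.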

### Benchmark Results

```
name                                   old ops/sec  new ops/sec  delta
kv95/cores=16/nodes=3/splits=3          51.9k ± 0%   51.7k ± 1%     ~     (p=0.400 n=3+3)
kvS70-L1/cores=16/nodes=3/splits=3      24.1k ± 4%   27.7k ± 1%  +14.75%  (p=0.100 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3      24.5k ± 1%   27.5k ± 1%  +12.08%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3   16.0k ± 1%   16.6k ± 2%   +3.79%  (p=0.100 n=3+3)

name                                   old p50(ms)  new p50(ms)  delta
kv95/cores=16/nodes=3/splits=3           0.70 ± 0%    0.70 ± 0%     ~     (all equal)
kvS70-L1/cores=16/nodes=3/splits=3       1.07 ± 6%    0.90 ± 0%  -15.62%  (p=0.100 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3       1.10 ± 0%    0.90 ± 0%  -18.18%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3    1.80 ± 0%    1.67 ± 4%   -7.41%  (p=0.100 n=3+3)

name                                   old p99(ms)  new p99(ms)  delta
kv95/cores=16/nodes=3/splits=3           1.80 ± 0%    1.80 ± 0%     ~     (all equal)
kvS70-L1/cores=16/nodes=3/splits=3       5.77 ±32%    4.70 ± 0%     ~     (p=0.400 n=3+3)
kvS70-L5/cores=16/nodes=3/splits=3       5.00 ± 0%    4.70 ± 0%   -6.00%  (p=0.100 n=3+3)
kvS70-L1000/cores=16/nodes=3/splits=3    6.90 ± 3%    7.33 ± 8%     ~     (p=0.400 n=3+3)
```

_S<num> = --span-percent=<num>, L<num> = --span-limit=<num>_

Release note (performance improvement): improved performance on workloads
which mix OLAP queries with inserts and updates.
nvanbenschoten committed Jul 11, 2019
1 parent 63d2350 commit cc26940
Showing 6 changed files with 442 additions and 67 deletions.
169 changes: 148 additions & 21 deletions pkg/storage/replica.go
@@ -1033,9 +1033,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er
type endCmds struct {
repl *Replica
lg *spanlatch.Guard
sn *spanlatch.Snapshot
ba roachpb.BatchRequest
}

// optimistic returns whether the batch should execute optimistically.
func (ec *endCmds) optimistic() bool { return ec.sn != nil }

// checkOptimisticConflicts determines whether any earlier requests that held
// latches at the time that this request optimistically evaluated conflict with
// the span of keys that this request interacted with. On conflict, the method
// waits to successfully acquire all latches. After that, the request needs to
// be retried because the optimistic evaluation may or may not have observed the
// effect of the earlier requests.
//
// To call, ec.optimistic must be true. After the method is called, ec.optimistic
// will return false.
func (ec *endCmds) checkOptimisticConflicts(
ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet,
) (bool, error) {
// We only use the latch manager snapshot once.
defer func() {
ec.sn.Close()
ec.sn = nil
}()

// If the optimistic evaluation didn't run into an error, create a copy of
// the batch with the scan bounds of all requests constrained to the spans
// that they actually scanned over. If it did, check for conflicts over the
// entire original span set.
if pErr == nil {
baCopy := ec.ba
baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...)
for i := 0; i < len(baCopy.Requests); i++ {
req := baCopy.Requests[i].GetInner()
header := req.Header()

resp := br.Responses[i].GetInner()
if resp.Header().ResumeSpan == nil {
continue
}

switch t := resp.(type) {
case *roachpb.ScanResponse:
if header.Key.Equal(t.ResumeSpan.Key) {
// The request did not evaluate. Ignore it.
baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
i--
continue
}
header.EndKey = t.ResumeSpan.Key
case *roachpb.ReverseScanResponse:
if header.EndKey.Equal(t.ResumeSpan.EndKey) {
// The request did not evaluate. Ignore it.
baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...)
i--
continue
}
header.Key = t.ResumeSpan.EndKey
default:
continue
}
req = req.ShallowCopy()
req.SetHeader(header)
baCopy.Requests[i].MustSetInner(req)
}

// Collect the batch's declared spans again, this time with the
// constrained span bounds.
var err error
spans, _, err = ec.repl.collectSpans(&baCopy)
if err != nil {
return false, err
}
}

// Determine whether any actively held latch at the time of evaluation
// overlaps this constraint span set. If so, we have a conflict.
conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn)
if !conflict {
return false, nil
}

// If there were conflicts, wait for latches to be released, like
// we should have in the first place.
log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating")
return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn)
}

// done releases the latches acquired by the command and updates
// the timestamp cache using the final timestamp of each command.
func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
@@ -1051,10 +1136,30 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) {
if ec.lg != nil {
ec.repl.latchMgr.Release(ec.lg)
}

// Close the snapshot to release any resources that it holds.
if ec.sn != nil {
ec.sn.Close()
ec.sn = nil
}
}

func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) {
spans := &spanset.SpanSet{}
// collectSpans collects all of the spans that the batch may touch into a
// SpanSet.
//
// It also determines whether the batch should evaluate optimistically based on
// the key limits on the batch header. When a batch evaluates optimistically, it
// doesn't wait to acquire all of its latches. Instead, it begins evaluating
// immediately and verifies that it would not have needed to wait on any latch
// acquisitions after-the-fact. This is useful for requests that need to declare
// a much larger set of keys than they expect to touch in practice (e.g. limited
// scans).
func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) {
r.mu.RLock()
desc := r.descRLocked()
liveCount := r.mu.state.Stats.LiveCount
r.mu.RUnlock()

// TODO(bdarnell): need to make this less global when local
// latches are used more heavily. For example, a split will
// have a large read-only span but also a write (see #10084).
@@ -1065,6 +1170,7 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
//
// TODO(bdarnell): revisit as the local portion gets its appropriate
// use.
spans := &spanset.SpanSet{}
if ba.IsReadOnly() {
spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests))
} else {
@@ -1076,14 +1182,13 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess)
}

desc := r.Desc()
batcheval.DeclareKeysForBatch(desc, ba.Header, spans)
for _, union := range ba.Requests {
inner := union.GetInner()
if cmd, ok := batcheval.LookupCommand(inner.Method()); ok {
cmd.DeclareKeys(desc, ba.Header, inner, spans)
} else {
return nil, errors.Errorf("unrecognized command %s", inner.Method())
return nil, false, errors.Errorf("unrecognized command %s", inner.Method())
}
}

@@ -1094,9 +1199,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
// If any command gave us spans that are invalid, bail out early
// (before passing them to the spanlatch manager, which may panic).
if err := spans.Validate(); err != nil {
return nil, err
return nil, false, err
}
return spans, nil

// Evaluate batches optimistically if they have a key limit which is less
// than the number of live keys on the Range. Ignoring write latches can be
// massively beneficial because it can help avoid waiting on writes to keys
// that the batch will never actually need to read due to the overestimate
// of its key bounds. Only after it is clear exactly what spans were read do
// we verify whether there were any conflicts with concurrent writes.
//
// This case is not uncommon; for example, a Scan which requests the entire
// range but has a limit of 1 result. We want to avoid allowing overly broad
// spans from backing up the latch manager.
limit := ba.Header.MaxSpanRequestKeys
optimistic := limit > 0 && limit < liveCount

return spans, optimistic, nil
}

// beginCmds waits for any in-flight, conflicting commands to complete. This
@@ -1109,11 +1228,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro
// returns a cleanup function to be called when the commands are done and can be
// removed from the queue, and whose returned error is to be used in place of
// the supplied error.
//
// See collectSpans for a discussion on optimistic evaluation.
func (r *Replica) beginCmds(
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool,
) (*endCmds, error) {
ec := endCmds{
repl: r,
ba: *ba,
}

// Only acquire latches for consistent operations.
var lg *spanlatch.Guard
if ba.ReadConsistency == roachpb.CONSISTENT {
// Check for context cancellation before acquiring latches.
if err := ctx.Err(); err != nil {
@@ -1129,10 +1254,16 @@ func (r *Replica) beginCmds(
// Acquire latches for all the request's declared spans to ensure
// protected access and to avoid interacting requests from operating at
// the same time. The latches will be held for the duration of request.
var err error
lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
if err != nil {
return nil, err
if optimistic {
// Optimistic acquisition does not wait for existing latches to be
// released.
ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp)
} else {
var err error
ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp)
if err != nil {
return nil, err
}
}

if !beforeLatch.IsZero() {
@@ -1142,7 +1273,7 @@ func (r *Replica) beginCmds(

if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
if pErr := filter(*ba); pErr != nil {
r.latchMgr.Release(lg)
ec.done(nil, pErr)
return nil, pErr.GoError()
}
}
@@ -1191,8 +1322,9 @@ func (r *Replica) beginCmds(
// The store will catch that error and resubmit the request after
// mergeCompleteCh closes. See #27442 for the full context.
log.Event(ctx, "waiting on in-progress merge")
r.latchMgr.Release(lg)
return nil, &roachpb.MergeInProgressError{}
err := &roachpb.MergeInProgressError{}
ec.done(nil, roachpb.NewError(err))
return nil, err
}
} else {
log.Event(ctx, "operation accepts inconsistent results")
@@ -1208,12 +1340,7 @@ func (r *Replica) beginCmds(
}
}

ec := &endCmds{
repl: r,
lg: lg,
ba: *ba,
}
return ec, nil
return &ec, nil
}

// executeAdminBatch executes the command directly. There is no interaction
106 changes: 79 additions & 27 deletions pkg/storage/replica_read.go
@@ -42,23 +42,19 @@ func (r *Replica) executeReadOnlyBatch(
}
r.limitTxnMaxTimestamp(ctx, &ba, status)

spans, err := r.collectSpans(&ba)
spans, optimistic, err := r.collectSpans(&ba)
if err != nil {
return nil, roachpb.NewError(err)
}

// Acquire latches to prevent overlapping commands from executing
// until this command completes.
log.Event(ctx, "acquire latches")
endCmds, err := r.beginCmds(ctx, &ba, spans)
endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic)
if err != nil {
return nil, roachpb.NewError(err)
}

log.Event(ctx, "waiting for read lock")
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()

// Guarantee we release the latches that we just acquired. It is
// important that this is inside the readOnlyCmdMu lock so that the
// timestamp cache update is synchronized. This is wrapped to delay
@@ -67,31 +63,53 @@ func (r *Replica) executeReadOnlyBatch(
endCmds.done(br, pErr)
}()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
if _, err := r.IsDestroyed(); err != nil {
return nil, roachpb.NewError(err)
}
// Loop to support optimistic evaluation. We'll only evaluate up to twice.
var result result.Result
for {
br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans)
if !endCmds.optimistic() {
// If this was not an optimistic evaluation, break.
break
}

rSpan, err := keys.Range(ba)
if err != nil {
return nil, roachpb.NewError(err)
// If this was an optimistic evaluation, determine whether the keys
// that it touched conflict with in-flight requests that were ignored.
if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil {
pErr = roachpb.NewError(err)
} else if conflict {
// If they do, retry the evaluation. This time, we will not be
// evaluating optimistically because checkOptimisticConflicts
// waited for all latch acquisitions to succeed.
continue
}
break
}

if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, roachpb.NewError(err)
}
// // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
// if _, err := r.IsDestroyed(); err != nil {
// return nil, roachpb.NewError(err)
// }

// Evaluate read-only batch command. It checks for matching key range; note
// that holding readOnlyCmdMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
var result result.Result
rec := NewReplicaEvalContext(r, spans)
readOnly := r.store.Engine().NewReadOnly()
if util.RaceEnabled {
readOnly = spanset.NewReadWriter(readOnly, spans)
}
defer readOnly.Close()
br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
// rSpan, err := keys.Range(ba)
// if err != nil {
// return nil, roachpb.NewError(err)
// }

// if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
// return nil, roachpb.NewError(err)
// }

// // Evaluate read-only batch command. It checks for matching key range; note
// // that holding readOnlyCmdMu throughout is important to avoid reads from the
// // "wrong" key range being served after the range has been split.
// var result result.Result
// rec := NewReplicaEvalContext(r, spans)
// readOnly := r.store.Engine().NewReadOnly()
// if util.RaceEnabled {
// readOnly = spanset.NewReadWriter(readOnly, spans)
// }
// defer readOnly.Close()
// br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)

// A merge is (likely) about to be carried out, and this replica
// needs to block all traffic until the merge either commits or
@@ -124,3 +142,37 @@ func (r *Replica) executeReadOnlyBatch(
}
return br, pErr
}

// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its
// results. It expects that latches have already been acquired for the spans
// that it will touch.
func (r *Replica) evaluateReadOnlyBatch(
ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet,
) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
log.Event(ctx, "waiting for read lock")
r.readOnlyCmdMu.RLock()
defer r.readOnlyCmdMu.RUnlock()

// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
if _, err := r.IsDestroyed(); err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}
rSpan, err := keys.Range(ba)
if err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}
if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil {
return nil, result.Result{}, roachpb.NewError(err)
}

// Evaluate read-only batch command. It checks for matching key range; note
// that holding readOnlyCmdMu throughout is important to avoid reads from the
// "wrong" key range being served after the range has been split.
rec := NewReplicaEvalContext(r, spans)
readOnly := r.store.Engine().NewReadOnly()
defer readOnly.Close()
if util.RaceEnabled {
readOnly = spanset.NewReadWriter(readOnly, spans)
}
return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */)
}