From d488b79bc39d5ec0c2e21784246761fb28bb05c9 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 21 Dec 2018 20:55:15 -0500 Subject: [PATCH 1/5] workload/kv: add --span-limit flag to LIMIT spanning queries Release note: None --- pkg/workload/kv/kv.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 25031070644e..48639f99144f 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -50,6 +50,7 @@ type kv struct { cycleLength int64 readPercent int spanPercent int + spanLimit int seed int64 writeSeq string sequential bool @@ -97,6 +98,8 @@ var kvMeta = workload.Meta{ `Percent (0-100) of operations that are reads of existing keys.`) g.flags.IntVar(&g.spanPercent, `span-percent`, 0, `Percent (0-100) of operations that are spanning queries of all ranges.`) + g.flags.IntVar(&g.spanLimit, `span-limit`, 0, + `LIMIT count for each spanning query, or 0 for no limit`) g.flags.Int64Var(&g.seed, `seed`, 1, `Key hash seed.`) g.flags.BoolVar(&g.zipfian, `zipfian`, false, `Pick keys in a zipfian distribution instead of randomly.`) @@ -239,7 +242,13 @@ func (w *kv) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, er writeStmtStr := buf.String() // Span statement - spanStmtStr := "SELECT count(v) FROM kv" + buf.Reset() + buf.WriteString(`SELECT count(v) FROM [SELECT v FROM kv`) + if w.spanLimit > 0 { + fmt.Fprintf(&buf, ` WHERE k >= $1 LIMIT %d`, w.spanLimit) + } + buf.WriteString(`]`) + spanStmtStr := buf.String() ql := workload.QueryLoad{SQLDatabase: sqlDatabase} seq := &sequence{config: w, val: int64(writeSeq)} @@ -308,10 +317,18 @@ func (o *kvOp) run(ctx context.Context) error { statementProbability -= o.config.readPercent if statementProbability < o.config.spanPercent { start := timeutil.Now() - _, err := o.spanStmt.Exec(ctx) + var args []interface{} + if o.config.spanLimit > 0 { + args = append(args, o.g.readKey()) + } + rows, err := o.spanStmt.Query(ctx, args...) + if err != nil { + return err + } + rows.Close() elapsed := timeutil.Since(start) o.hists.Get(`span`).Record(elapsed) - return err + return rows.Err() } const argCount = 2 args := make([]interface{}, argCount*o.config.batchSize) From 4e37e262866312123c732f4b558a6e081218c971 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 21 Dec 2018 21:45:46 -0500 Subject: [PATCH 2/5] storage/spanlatch: pass snapshot by reference to wait This does not escape. Release note: None --- pkg/storage/spanlatch/manager.go | 4 ++-- pkg/storage/spanlatch/manager_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go index e72f36534cf4..b66f74aa8c93 100644 --- a/pkg/storage/spanlatch/manager.go +++ b/pkg/storage/spanlatch/manager.go @@ -204,7 +204,7 @@ func (m *Manager) Acquire( lg, snap := m.sequence(spans, ts) defer snap.close() - err := m.wait(ctx, lg, snap) + err := m.wait(ctx, lg, &snap) if err != nil { m.Release(lg) return nil, err @@ -331,7 +331,7 @@ func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp { // wait waits for all interfering latches in the provided snapshot to complete // before returning. 
-func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { +func (m *Manager) wait(ctx context.Context, lg *Guard, snap *snapshot) error { timer := timeutil.NewTimer() timer.Reset(base.SlowRequestThreshold) defer timer.Stop() diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go index dca176643d35..09797e5b4dba 100644 --- a/pkg/storage/spanlatch/manager_test.go +++ b/pkg/storage/spanlatch/manager_test.go @@ -104,7 +104,7 @@ func (m *Manager) MustAcquireChCtx( ch := make(chan *Guard) lg, snap := m.sequence(spans, ts) go func() { - err := m.wait(ctx, lg, snap) + err := m.wait(ctx, lg, &snap) if err != nil { m.Release(lg) lg = nil From 8125e8771536ca1a8f112f2b163f0e7667734b77 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 26 Dec 2018 16:17:31 -0500 Subject: [PATCH 3/5] storage: improve TestReplicaLatchingTimestampNonInterference The test was asserting that interfering requests interfered, but it wasn't asserting that non-interfering requests didn't interfere. Release note: None --- pkg/storage/replica_test.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 97e421f405c2..071c510c885e 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -2638,18 +2638,29 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { } if test.interferes { + // Neither request should complete until the write is unblocked. select { case <-time.After(10 * time.Millisecond): // Expected. case pErr := <-errCh: t.Fatalf("expected interference: got error %s", pErr) } - } - // Verify no errors on waiting read and write. - blockCh <- struct{}{} - for j := 0; j < 2; j++ { + // Verify no errors on waiting read and write. + blockCh <- struct{}{} + for j := 0; j < 2; j++ { + if pErr := <-errCh; pErr != nil { + t.Errorf("error %d: unexpected error: %s", j, pErr) + } + } + } else { + // The read should complete first. + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + // The write should complete next, after it is unblocked. + blockCh <- struct{}{} if pErr := <-errCh; pErr != nil { - t.Errorf("error %d: unexpected error: %s", j, pErr) + t.Errorf("unexpected error: %s", pErr) } } }) From 63d2350442e82c65dbee0a5784abf2a3677cfca4 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 26 Dec 2018 16:17:50 -0500 Subject: [PATCH 4/5] storage: don't update timestamp cache on scans with limit 0 Not related to #32149. Before this change, we would treat a scan with limit 0 as a point lookup when updating the timestamp cache. After the change, we no longer update the timestamp cache if a scan had a limit of 0 and never looked at any keys. Release note: None --- pkg/storage/replica_tscache.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go index 13df5e878395..d59f22c87806 100644 --- a/pkg/storage/replica_tscache.go +++ b/pkg/storage/replica_tscache.go @@ -149,6 +149,12 @@ func (r *Replica) updateTimestampCache( case *roachpb.ScanRequest: resp := br.Responses[i].GetInner().(*roachpb.ScanResponse) if resp.ResumeSpan != nil { + if start.Equal(resp.ResumeSpan.Key) { + // If the forward scan was evaluated with a key limit of zero + // then it will return a resume span equal to its request + // span. In this case, don't update the timestamp cache. 
+ continue + } // Note that for forward scan, the resume span will start at // the (last key read).Next(), which is actually the correct // end key for the span to update the timestamp cache. @@ -158,6 +164,12 @@ func (r *Replica) updateTimestampCache( case *roachpb.ReverseScanRequest: resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse) if resp.ResumeSpan != nil { + if end.Equal(resp.ResumeSpan.EndKey) { + // If the reverse scan was evaluated with a key limit of zero + // then it will return a resume span equal to its request + // span. In this case, don't update the timestamp cache. + continue + } // Note that for reverse scans, the resume span's end key is // an open interval. That means it was read as part of this op // and won't be read on resume. It is the correct start key for From cc26940f1aa82fd85c45a6293cdf832b0f24ef3d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 26 Dec 2018 16:45:15 -0500 Subject: [PATCH 5/5] storage: evaluate limited scans optimistically without latching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #9521. Supersedes #31904. SQL has a tendency to create scans which cover a range's entire key span, though looking only to return a finite number of results. These requests end up blocking on writes that are holding latches over keys that the scan will not actually touch. In reality, when there is a scan with a limit, the actual affected key span ends up being a small subset of the range's key span. This change creates a new "optimistic evaluation" path for read-only requests. When evaluating optimistically, the batch will sequence itself with the latch manager, but will not wait to acquire all of its latches. Instead, it begins evaluating immediately and verifies that it would not have needed to wait on any latch acquisitions after-the-fact. When performing this verification, it uses knowledge about the limited set of keys that the scan actually looked at. If there are no conflicts, the scan succeeds. If there are conflicts, the request waits for all of its latch acquisition attempts to finish and re-evaluates. This PR replaces #31904. The major difference between the two is that this PR exploits the structure of the latch manager to efficiently perform optimistic latch acquisition and after-the-fact verification of conflicts. Doing this requires keeping no extra state because it uses the immutable snapshots that the latch manager now captures during sequencing. The other major difference is that this PR does not release latches after a failed optimistic evaluation. NOTE: a prevalent theory of the pathological case with this behavior was that overestimated read latches would serialize with write latches, causing all requests on a range to serialize. I wasn't seeing this in practice. It turns out that the "timestamp awareness" in the latch manager should avoid this behavior in most cases because later writes will have higher timestamps than earlier reads. The effect of this is that they won't be considered to interfere by the latch manager. Still, large clusters with a high amount of clock skew could see a bounded variant of this situation. 
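At a high level, the read-only path now behaves as sketched below. This is a condensed, illustrative sketch only: `evaluate` and `constrainSpans` are stand-ins for the real evaluation and span-constraining logic in `Replica` (they are not functions added by this change), and latch release and timestamp cache updates are elided. The spanlatch method names (`AcquireOptimistic`, `Overlaps`, `Wait`, `Close`) are the ones introduced in the diffs below.

```go
// Sequence with the latch manager, but do not wait on already-held latches.
lg, snap := latchMgr.AcquireOptimistic(spans, ba.Timestamp)
defer snap.Close()

// Evaluate immediately, before our latches would normally be "acquired".
br, pErr := evaluate(ba)

// Re-declare spans constrained to the keys actually read (derived from the
// ResumeSpans in br) and check them against the latches that were held at
// the time this request sequenced itself.
if latchMgr.Overlaps(constrainSpans(ba, br), ba.Timestamp, snap) {
	// Conflict: wait for the latches we skipped, exactly as a normal
	// request would have, and then re-evaluate. This second result is
	// authoritative, so a request evaluates at most twice.
	if err := latchMgr.Wait(ctx, lg, snap); err != nil {
		return nil, roachpb.NewError(err)
	}
	br, pErr = evaluate(ba)
}
return br, pErr
```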
_### Benchmark Results ``` name old ops/sec new ops/sec delta kv95/cores=16/nodes=3/splits=3 51.9k ± 0% 51.7k ± 1% ~ (p=0.400 n=3+3) kvS70-L1/cores=16/nodes=3/splits=3 24.1k ± 4% 27.7k ± 1% +14.75% (p=0.100 n=3+3) kvS70-L5/cores=16/nodes=3/splits=3 24.5k ± 1% 27.5k ± 1% +12.08% (p=0.100 n=3+3) kvS70-L1000/cores=16/nodes=3/splits=3 16.0k ± 1% 16.6k ± 2% +3.79% (p=0.100 n=3+3) name old p50(ms) new p50(ms) delta kv95/cores=16/nodes=3/splits=3 0.70 ± 0% 0.70 ± 0% ~ (all equal) kvS70-L1/cores=16/nodes=3/splits=3 1.07 ± 6% 0.90 ± 0% -15.62% (p=0.100 n=3+3) kvS70-L5/cores=16/nodes=3/splits=3 1.10 ± 0% 0.90 ± 0% -18.18% (p=0.100 n=3+3) kvS70-L1000/cores=16/nodes=3/splits=3 1.80 ± 0% 1.67 ± 4% -7.41% (p=0.100 n=3+3) name old p99(ms) new p99(ms) delta kv95/cores=16/nodes=3/splits=3 1.80 ± 0% 1.80 ± 0% ~ (all equal) kvS70-L1/cores=16/nodes=3/splits=3 5.77 ±32% 4.70 ± 0% ~ (p=0.400 n=3+3) kvS70-L5/cores=16/nodes=3/splits=3 5.00 ± 0% 4.70 ± 0% -6.00% (p=0.100 n=3+3) kvS70-L1000/cores=16/nodes=3/splits=3 6.90 ± 3% 7.33 ± 8% ~ (p=0.400 n=3+3) ``` _S = --span-percent=, L = --span-limit=_ Release note (performance improvement): improved performance on workloads which mix OLAP queries with inserts and updates. --- pkg/storage/replica.go | 169 ++++++++++++++++++++++---- pkg/storage/replica_read.go | 106 ++++++++++++---- pkg/storage/replica_test.go | 116 ++++++++++++++++++ pkg/storage/replica_write.go | 4 +- pkg/storage/spanlatch/manager.go | 94 ++++++++++++-- pkg/storage/spanlatch/manager_test.go | 20 ++- 6 files changed, 442 insertions(+), 67 deletions(-) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index e999b4bd6259..32a27ffcc704 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -1033,9 +1033,94 @@ func (r *Replica) checkBatchRequest(ba roachpb.BatchRequest, isReadOnly bool) er type endCmds struct { repl *Replica lg *spanlatch.Guard + sn *spanlatch.Snapshot ba roachpb.BatchRequest } +// optimistic returns whether the batch should execute optimistically. +func (ec *endCmds) optimistic() bool { return ec.sn != nil } + +// checkOptimisticConflicts determines whether any earlier requests that held +// latches at the time that this request optimistically evaluated conflict with +// the span of keys that this request interacted with. On conflict, the method +// waits to successfully acquire all latches. After that, the request needs to +// be retried because the optimistic evaluation may or may not have observed the +// effect of the earlier requests. +// +// To call, ec.optimistic must be true. After the method is called, ec.optimistic +// will return false. +func (ec *endCmds) checkOptimisticConflicts( + ctx context.Context, br *roachpb.BatchResponse, pErr *roachpb.Error, spans *spanset.SpanSet, +) (bool, error) { + // We only use the latch manager snapshot once. + defer func() { + ec.sn.Close() + ec.sn = nil + }() + + // If the optimistic evaluation didn't run into an error, create a copy of + // the batch with the scan bounds of all requests constrained to the spans + // that they actually scanned over. If it did, check for conflicts over the + // entire original span set. + if pErr == nil { + baCopy := ec.ba + baCopy.Requests = append([]roachpb.RequestUnion(nil), baCopy.Requests...) 
+ for i := 0; i < len(baCopy.Requests); i++ { + req := baCopy.Requests[i].GetInner() + header := req.Header() + + resp := br.Responses[i].GetInner() + if resp.Header().ResumeSpan == nil { + continue + } + + switch t := resp.(type) { + case *roachpb.ScanResponse: + if header.Key.Equal(t.ResumeSpan.Key) { + // The request did not evaluate. Ignore it. + baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...) + i-- + continue + } + header.EndKey = t.ResumeSpan.Key + case *roachpb.ReverseScanResponse: + if header.EndKey.Equal(t.ResumeSpan.EndKey) { + // The request did not evaluate. Ignore it. + baCopy.Requests = append(baCopy.Requests[:i], baCopy.Requests[i+1:]...) + i-- + continue + } + header.Key = t.ResumeSpan.EndKey + default: + continue + } + req = req.ShallowCopy() + req.SetHeader(header) + baCopy.Requests[i].MustSetInner(req) + } + + // Collect the batch's declared spans again, this time with the + // constrained span bounds. + var err error + spans, _, err = ec.repl.collectSpans(&baCopy) + if err != nil { + return false, err + } + } + + // Determine whether any actively held latch at the time of evaluation + // overlaps this constraint span set. If so, we have a conflict. + conflict := ec.repl.latchMgr.Overlaps(spans, ec.ba.Timestamp, ec.sn) + if !conflict { + return false, nil + } + + // If there were conflicts, wait for latches to be released, like + // we should have in the first place. + log.Event(ctx, "optimistic evaluation failed, latching and re-evaluating") + return true, ec.repl.latchMgr.Wait(ctx, ec.lg, ec.sn) +} + // done releases the latches acquired by the command and updates // the timestamp cache using the final timestamp of each command. func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { @@ -1051,10 +1136,30 @@ func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error) { if ec.lg != nil { ec.repl.latchMgr.Release(ec.lg) } + + // Close the snapshot to release any resources that it holds. + if ec.sn != nil { + ec.sn.Close() + ec.sn = nil + } } -func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, error) { - spans := &spanset.SpanSet{} +// collectSpans collects all of the spans that the batch may touch into a +// SpanSet. +// +// It also determines whether the batch should evaluate optimistically based on +// the key limits on the batch header. When a batch evaluates optimistically, it +// doesn't wait to acquire all of its latches. Instead, if begins evaluating +// immediately and verifies that it would not have needed to wait on any latch +// acquisitions after-the-fact. This is useful for requests that need to declare +// a much larger set of keys than they expect to touch in practice (e.g. limited +// scans). +func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, bool, error) { + r.mu.RLock() + desc := r.descRLocked() + liveCount := r.mu.state.Stats.LiveCount + r.mu.RUnlock() + // TODO(bdarnell): need to make this less global when local // latches are used more heavily. For example, a split will // have a large read-only span but also a write (see #10084). @@ -1065,6 +1170,7 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // // TODO(bdarnell): revisit as the local portion gets its appropriate // use. 
+ spans := &spanset.SpanSet{} if ba.IsReadOnly() { spans.Reserve(spanset.SpanReadOnly, spanset.SpanGlobal, len(ba.Requests)) } else { @@ -1076,14 +1182,13 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess) } - desc := r.Desc() batcheval.DeclareKeysForBatch(desc, ba.Header, spans) for _, union := range ba.Requests { inner := union.GetInner() if cmd, ok := batcheval.LookupCommand(inner.Method()); ok { cmd.DeclareKeys(desc, ba.Header, inner, spans) } else { - return nil, errors.Errorf("unrecognized command %s", inner.Method()) + return nil, false, errors.Errorf("unrecognized command %s", inner.Method()) } } @@ -1094,9 +1199,23 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // If any command gave us spans that are invalid, bail out early // (before passing them to the spanlatch manager, which may panic). if err := spans.Validate(); err != nil { - return nil, err + return nil, false, err } - return spans, nil + + // Evaluate batches optimistically if they have a key limit which is less + // than the number of live keys on the Range. Ignoring write latches can be + // massively beneficial because it can help avoid waiting on writes to keys + // that the batch will never actually need to read due to the overestimate + // of its key bounds. Only after it is clear exactly what spans were read do + // we verify whether there were any conflicts with concurrent writes. + // + // This case is not uncommon; for example, a Scan which requests the entire + // range but has a limit of 1 result. We want to avoid allowing overly broad + // spans from backing up the latch manager. + limit := ba.Header.MaxSpanRequestKeys + optimistic := limit > 0 && limit < liveCount + + return spans, optimistic, nil } // beginCmds waits for any in-flight, conflicting commands to complete. This @@ -1109,11 +1228,17 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro // returns a cleanup function to be called when the commands are done and can be // removed from the queue, and whose returned error is to be used in place of // the supplied error. +// +// See collectSpans for a discussion on optimistic evaluation. func (r *Replica) beginCmds( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, optimistic bool, ) (*endCmds, error) { + ec := endCmds{ + repl: r, + ba: *ba, + } + // Only acquire latches for consistent operations. - var lg *spanlatch.Guard if ba.ReadConsistency == roachpb.CONSISTENT { // Check for context cancellation before acquiring latches. if err := ctx.Err(); err != nil { @@ -1129,10 +1254,16 @@ func (r *Replica) beginCmds( // Acquire latches for all the request's declared spans to ensure // protected access and to avoid interacting requests from operating at // the same time. The latches will be held for the duration of request. - var err error - lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) - if err != nil { - return nil, err + if optimistic { + // Optimistic acquisition does not wait for existing latches to be + // released. 
+ ec.lg, ec.sn = r.latchMgr.AcquireOptimistic(spans, ba.Timestamp) + } else { + var err error + ec.lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) + if err != nil { + return nil, err + } } if !beforeLatch.IsZero() { @@ -1142,7 +1273,7 @@ func (r *Replica) beginCmds( if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { if pErr := filter(*ba); pErr != nil { - r.latchMgr.Release(lg) + ec.done(nil, pErr) return nil, pErr.GoError() } } @@ -1191,8 +1322,9 @@ func (r *Replica) beginCmds( // The store will catch that error and resubmit the request after // mergeCompleteCh closes. See #27442 for the full context. log.Event(ctx, "waiting on in-progress merge") - r.latchMgr.Release(lg) - return nil, &roachpb.MergeInProgressError{} + err := &roachpb.MergeInProgressError{} + ec.done(nil, roachpb.NewError(err)) + return nil, err } } else { log.Event(ctx, "operation accepts inconsistent results") @@ -1208,12 +1340,7 @@ func (r *Replica) beginCmds( } } - ec := &endCmds{ - repl: r, - lg: lg, - ba: *ba, - } - return ec, nil + return &ec, nil } // executeAdminBatch executes the command directly. There is no interaction diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index 321546466304..46bcefe7b18f 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -42,7 +42,7 @@ func (r *Replica) executeReadOnlyBatch( } r.limitTxnMaxTimestamp(ctx, &ba, status) - spans, err := r.collectSpans(&ba) + spans, optimistic, err := r.collectSpans(&ba) if err != nil { return nil, roachpb.NewError(err) } @@ -50,15 +50,11 @@ func (r *Replica) executeReadOnlyBatch( // Acquire latches to prevent overlapping commands from executing // until this command completes. log.Event(ctx, "acquire latches") - endCmds, err := r.beginCmds(ctx, &ba, spans) + endCmds, err := r.beginCmds(ctx, &ba, spans, optimistic) if err != nil { return nil, roachpb.NewError(err) } - log.Event(ctx, "waiting for read lock") - r.readOnlyCmdMu.RLock() - defer r.readOnlyCmdMu.RUnlock() - // Guarantee we release the latches that we just acquired. It is // important that this is inside the readOnlyCmdMu lock so that the // timestamp cache update is synchronized. This is wrapped to delay @@ -67,31 +63,53 @@ func (r *Replica) executeReadOnlyBatch( endCmds.done(br, pErr) }() - // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? - if _, err := r.IsDestroyed(); err != nil { - return nil, roachpb.NewError(err) - } + // Loop to support optimistic evaluation. We'll only evalute up to twice. + var result result.Result + for { + br, result, pErr = r.evaluateReadOnlyBatch(ctx, ba, spans) + if !endCmds.optimistic() { + // If this was not an optimistic evaluation, break. + break + } - rSpan, err := keys.Range(ba) - if err != nil { - return nil, roachpb.NewError(err) + // If this was an optimistic evaluation, determine whether the keys + // that it touched conflict with in-flight requests that were ignored. + if conflict, err := endCmds.checkOptimisticConflicts(ctx, br, pErr, spans); err != nil { + pErr = roachpb.NewError(err) + } else if conflict { + // If they do, retry the evaluation. This time, we will not be + // evaluating optimistically because checkOptimisticConflicts + // waited for all latch acquisitions to succeed. + continue + } + break } - if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil { - return nil, roachpb.NewError(err) - } + // // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? 
+ // if _, err := r.IsDestroyed(); err != nil { + // return nil, roachpb.NewError(err) + // } - // Evaluate read-only batch command. It checks for matching key range; note - // that holding readOnlyCmdMu throughout is important to avoid reads from the - // "wrong" key range being served after the range has been split. - var result result.Result - rec := NewReplicaEvalContext(r, spans) - readOnly := r.store.Engine().NewReadOnly() - if util.RaceEnabled { - readOnly = spanset.NewReadWriter(readOnly, spans) - } - defer readOnly.Close() - br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */) + // rSpan, err := keys.Range(ba) + // if err != nil { + // return nil, roachpb.NewError(err) + // } + + // if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil { + // return nil, roachpb.NewError(err) + // } + + // // Evaluate read-only batch command. It checks for matching key range; note + // // that holding readOnlyCmdMu throughout is important to avoid reads from the + // // "wrong" key range being served after the range has been split. + // var result result.Result + // rec := NewReplicaEvalContext(r, spans) + // readOnly := r.store.Engine().NewReadOnly() + // if util.RaceEnabled { + // readOnly = spanset.NewReadWriter(readOnly, spans) + // } + // defer readOnly.Close() + // br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */) // A merge is (likely) about to be carried out, and this replica // needs to block all traffic until the merge either commits or @@ -124,3 +142,37 @@ func (r *Replica) executeReadOnlyBatch( } return br, pErr } + +// evaluateReadOnlyBatch evaluates the provided read-only batch and returns its +// results. It expects that latches have already been acquired for the spans +// that it will touch. +func (r *Replica) evaluateReadOnlyBatch( + ctx context.Context, ba roachpb.BatchRequest, spans *spanset.SpanSet, +) (*roachpb.BatchResponse, result.Result, *roachpb.Error) { + log.Event(ctx, "waiting for read lock") + r.readOnlyCmdMu.RLock() + defer r.readOnlyCmdMu.RUnlock() + + // TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed? + if _, err := r.IsDestroyed(); err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + rSpan, err := keys.Range(ba) + if err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + if err := r.requestCanProceed(rSpan, ba.Timestamp); err != nil { + return nil, result.Result{}, roachpb.NewError(err) + } + + // Evaluate read-only batch command. It checks for matching key range; note + // that holding readOnlyCmdMu throughout is important to avoid reads from the + // "wrong" key range being served after the range has been split. + rec := NewReplicaEvalContext(r, spans) + readOnly := r.store.Engine().NewReadOnly() + defer readOnly.Close() + if util.RaceEnabled { + readOnly = spanset.NewReadWriter(readOnly, spans) + } + return evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */) +} diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 071c510c885e..7ed9b81f1894 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -2701,6 +2701,122 @@ func TestReplicaLatchingSplitDeclaresWrites(t *testing.T) { } } +// TestReplicaLatchingOptimisticEvaluation verifies that limited scans evaluate +// optimistically without waiting for latches to be acquired. 
In some cases, +// this allows them to avoid waiting on writes that their over-estimated +// declared spans overlapped with. +func TestReplicaLatchingOptimisticEvaluation(t *testing.T) { + defer leaktest.AfterTest(t)() + + sArgs1 := scanArgs([]byte("a"), []byte("c")) + sArgs2 := scanArgs([]byte("c"), []byte("e")) + baScan := roachpb.BatchRequest{} + baScan.Add(&sArgs1, &sArgs2) + + var blockKey, blockWriter atomic.Value + blockKey.Store(roachpb.Key("a")) + blockWriter.Store(false) + blockCh := make(chan struct{}, 1) + blockedCh := make(chan struct{}, 1) + + tc := testContext{} + tsc := TestStoreConfig(nil) + tsc.TestingKnobs.EvalKnobs.TestingEvalFilter = + func(filterArgs storagebase.FilterArgs) *roachpb.Error { + // Make sure the direct GC path doesn't interfere with this test. + if !filterArgs.Req.Header().Key.Equal(blockKey.Load().(roachpb.Key)) { + return nil + } + if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { + blockedCh <- struct{}{} + <-blockCh + } + return nil + } + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + tc.StartWithStoreConfig(t, stopper, tsc) + + // Write initial keys. + for _, k := range []string{"a", "b", "c", "d"} { + pArgs := putArgs([]byte(k), []byte("value")) + if _, pErr := tc.SendWrapped(&pArgs); pErr != nil { + t.Fatal(pErr) + } + } + + testCases := []struct { + writeKey string + limit int64 + interferes bool + }{ + // No limit. + {"a", 0, true}, + {"b", 0, true}, + {"c", 0, true}, + {"d", 0, true}, + {"e", 0, false}, // Only scanning from [a,e) + // Limited. + {"a", 1, true}, + {"b", 1, false}, + {"b", 2, true}, + {"c", 2, false}, + {"c", 3, true}, + {"d", 3, false}, + {"d", 4, true}, + {"e", 4, false}, // Only scanning from [a,e) + {"e", 5, false}, + } + for _, test := range testCases { + t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { + errCh := make(chan *roachpb.Error, 2) + pArgs := putArgs([]byte(test.writeKey), []byte("value")) + blockKey.Store(roachpb.Key(test.writeKey)) + blockWriter.Store(true) + go func() { + _, pErr := tc.SendWrapped(&pArgs) + errCh <- pErr + }() + <-blockedCh + blockWriter.Store(false) + + baScanCopy := baScan + baScanCopy.MaxSpanRequestKeys = test.limit + go func() { + _, pErr := tc.Sender().Send(context.Background(), baScanCopy) + errCh <- pErr + }() + + if test.interferes { + // Neither request should complete until the write is unblocked. + select { + case <-time.After(10 * time.Millisecond): + // Expected. + case pErr := <-errCh: + t.Fatalf("expected interference: got error %s", pErr) + } + // Verify no errors on waiting read and write. + blockCh <- struct{}{} + for j := 0; j < 2; j++ { + if pErr := <-errCh; pErr != nil { + t.Errorf("error %d: unexpected error: %s", j, pErr) + } + } + } else { + // The read should complete first. + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + // The write should complete next, after it is unblocked. + blockCh <- struct{}{} + if pErr := <-errCh; pErr != nil { + t.Errorf("unexpected error: %s", pErr) + } + } + }) + } +} + // TestReplicaUseTSCache verifies that write timestamps are upgraded // based on the read timestamp cache. 
func TestReplicaUseTSCache(t *testing.T) { diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index ac6a1a0482a3..a1f2dea80f00 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -74,7 +74,7 @@ func (r *Replica) executeWriteBatch( return nil, roachpb.NewError(err) } - spans, err := r.collectSpans(&ba) + spans, _, err := r.collectSpans(&ba) if err != nil { return nil, roachpb.NewError(err) } @@ -87,7 +87,7 @@ func (r *Replica) executeWriteBatch( // after preceding commands have been run to successful completion. log.Event(ctx, "acquire latches") var err error - endCmds, err = r.beginCmds(ctx, &ba, spans) + endCmds, err = r.beginCmds(ctx, &ba, spans, false /* optimistic */) if err != nil { return nil, roachpb.NewError(err) } diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go index b66f74aa8c93..0771ee2f450e 100644 --- a/pkg/storage/spanlatch/manager.go +++ b/pkg/storage/spanlatch/manager.go @@ -197,14 +197,14 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { // be released, it stops waiting and releases all latches that it has already // acquired. // -// It returns a Guard which must be provided to Release. +// The method returns a Guard which must be provided to Release. func (m *Manager) Acquire( ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, ) (*Guard, error) { lg, snap := m.sequence(spans, ts) - defer snap.close() + defer snap.Close() - err := m.wait(ctx, lg, &snap) + err := m.Wait(ctx, lg, &snap) if err != nil { m.Release(lg) return nil, err @@ -212,11 +212,28 @@ func (m *Manager) Acquire( return lg, nil } +// AcquireOptimistic is like Acquire, except it does not wait for latches over +// overlapping spans to be released before returning. Instead, it optimistically +// assumes that there are no currently held latches that need to be waited on. +// This can be verified after the fact by passing the returned Snapshot to +// Overlaps. +// +// Regardless of whether existing latches are ignored or not by this method, +// future calls to Acquire will observe the latches acquired here and will wait +// for them to be Released, as usual. +// +// The method returns a Guard which must be provided to Release. It also returns +// a Snapshot which must be Closed when no longer in use. +func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, *Snapshot) { + lg, snap := m.sequence(spans, ts) + return lg, &snap +} + // sequence locks the manager, captures an immutable snapshot, inserts latches // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) { +func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, Snapshot) { lg := newGuard(spans, ts) m.mu.Lock() @@ -226,13 +243,14 @@ func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, sn return lg, snap } -// snapshot is an immutable view into the latch manager's state. -type snapshot struct { +// Snapshot is an immutable view into the latch manager's state. +type Snapshot struct { trees [spanset.NumSpanScope][spanset.NumSpanAccess]btree } -// close closes the snapshot and releases any associated resources. -func (sn *snapshot) close() { +// Close closes the Snapshot and releases any associated resources. The method +// can be called multiple times. 
+func (sn *Snapshot) Close() { for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { sn.trees[s][a].Reset() @@ -242,8 +260,8 @@ func (sn *snapshot) close() { // snapshotLocked captures an immutable snapshot of the latch manager. It takes // a spanset to limit the amount of state captured. -func (m *Manager) snapshotLocked(spans *spanset.SpanSet) snapshot { - var snap snapshot +func (m *Manager) snapshotLocked(spans *spanset.SpanSet) Snapshot { + var snap Snapshot for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { sm := &m.scopes[s] reading := len(spans.GetSpans(spanset.SpanReadOnly, s)) > 0 @@ -329,9 +347,9 @@ func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp { } } -// wait waits for all interfering latches in the provided snapshot to complete +// Wait waits for all interfering latches in the provided snapshot to complete // before returning. -func (m *Manager) wait(ctx context.Context, lg *Guard, snap *snapshot) error { +func (m *Manager) Wait(ctx context.Context, lg *Guard, snap *Snapshot) error { timer := timeutil.NewTimer() timer.Reset(base.SlowRequestThreshold) defer timer.Stop() @@ -422,6 +440,58 @@ func (m *Manager) waitForSignal(ctx context.Context, t *timeutil.Timer, wait, he } } +// Overlaps determines if any of the spans in the provided spanset, at the +// specified timestamp, overlap with any of the latches in the snapshot. +func (m *Manager) Overlaps(spans *spanset.SpanSet, ts hlc.Timestamp, sn *Snapshot) bool { + // NB: the function has a similar structure to Manager.wait, but any attempt + // at code deduplication causes allocations and obscures the code. Avoiding + // these allocations would require further obfuscation. + var search latch + for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { + tr := &sn.trees[s] + search.ts = ifGlobal(ts, s) + for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { + ss := spans.GetSpans(a, s) + for _, sp := range ss { + search.span = sp + switch a { + case spanset.SpanReadOnly: + // Search for writes at equal or lower timestamps. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreLater) { + return true + } + case spanset.SpanReadWrite: + // Search for all other writes. + it := tr[spanset.SpanReadWrite].MakeIter() + if overlaps(&it, &search, ignoreNothing) { + return true + } + // Search for reads at equal or higher timestamps. + it = tr[spanset.SpanReadOnly].MakeIter() + if overlaps(&it, &search, ignoreEarlier) { + return true + } + default: + panic("unknown access") + } + } + } + } + return false +} + +func overlaps(it *iterator, search *latch, ignore ignoreFn) bool { + for it.FirstOverlap(search); it.Valid(); it.NextOverlap() { + held := it.Cur() + if ignore(search.ts, held.ts) { + continue + } + return true + } + return false +} + // Release releases the latches held by the provided Guard. After being called, // dependent latch acquisition attempts can complete if not blocked on any other // owned latches. 
diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go index 09797e5b4dba..7d2be60721b5 100644 --- a/pkg/storage/spanlatch/manager_test.go +++ b/pkg/storage/spanlatch/manager_test.go @@ -104,7 +104,8 @@ func (m *Manager) MustAcquireChCtx( ch := make(chan *Guard) lg, snap := m.sequence(spans, ts) go func() { - err := m.wait(ctx, lg, &snap) + defer snap.Close() + err := m.Wait(ctx, lg, &snap) if err != nil { m.Release(lg) lg = nil @@ -424,14 +425,23 @@ func TestLatchManagerDependentLatches(t *testing.T) { var m Manager lg1 := m.MustAcquire(c.sp1, c.ts1) - lg2C := m.MustAcquireCh(c.sp2, c.ts2) + lg2, snap2 := m.sequence(c.sp2, c.ts2) + lg2C := make(chan *Guard) + defer snap2.Close() + go func() { + err := m.Wait(context.Background(), lg2, &snap2) + require.Nil(t, err) + lg2C <- lg2 + }() + + require.Equal(t, c.dependent, m.Overlaps(c.sp2, c.ts2, &snap2)) if c.dependent { testLatchBlocks(t, lg2C) m.Release(lg1) - lg2 := testLatchSucceeds(t, lg2C) + testLatchSucceeds(t, lg2C) m.Release(lg2) } else { - lg2 := testLatchSucceeds(t, lg2C) + testLatchSucceeds(t, lg2C) m.Release(lg1) m.Release(lg2) } @@ -512,7 +522,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { lg, snap := m.sequence(&spans[i], zeroTS) - snap.close() + snap.Close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) }
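For reference, with `--span-limit=5` (the `kvS*-L5` configurations in the benchmarks above), each spanning query issued by the kv workload from the first patch in this series takes the form:

```
SELECT count(v) FROM [SELECT v FROM kv WHERE k >= $1 LIMIT 5]
```

where `$1` is a key chosen by the workload's key generator. With `--span-limit=0` (the default), the `WHERE`/`LIMIT` clause is omitted and the query counts every row in the table.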