diff --git a/pkg/storage/command_queue.go b/pkg/storage/command_queue.go index 338f8e149189..012645af6244 100644 --- a/pkg/storage/command_queue.go +++ b/pkg/storage/command_queue.go @@ -81,11 +81,12 @@ type CommandQueue struct { } type cmd struct { - id int64 - key interval.Range - readOnly bool - timestamp hlc.Timestamp - debugInfo summaryWriter + id int64 + key interval.Range + readOnly bool + preEvaluate bool // collects pre-requisites and dependents, instead of waiting + timestamp hlc.Timestamp + debugInfo summaryWriter buffered bool // is this cmd buffered in readsBuffer expanded bool // have the children been added @@ -101,7 +102,10 @@ type cmd struct { // in the prereq slice to avoid duplicates. prereqIDs map[int64]struct{} - pending chan struct{} // closed when complete + // If pre-evaluating, collect commands which are prereqs or dependents. + prereqsAndDeps []*cmd + + pending chan struct{} // closed when complete, or if pre-evaluating } // A summaryWriter is capable of writing a summary about itself. It is typically @@ -277,6 +281,16 @@ func (c *cmd) ResolvePendingPrereq() { } } + // If the prereq is pre-evaluating, let it know that this command is + // a dependent. + if pre.preEvaluate { + pre.prereqsAndDeps = append(pre.prereqsAndDeps, c) + } + // If the command is pre-evaluating, keep track of prereqs. + if c.preEvaluate { + c.prereqsAndDeps = append(c.prereqsAndDeps, pre) + } + // Truncate the command's prerequisite list so that it no longer includes // the first prerequisite. Before doing so, nil out prefix of slice to allow // GC of the first command. Without this, large chunks of the dependency @@ -289,7 +303,7 @@ func (c *cmd) ResolvePendingPrereq() { delete(c.prereqIDs, pre.id) } -// OptimisticallyResolvePrereqs removes all prerequisite in the cmd's prereq +// OptimisticallyResolvePrereqs removes all prerequisites in the cmd's prereq // slice that have already finished without blocking on pending commands. 
// Prerequisite commands that are still pending or that were canceled are left // in the prereq slice. @@ -313,6 +327,20 @@ func (c *cmd) OptimisticallyResolvePrereqs() { (*c.prereqs) = (*c.prereqs)[:j] } +// Overlaps returns whether the supplied range overlaps with any span +// in the command. +func (c *cmd) Overlaps(o interval.Range) bool { + if len(c.children) == 0 { + return interval.ExclusiveOverlapper.Overlap(o, c.key) + } + for _, child := range c.children { + if interval.ExclusiveOverlapper.Overlap(o, child.key) { + return true + } + } + return false +} + // NewCommandQueue returns a new command queue. The boolean specifies whether // to enable the covering span optimization. With this optimization, whenever // a command consisting of multiple spans is added, a covering span is computed @@ -724,7 +752,7 @@ func (o *overlapHeap) PopOverlap() *cmd { // // Returns a nil `cmd` when no spans are given. func (cq *CommandQueue) add( - readOnly bool, timestamp hlc.Timestamp, prereqs []*cmd, spans []roachpb.Span, + readOnly, preEvaluate bool, timestamp hlc.Timestamp, prereqs []*cmd, spans []roachpb.Span, ) *cmd { if len(spans) == 0 { return nil @@ -766,6 +794,7 @@ func (cq *CommandQueue) add( cmd.id = cq.nextID() cmd.key = coveringSpan.AsRange() cmd.readOnly = readOnly + cmd.preEvaluate = preEvaluate cmd.timestamp = timestamp cmd.prereqsBuf = prereqs cmd.prereqs = &cmd.prereqsBuf diff --git a/pkg/storage/command_queue_test.go b/pkg/storage/command_queue_test.go index 77e8fb1cf4a5..4a413c79ad6f 100644 --- a/pkg/storage/command_queue_test.go +++ b/pkg/storage/command_queue_test.go @@ -34,7 +34,7 @@ func getPrereqs(cq *CommandQueue, from, to roachpb.Key, readOnly bool) []*cmd { } func add(cq *CommandQueue, from, to roachpb.Key, readOnly bool, prereqs []*cmd) *cmd { - return cq.add(readOnly, zeroTS, prereqs, []roachpb.Span{{Key: from, EndKey: to}}) + return cq.add(readOnly, false /* preEvaluate */, zeroTS, prereqs, []roachpb.Span{{Key: from, EndKey: to}}) } func 
getPrereqsAndAdd(cq *CommandQueue, from, to roachpb.Key, readOnly bool) ([]*cmd, *cmd) { @@ -292,7 +292,7 @@ func TestCommandQueueWithoutCoveringOptimization(t *testing.T) { c := roachpb.Span{Key: roachpb.Key("c")} { - cmd := cq.add(false, zeroTS, nil, []roachpb.Span{a, b}) + cmd := cq.add(false, false, zeroTS, nil, []roachpb.Span{a, b}) if !cmd.expanded { t.Errorf("expected non-expanded command, not %+v", cmd) } @@ -306,7 +306,7 @@ func TestCommandQueueWithoutCoveringOptimization(t *testing.T) { } { - cmd := cq.add(false, zeroTS, nil, []roachpb.Span{c}) + cmd := cq.add(false, false, zeroTS, nil, []roachpb.Span{c}) if cmd.expanded { t.Errorf("expected unexpanded command, not %+v", cmd) } @@ -360,16 +360,16 @@ func TestCommandQueueIssue6495(t *testing.T) { } cq.getPrereqs(false, zeroTS, spans1998) - cmd1998 := cq.add(false, zeroTS, nil, spans1998) + cmd1998 := cq.add(false, false, zeroTS, nil, spans1998) cq.getPrereqs(true, zeroTS, spans1999) - cmd1999 := cq.add(true, zeroTS, nil, spans1999) + cmd1999 := cq.add(true, false, zeroTS, nil, spans1999) cq.getPrereqs(true, zeroTS, spans2002) - cq.add(true, zeroTS, nil, spans2002) + cq.add(true, false, zeroTS, nil, spans2002) cq.getPrereqs(false, zeroTS, spans2003) - cq.add(false, zeroTS, nil, spans2003) + cq.add(false, false, zeroTS, nil, spans2003) cq.remove(cmd1998) cq.remove(cmd1999) @@ -404,19 +404,19 @@ func TestCommandQueueTimestamps(t *testing.T) { mkSpan("e", "g"), } - cmd1 := cq.add(true, makeTS(1, 0), nil, spans1) + cmd1 := cq.add(true, false, makeTS(1, 0), nil, spans1) pre2 := cq.getPrereqs(true, makeTS(2, 0), spans2) if pre2 != nil { t.Errorf("expected nil prereq slice; got %+v", pre2) } - cmd2 := cq.add(false, makeTS(2, 0), pre2, spans2) + cmd2 := cq.add(false, false, makeTS(2, 0), pre2, spans2) pre3 := cq.getPrereqs(true, makeTS(3, 0), spans3) if pre3 != nil { t.Errorf("expected nil prereq slice; got %+v", pre3) } - cmd3 := cq.add(false, makeTS(3, 0), pre3, spans3) + cmd3 := cq.add(false, false, makeTS(3, 
0), pre3, spans3) // spans4 should wait on spans3.children[1]. pre4 := cq.getPrereqs(true, makeTS(4, 0), spans4) @@ -424,7 +424,7 @@ func TestCommandQueueTimestamps(t *testing.T) { if !reflect.DeepEqual(expPre, pre4) { t.Errorf("expected prereq commands %+v; got %+v", expPre, pre4) } - cmd4 := cq.add(true, makeTS(4, 0), pre4, spans4) + cmd4 := cq.add(true, false, makeTS(4, 0), pre4, spans4) // Verify that an earlier writer for whole span waits on all commands. pre5 := cq.getPrereqs(false, makeTS(0, 1), []roachpb.Span{mkSpan("a", "g")}) @@ -500,14 +500,14 @@ func TestCommandQueueEnclosedRead(t *testing.T) { } // Add command 1. - cmd1 := cq.add(true, makeTS(2, 0), nil, spans1) + cmd1 := cq.add(true, false, makeTS(2, 0), nil, spans1) // Add command 2. pre := cq.getPrereqs(false, makeTS(3, 0), spans2) if expPre := []*cmd(nil); !reflect.DeepEqual(expPre, pre) { t.Errorf("expected prereq commands %+v; got %+v", expPre, pre) } - cmd2 := cq.add(false, makeTS(3, 0), pre, spans2) + cmd2 := cq.add(false, false, makeTS(3, 0), pre, spans2) // Add command 3. pre = cq.getPrereqs(false, makeTS(1, 0), spansCandidate) @@ -540,14 +540,14 @@ func TestCommandQueueEnclosedWrite(t *testing.T) { } // Add command 1. - cmd1 := cq.add(false, makeTS(3, 0), nil, spans1) + cmd1 := cq.add(false, false, makeTS(3, 0), nil, spans1) // Add command 2. pre := cq.getPrereqs(true, makeTS(2, 0), spans2) if expPre := []*cmd(nil); !reflect.DeepEqual(expPre, pre) { t.Errorf("expected prereq commands %+v; got %+v", expPre, pre) } - cmd2 := cq.add(true, makeTS(2, 0), nil, spans2) + cmd2 := cq.add(true, false, makeTS(2, 0), nil, spans2) // Add command 3. 
pre = cq.getPrereqs(false, makeTS(1, 0), spansCandidate) @@ -576,10 +576,10 @@ func TestCommandQueueTimestampsEmpty(t *testing.T) { mkSpan("h", ""), } - cmd1 := cq.add(true, zeroTS, nil, spansR) - cmd2 := cq.add(false, zeroTS, nil, spansW) - cmd3 := cq.add(true, makeTS(1, 0), nil, spansRTS) - cmd4 := cq.add(false, makeTS(1, 0), nil, spansWTS) + cmd1 := cq.add(true, false, zeroTS, nil, spansR) + cmd2 := cq.add(false, false, zeroTS, nil, spansW) + cmd3 := cq.add(true, false, makeTS(1, 0), nil, spansRTS) + cmd4 := cq.add(false, false, makeTS(1, 0), nil, spansWTS) // A writer will depend on both zero-timestamp spans. pre := cq.getPrereqs(false, makeTS(1, 0), []roachpb.Span{mkSpan("a", "f")}) @@ -677,7 +677,7 @@ func TestCommandQueueTransitiveDependencies(t *testing.T) { { cq := NewCommandQueue(true) - cq.add(ops1.readOnly, ops1.ts, nil, ops1.spans) + cq.add(ops1.readOnly, false, ops1.ts, nil, ops1.spans) pre3 := cq.getPrereqs(ops3.readOnly, ops3.ts, ops3.spans) if expectDependency := len(pre3) > 0; !expectDependency { @@ -697,12 +697,12 @@ func TestCommandQueueTransitiveDependencies(t *testing.T) { cq := NewCommandQueue(true) // Add command 1. - cmd1 := cq.add(ops1.readOnly, ops1.ts, nil, ops1.spans) + cmd1 := cq.add(ops1.readOnly, false, ops1.ts, nil, ops1.spans) // Add command 2, taking note of whether it depends on command 1. pre2 := cq.getPrereqs(ops2.readOnly, ops2.ts, ops2.spans) dependency2to1 := len(pre2) > 0 - cmd2 := cq.add(ops2.readOnly, ops2.ts, pre2, ops2.spans) + cmd2 := cq.add(ops2.readOnly, false, ops2.ts, pre2, ops2.spans) // Add command 3, taking note of whether it depends on command 1 // or on command 2. @@ -755,7 +755,7 @@ func TestCommandQueueGetSnapshotWithChildren(t *testing.T) { cmd2 := add(cq, roachpb.Key("a"), nil, true, []*cmd{cmd1}) // the following creates a node with two children because it has two spans // only the children show up in the snapshot. 
- cq.add(true, zeroTS, []*cmd{cmd2}, []roachpb.Span{ + cq.add(true, false, zeroTS, []*cmd{cmd2}, []roachpb.Span{ {Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, {Key: roachpb.Key("d"), EndKey: roachpb.Key("f")}, }) @@ -818,7 +818,7 @@ func BenchmarkCommandQueueGetPrereqsAllReadOnly(b *testing.B) { EndKey: roachpb.Key("aaaaaaaaab"), }} for i := 0; i < size; i++ { - cq.add(true, zeroTS, nil, spans) + cq.add(true, false, zeroTS, nil, spans) } b.ResetTimer() @@ -853,7 +853,7 @@ func BenchmarkCommandQueueReadWriteMix(b *testing.B) { var cmd *cmd readOnly := j%(readsPerWrite+1) != 0 prereqs := cq.getPrereqs(readOnly, zeroTS, spans) - cmd = cq.add(readOnly, zeroTS, prereqs, spans) + cmd = cq.add(readOnly, false, zeroTS, prereqs, spans) if len(liveCmdQueue) == cap(liveCmdQueue) { cq.remove(<-liveCmdQueue) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 5d2e6150fbc7..3bf1d1db859c 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -2129,6 +2129,48 @@ type endCmds struct { ba roachpb.BatchRequest } +// checkConflicts determines whether any prerequisites or +// dependencies of the command overlapped the actual affected spans, +// as recorded in the BatchResponse. On conflict, returns an error +// indicating that the command should be retried without +// pre-evaluation. +func (ec *endCmds) checkConflicts(br *roachpb.BatchResponse) *roachpb.Error { + for _, accessCmds := range ec.cmds { + for _, cmd := range accessCmds { + if cmd == nil { + continue + } + // If there were no overlapping commands at all, continue. + if len(cmd.prereqsAndDeps) == 0 { + continue + } + // Otherwise, iterate through requests, get truncated spans, and + // check for conflicts with concurrent commands. 
+ for i, union := range ec.ba.Requests { + header := union.GetInner().Header() + span := roachpb.Span{Key: header.Key, EndKey: header.EndKey} + switch t := br.Responses[i].GetInner().(type) { + case *roachpb.ScanResponse: + if t.ResumeSpan != nil { + span.EndKey = t.ResumeSpan.Key + } + case *roachpb.ReverseScanResponse: + if t.ResumeSpan != nil { + span.Key = t.ResumeSpan.EndKey + } + } + rng := span.AsRange() + for _, c := range cmd.prereqsAndDeps { + if c.Overlaps(rng) { + return retryWithoutPreEvaluationError + } + } + } + } + } + return nil +} + // done removes pending commands from the command queue and updates // the timestamp cache using the final timestamp of each command. func (ec *endCmds) done(br *roachpb.BatchResponse, pErr *roachpb.Error, retry proposalRetryReason) { @@ -2249,7 +2291,7 @@ func (r *Replica) updateTimestampCache( func collectSpans( desc roachpb.RangeDescriptor, ba *roachpb.BatchRequest, -) (*spanset.SpanSet, error) { +) (*spanset.SpanSet, bool, error) { spans := &spanset.SpanSet{} // TODO(bdarnell): need to make this less global when the local // command queue is used more heavily. For example, a split will @@ -2272,16 +2314,17 @@ func collectSpans( if cmd, ok := batcheval.LookupCommand(inner.Method()); ok { cmd.DeclareKeys(desc, ba.Header, inner, spans) } else { - return nil, errors.Errorf("unrecognized command %s", inner.Method()) + return nil, false, errors.Errorf("unrecognized command %s", inner.Method()) } } // If any command gave us spans that are invalid, bail out early // (before passing them to the command queue, which may panic). if err := spans.Validate(); err != nil { - return nil, err + return nil, false, err } - return spans, nil + hasLimit := ba.Header.MaxSpanRequestKeys > 0 + return spans, hasLimit, nil } // beginCmds waits for any in-flight, conflicting commands to complete. This @@ -2295,7 +2338,7 @@ func collectSpans( // removed from the queue, and whose returned error is to be used in place of // the supplied error. 
func (r *Replica) beginCmds( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, + ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, preEvaluate bool, ) (*endCmds, error) { var newCmds batchCmdSet clocklessReads := r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset @@ -2346,7 +2389,7 @@ func (r *Replica) beginCmds( for i := spanset.SpanAccess(0); i < spanset.NumSpanAccess; i++ { readOnly := i == spanset.SpanReadOnly && !clocklessReads // ditto above for j := spanset.SpanScope(0); j < spanset.NumSpanScope; j++ { - cmd := r.cmdQMu.queues[j].add(readOnly, scopeTS(j), prereqs[i][j], spans.GetSpans(i, j)) + cmd := r.cmdQMu.queues[j].add(readOnly, preEvaluate, scopeTS(j), prereqs[i][j], spans.GetSpans(i, j)) if cmd != nil { cmd.SetDebugInfo(ba) newCmds[i][j] = cmd @@ -2382,6 +2425,12 @@ func (r *Replica) beginCmds( if pre == nil { break } + // If either the command or its prereq are pre-evaluating, + // resolve the prereq immediately instead of waiting. + if newCmd.preEvaluate || pre.preEvaluate { + newCmd.ResolvePendingPrereq() + continue + } select { case <-pre.pending: // The prerequisite command has finished so remove it from our prereq list. @@ -2535,7 +2584,7 @@ func prereqDebugSummary(prereqs prereqCmdSet) string { return b.String() } -// removeCmdsFromCommandQueue removes a batch's set of commands for the +// removeCmdsFromCommandQueue removes a batch's set of commands from the // replica's command queue. func (r *Replica) removeCmdsFromCommandQueue(cmds batchCmdSet) { r.cmdQMu.Lock() @@ -2996,11 +3045,36 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error { return err } -// executeReadOnlyBatch updates the read timestamp cache and waits for any -// overlapping writes currently processing through Raft ahead of us to -// clear via the command queue. 
+var retryWithoutPreEvaluationError = roachpb.NewErrorf("retry read-only batch without pre-evaluation") + func (r *Replica) executeReadOnlyBatch( ctx context.Context, ba roachpb.BatchRequest, +) (br *roachpb.BatchResponse, pErr *roachpb.Error) { + br, pErr = r.tryExecuteReadOnlyBatch(ctx, ba, true /* preEvaluateOnLimits */) + if pErr == nil || pErr != retryWithoutPreEvaluationError { + return br, pErr + } + return r.tryExecuteReadOnlyBatch(ctx, ba, false /* preEvaluateOnLimits */) +} + +// tryExecuteReadOnlyBatch is called by executeReadOnlyBatch to +// execute read-only batches and update the read timestamp cache on +// success. It waits for any overlapping writes currently processing +// through Raft ahead of us to clear via the command queue. +// +// If the preEvaluateOnLimits parameter is true, then if the batch +// contains commands which specify limits, the batch is optimistically +// pre-evaluated without waiting in the command queue, and only after +// it's clear exactly what spans were read, do we verify (after the +// fact) whether there were any conflicts with concurrent writes. This +// case is not uncommon; for example, a Scan which requests the +// entire range but has a limit of 1 result. We want to prevent +// overly broad spans from backing up the command queue. If there were +// conflicts, then a retryWithoutPreEvaluationError is returned which +// causes executeReadOnlyBatch to retry with preEvaluateOnLimits set +// to false. +func (r *Replica) tryExecuteReadOnlyBatch( + ctx context.Context, ba roachpb.BatchRequest, preEvaluateOnLimits bool, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { // If the read is not inconsistent, the read requires the range lease or // permission to serve via follower reads. 
@@ -3041,15 +3115,16 @@ func (r *Replica) executeReadOnlyBatch( } r.limitTxnMaxTimestamp(ctx, &ba, status) - spans, err := collectSpans(*r.Desc(), &ba) + spans, hasLimit, err := collectSpans(*r.Desc(), &ba) if err != nil { return nil, roachpb.NewError(err) } + preEvaluate := hasLimit && preEvaluateOnLimits // Add the read to the command queue to gate subsequent // overlapping commands until this command completes. log.Event(ctx, "command queue") - endCmds, err := r.beginCmds(ctx, &ba, spans) + endCmds, err := r.beginCmds(ctx, &ba, spans, preEvaluate) if err != nil { return nil, roachpb.NewError(err) } @@ -3063,6 +3138,11 @@ func (r *Replica) executeReadOnlyBatch( // timestamp cache update is synchronized. This is wrapped to delay // pErr evaluation to its value when returning. defer func() { + // If we pre-evaluated and skipped waiting in the command queue, + // check for concurrent write conflicts. + if pErr == nil && preEvaluate { + pErr = endCmds.checkConflicts(br) + } endCmds.done(br, pErr, proposalNoRetry) }() @@ -3217,7 +3297,7 @@ func (r *Replica) tryExecuteWriteBatch( return nil, roachpb.NewError(err), proposalNoRetry } - spans, err := collectSpans(*r.Desc(), &ba) + spans, _, err := collectSpans(*r.Desc(), &ba) if err != nil { return nil, roachpb.NewError(err), proposalNoRetry } @@ -3231,7 +3311,7 @@ func (r *Replica) tryExecuteWriteBatch( // been run to successful completion. 
log.Event(ctx, "command queue") var err error - endCmds, err = r.beginCmds(ctx, &ba, spans) + endCmds, err = r.beginCmds(ctx, &ba, spans, false /* preEvaluate */) if err != nil { return nil, roachpb.NewError(err), proposalNoRetry } diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index bbedf5c47904..f1d106d304c0 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -3028,7 +3028,7 @@ func (ct *cmdQCancelTest) insertCmds(instrs []cancelInstr) { ctx, cancel := context.WithCancel(context.Background()) ba := instr.req() - spanSet, err := collectSpans(roachpb.RangeDescriptor{}, ba) + spanSet, _, err := collectSpans(roachpb.RangeDescriptor{}, ba) if err != nil { ct.Fatal(err) }