Skip to content

Commit

Permalink
kv: support server-side refreshes of uncertainty errors
Browse files Browse the repository at this point in the history
Extracted from cockroachdb#73732, with relevant comments addressed.

This commit adds support for server-side refreshes of
`ReadWithinUncertaintyIntervalError`s. This serves as a performance optimization
for transactional requests, which now benefit from this new capability to
refresh away `ReadWithinUncertaintyIntervalErrors` early in their transaction,
before they've accumulated any refresh spans. There's some complexity around
supporting this form of server-side retry, because it must be done above
latching, instead of below. However, the recent refactoring in cockroachdb#73557 has made
this possible to support cleanly.

This is also a prerequisite to giving non-transactional requests uncertainty
intervals (cockroachdb#73732), because we don't want ReadWithinUncertaintyIntervalErrors to
reach the client for non-transactional requests. Conveniently, because
non-transactional requests are always scoped to a single-range, those that hit
uncertainty errors will always be able to retry on the server, so these errors
will never bubble up to the client that initiated the request.

Release note (performance improvement): Certain forms of automatically retried
"read uncertainty" errors are now retried more efficiently, avoiding a network
round trip.
  • Loading branch information
nvanbenschoten committed Feb 5, 2022
1 parent 4a741cd commit 4f7c541
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 167 deletions.
27 changes: 14 additions & 13 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2262,9 +2262,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.Run(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// Even if accounting for the refresh spans would have exhausted the
Expand Down Expand Up @@ -2952,8 +2952,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
txnCoordRetry: true,
filter: newUncertaintyFilter(roachpb.Key("a")),
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
name: "cput within uncertainty interval after timestamp leaked",
Expand All @@ -2963,7 +2964,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
retryable: func(ctx context.Context, txn *kv.Txn) error {
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true,
tsLeaked: true,
},
Expand All @@ -2984,7 +2985,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
txnCoordRetry: true,
},
{
Expand All @@ -3007,7 +3008,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
return nil
},
filter: newUncertaintyFilter(roachpb.Key([]byte("ac"))),
filter: newUncertaintyFilter(roachpb.Key("ac")),
clientRetry: true, // note this txn is read-only but still restarts
},
{
Expand All @@ -3023,7 +3024,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
txnCoordRetry: true,
},
{
Expand All @@ -3045,7 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("a"))),
filter: newUncertaintyFilter(roachpb.Key("a")),
clientRetry: true, // will fail because of conflict on refresh span for the Get
},
{
Expand All @@ -3059,7 +3060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
return txn.CommitInBatch(ctx, b)
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -3069,7 +3070,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.Scan(ctx, "a", "d", 0)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand All @@ -3079,7 +3080,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
_, err := txn.DelRange(ctx, "a", "d", false /* returnKeys */)
return err
},
filter: newUncertaintyFilter(roachpb.Key([]byte("c"))),
filter: newUncertaintyFilter(roachpb.Key("c")),
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,26 @@ func (g *Guard) AssertNoLatches() {
}
}

// IsolatedAtLaterTimestamps returns whether the request holding the guard would
// continue to be isolated from other requests / transactions even if it were to
// increase its request timestamp while evaluating. If the method returns false,
// the concurrency guard must be dropped and re-acquired with the new timestamp
// before the request can evaluate at that later timestamp.
func (g *Guard) IsolatedAtLaterTimestamps() bool {
// If the request acquired any read latches with bounded (MVCC) timestamps
// then it can not trivially bump its timestamp without dropping and
// re-acquiring those latches. Doing so could allow the request to read at an
// unprotected timestamp. We only look at global latch spans because local
// latch spans always use unbounded (NonMVCC) timestamps.
return len(g.Req.LatchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 &&
// Similarly, if the request declared any global or local read lock spans
// then it can not trivially bump its timestamp without dropping its
// lockTableGuard and re-scanning the lockTable. Doing so could allow the
// request to conflict with locks that it previously did not conflict with.
len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) == 0 &&
len(g.Req.LockSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanLocal)) == 0
}

// CheckOptimisticNoConflicts checks that the {latch,lock}SpansRead do not
// have a conflicting latch, lock.
func (g *Guard) CheckOptimisticNoConflicts(
Expand Down
36 changes: 9 additions & 27 deletions pkg/kv/kvserver/replica_batch_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -183,7 +183,7 @@ func maybeStripInFlightWrites(ba *roachpb.BatchRequest) (*roachpb.BatchRequest,
// works for batches that exclusively contain writes; reads cannot be bumped
// like this because they've already acquired timestamp-aware latches.
func maybeBumpReadTimestampToWriteTimestamp(
ctx context.Context, ba *roachpb.BatchRequest, latchSpans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard,
) bool {
if ba.Txn == nil {
return false
Expand All @@ -202,53 +202,35 @@ func maybeBumpReadTimestampToWriteTimestamp(
if batcheval.IsEndTxnExceedingDeadline(ba.Txn.WriteTimestamp, et.Deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, ba.Txn.WriteTimestamp, latchSpans)
return tryBumpBatchTimestamp(ctx, ba, g, ba.Txn.WriteTimestamp)
}

// tryBumpBatchTimestamp attempts to bump ba's read and write timestamps to ts.
//
// Returns true if the timestamp was bumped. Returns false if the timestamp could
// not be bumped.
func tryBumpBatchTimestamp(
ctx context.Context, ba *roachpb.BatchRequest, ts hlc.Timestamp, latchSpans *spanset.SpanSet,
ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, ts hlc.Timestamp,
) bool {
if len(latchSpans.GetSpans(spanset.SpanReadOnly, spanset.SpanGlobal)) > 0 {
// If the batch acquired any read latches with bounded (MVCC) timestamps
// then we can not trivially bump the batch's timestamp without dropping
// and re-acquiring those latches. Doing so could allow the request to
// read at an unprotected timestamp. We only look at global latch spans
// because local latch spans always use unbounded (NonMVCC) timestamps.
//
// NOTE: even if we hold read latches with high enough timestamps to
// fully cover ("protect") the batch at the new timestamp, we still
// don't want to allow the bump. This is because a batch with read spans
// and a higher timestamp may now conflict with locks that it previously
// did not. However, server-side retries don't re-scan the lock table.
// This can lead to requests missing unreplicated locks in the lock
// table that they should have seen or discovering replicated intents in
// MVCC that they should not have seen (from the perspective of the lock
// table's AddDiscoveredLock method).
//
// NOTE: we could consider adding a retry-loop above the latch
// acquisition to allow this to be retried, but given that we try not to
// mix read-only and read-write requests, doing so doesn't seem worth
// it.
if g != nil && !g.IsolatedAtLaterTimestamps() {
return false
}
if ts.Less(ba.Timestamp) {
log.Fatalf(ctx, "trying to bump to %s <= ba.Timestamp: %s", ts, ba.Timestamp)
}
ba.Timestamp = ts
if ba.Txn == nil {
log.VEventf(ctx, 2, "bumping batch timestamp to %s from %s", ts, ba.Timestamp)
ba.Timestamp = ts
return true
}
if ts.Less(ba.Txn.ReadTimestamp) || ts.Less(ba.Txn.WriteTimestamp) {
log.Fatalf(ctx, "trying to bump to %s inconsistent with ba.Txn.ReadTimestamp: %s, "+
"ba.Txn.WriteTimestamp: %s", ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
}
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s)",
log.VEventf(ctx, 2, "bumping batch timestamp to: %s from read: %s, write: %s",
ts, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp)
ba.Txn = ba.Txn.Clone()
ba.Txn.Refresh(ts)
ba.Timestamp = ba.Txn.ReadTimestamp // Refresh just updated ReadTimestamp
return true
}
33 changes: 17 additions & 16 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -518,8 +518,15 @@ func evaluateCommand(
// for transactional requests, retrying is possible if the transaction had not
// performed any prior reads that need refreshing.
//
// The concurrency guard, if not nil, indicates that the caller is holding
// latches and cannot adjust its timestamp beyond the limits of what is
// protected by those latches. If the concurrency guard is nil, the caller
// indicates that it is not holding latches and can therefore more freely adjust
// its timestamp because it will re-acquire latches at whatever timestamp the
// batch is bumped to.
//
// deadline, if not nil, specifies the highest timestamp (exclusive) at which
// the request can be evaluated. If ba is a transactional request, then dealine
// the request can be evaluated. If ba is a transactional request, then deadline
// cannot be specified; a transaction's deadline comes from it's EndTxn request.
//
// If true is returned, ba and ba.Txn will have been updated with the new
Expand All @@ -529,7 +536,7 @@ func canDoServersideRetry(
pErr *roachpb.Error,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
deadline *hlc.Timestamp,
) bool {
if ba.Txn != nil {
Expand All @@ -548,17 +555,6 @@ func canDoServersideRetry(
var newTimestamp hlc.Timestamp
if ba.Txn != nil {
if pErr != nil {
// TODO(nvanbenschoten): This is intentionally not allowing server-side
// refreshes of ReadWithinUncertaintyIntervalErrors for now, even though
// that is the eventual goal here. Lifting that limitation will likely
// need to be accompanied by an above-latching retry loop, because read
// latches will usually prevent below-latch retries of
// ReadWithinUncertaintyIntervalErrors. See the comment in
// tryBumpBatchTimestamp.
if _, ok := pErr.GetDetail().(*roachpb.ReadWithinUncertaintyIntervalError); ok {
return false
}

var ok bool
ok, newTimestamp = roachpb.TransactionRefreshTimestamp(pErr)
if !ok {
Expand All @@ -576,7 +572,12 @@ func canDoServersideRetry(
}
switch tErr := pErr.GetDetail().(type) {
case *roachpb.WriteTooOldError:
newTimestamp = tErr.ActualTimestamp
newTimestamp = tErr.RetryTimestamp()

// TODO(nvanbenschoten): give non-txn requests uncertainty intervals. #73732.
//case *roachpb.ReadWithinUncertaintyIntervalError:
// newTimestamp = tErr.RetryTimestamp()

default:
return false
}
Expand All @@ -585,5 +586,5 @@ func canDoServersideRetry(
if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
return false
}
return tryBumpBatchTimestamp(ctx, ba, newTimestamp, latchSpans)
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -773,7 +773,7 @@ func (r *Replica) evaluateProposal(
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (*result.Result, bool, *roachpb.Error) {
if ba.Timestamp.IsEmpty() {
return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp")
Expand All @@ -789,7 +789,7 @@ func (r *Replica) evaluateProposal(
//
// TODO(tschottdorf): absorb all returned values in `res` below this point
// in the call stack as well.
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, latchSpans)
batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, ui, g)

// Note: reusing the proposer's batch when applying the command on the
// proposer was explored as an optimization but resulted in no performance
Expand Down Expand Up @@ -875,18 +875,15 @@ func (r *Replica) evaluateProposal(
// evaluating it. The returned ProposalData is partially valid even
// on a non-nil *roachpb.Error and should be proposed through Raft
// if ProposalData.command is non-nil.
//
// TODO(nvanbenschoten): combine idKey, ba, and latchSpans into a
// `serializedRequest` struct.
func (r *Replica) requestToProposal(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
ba *roachpb.BatchRequest,
st kvserverpb.LeaseStatus,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (*ProposalData, *roachpb.Error) {
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, latchSpans)
res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, ui, g)

// Fill out the results even if pErr != nil; we'll return the error below.
proposal := &ProposalData{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *Replica) evalAndPropose(
) (chan proposalResult, func(), kvserverbase.CmdIDKey, *roachpb.Error) {
defer tok.DoneIfNotMoved(ctx)
idKey := makeIDKey()
proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g.LatchSpans())
proposal, pErr := r.requestToProposal(ctx, idKey, ba, st, ui, g)
log.Event(proposal.ctx, "evaluated request")

// If the request hit a server-side concurrency retry error, immediately
Expand Down
13 changes: 5 additions & 8 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func (r *Replica) executeReadOnlyBatch(
ui := uncertainty.ComputeInterval(ba.Txn, st)

// Evaluate read-only batch command.
spans := g.LatchSpans()
rec := NewReplicaEvalContext(r, spans)
rec := NewReplicaEvalContext(r, g.LatchSpans())

// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
Expand All @@ -62,7 +61,7 @@ func (r *Replica) executeReadOnlyBatch(
panic("expected consistent iterators")
}
if util.RaceEnabled {
rw = spanset.NewReadWriterAt(rw, spans, ba.Timestamp)
rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp)
}
defer rw.Close()

Expand All @@ -81,9 +80,7 @@ func (r *Replica) executeReadOnlyBatch(
// the latches are released.

var result result.Result
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(
ctx, rw, rec, ba, ui, spans,
)
br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, ui, g)

// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
Expand Down Expand Up @@ -237,7 +234,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
rec batcheval.EvalContext,
ba *roachpb.BatchRequest,
ui uncertainty.Interval,
latchSpans *spanset.SpanSet,
g *concurrency.Guard,
) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) {
log.Event(ctx, "executing read-only batch")

Expand Down Expand Up @@ -290,7 +287,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes(
br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, ui, true /* readOnly */)
// If we can retry, set a higher batch timestamp and continue.
// Allow one retry only.
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, latchSpans, nil /* deadline */) {
if pErr == nil || retries > 0 || !canDoServersideRetry(ctx, pErr, ba, br, g, nil /* deadline */) {
break
}
}
Expand Down
Loading

0 comments on commit 4f7c541

Please sign in to comment.