Skip to content

Commit

Permalink
kv: enable replay protection for ambiguous writes on commits
Browse files Browse the repository at this point in the history
While previously, RPC failures were assumed to be retriable, as write
operations (with the notable exception of `EndTxn`) were assumed to be
idempotent, it has been seen in #67765 and documented in #103817 that
RPC failures on write operations that occur in parallel with a commit
(i.e. a partial batch where `withCommit==true`), it is not always
possible to assume idempotency and retry the "ambiguous" writes.
This is due to the fact that the retried write RPC could result in the
transaction's `WriteTimestamp` being bumped, changing the commit
timestamp of the transaction that may in fact already be implicitly
committed if the initial "ambiguous" write actually succeeded.

This change modifies the protocol of the DistSender to flag in
subsequent retries that a batch with a commit has previously
experienced ambiguity, as well as the handling of the retried write in
the MVCC layer to detect this previous ambiguity and reject retries
that change the write timestamp as a non-idempotent replay. The flag
allows subsequent retries to "remember" the earlier ambiguous write and
evaluate accordingly.

The flag allows us to properly handle RPC failures (i.e. ambiguous
writes) that occur on commit, as a transaction that is implicitly
committed is eligible to be marked as explicitly committed by contending
transactions via the `RecoverTxn` operation, resulting in a race between
retries by the transaction coordinator and recovery by contending
transactions that could result in either incorrectly reporting a
transaction as having failed with a `RETRY_SERIALIZABLE` error (despite
the possibility that it already was or could be recovered and
successfully committed), or in attempting to explicitly commit an
already-recovered and committed transaction, resulting in seeing an
assertion failure due to `transaction unexpectedly committed`.

The replay protection introduced here allows us to avoid both of these
situations by detecting a replay that should be considered
non-idempotent and returning an error, causing the original RPC error
remembered by the DistSender to be propagated as an
`AmbiguousResultError`. As such, this can be handled by application code
by validating the success/failure of a transaction when receiving this
error.

Depends on #107680, #107323, #108154, #108001

Fixes: #103817

Release note (bug fix): Properly handles RPC failures on writes using
the parallel commit protocol that execute in parallel to the commit
operation, avoiding incorrect retriable failures and `transaction
unexpectedly committed` assertions by detecting when writes cannot
be retried idempotently, instead returning an `AmbiguousResultError`.
  • Loading branch information
AlexTalks committed Aug 8, 2023
1 parent 974c752 commit 546f346
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 169 deletions.
29 changes: 21 additions & 8 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2084,7 +2084,8 @@ func maybeSetResumeSpan(
// the error that the last attempt to execute the request returned.
func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
if ambiguousErr != nil {
return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr)
return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted] (last error: %v)",
ambiguousErr, lastAttemptErr)
}

// TODO(bdarnell): The error from the last attempt is not necessarily the best
Expand Down Expand Up @@ -2262,6 +2263,10 @@ func (ds *DistSender) sendToReplicas(
ba = ba.ShallowCopy()
ba.Replica = curReplica
ba.RangeID = desc.RangeID
ba.AmbiguousReplayProtection = ambiguousError != nil
if ba.AmbiguousReplayProtection && ba.CanForwardReadTimestamp {
ba.CanForwardReadTimestamp = false
}
// Communicate to the server the information our cache has about the
// range. If it's stale, the server will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
Expand Down Expand Up @@ -2296,10 +2301,13 @@ func (ds *DistSender) sendToReplicas(
ds.maybeIncrementErrCounters(br, err)

if err != nil {
log.VErrEventf(ctx, 2, "RPC error: %s", err)

if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
if ambiguousError != nil {
return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate] (last error: %v)",
ambiguousError, err)
}
return nil, err
}
Expand All @@ -2321,10 +2329,6 @@ func (ds *DistSender) sendToReplicas(
// ambiguity.
// 2) SQL recognizes AmbiguousResultErrors and gives them a special code
// (StatementCompletionUnknown).
// TODO(andrei): The use of this code is inconsistent because a) the
// DistSender tries to only return the code for commits, but it'll happily
// forward along AmbiguousResultErrors coming from the replica and b) we
// probably should be returning that code for non-commit statements too.
//
// We retry requests in order to avoid returning errors (in particular,
// AmbiguousResultError). Retrying the batch will either:
Expand All @@ -2339,6 +2343,12 @@ func (ds *DistSender) sendToReplicas(
// can't claim success (and even if we could claim success, we still
// wouldn't have the complete result of the successful evaluation).
//
// Note that in case c), a request is not idempotent if the retry finds
// the request succeeded the first time around, but requires a change to
// the transaction's write timestamp. This is guarded against by setting
// the AmbiguousReplayProtection flag, so that the replay is aware the
// batch has seen an ambiguous error.
//
// Case a) is great - the retry made the request succeed. Case b) is also
// good; due to idempotency we managed to swallow a communication error.
// Case c) is not great - we'll end up returning an error even though the
Expand All @@ -2351,10 +2361,12 @@ func (ds *DistSender) sendToReplicas(
// evaluating twice, overwriting another unrelated write that fell
// in-between.
//
// NB: If this partial batch does not contain the EndTxn request but the
// batch contains a commit, the ambiguous error should be caught on
// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
}
log.VErrEventf(ctx, 2, "RPC error: %s", err)

// If the error wasn't just a context cancellation and the down replica
// is cached as the lease holder, evict it. The only other eviction
Expand Down Expand Up @@ -2510,7 +2522,8 @@ func (ds *DistSender) sendToReplicas(
}
default:
if ambiguousError != nil {
return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate] (last error: %v)",
ambiguousError, br.Error.GoError())
}

// The error received is likely not specific to this
Expand Down
248 changes: 107 additions & 141 deletions pkg/kv/kvclient/kvcoord/dist_sender_ambiguous_test.go

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2766,9 +2766,17 @@ message Header {
// sender.
repeated string profile_labels = 31 [(gogoproto.customname) = "ProfileLabels"];

// AmbiguousReplayProtection, if set, prevents replay of a write operation
// from being considered idempotent if its write timestamp is different from
// the intent's. This protection is required when there has been an
// ambiguous write (i.e. RPC error) on a batch that contained a commit,
// as the transaction may have already been explicitly committed by a racing
// RecoverTxn request. See #103817.
bool ambiguous_replay_protection = 32;

reserved 7, 10, 12, 14, 20;

// Next ID: 32
// Next ID: 33
}

// BoundedStalenessHeader contains configuration values pertaining to bounded
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func Delete(
reply := resp.(*kvpb.DeleteResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
EnableReplayProtection: h.AmbiguousReplayProtection,
}

var err error
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,22 @@ func DeleteRange(
if !args.Inline {
timestamp = h.Timestamp
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
EnableReplayProtection: h.AmbiguousReplayProtection,
}

// NB: Even if args.ReturnKeys is false, we want to know which intents were
// written if we're evaluating the DeleteRange for a transaction so that we
// can update the Result's AcquiredLocks field.
returnKeys := args.ReturnKeys || h.Txn != nil
deleted, resumeSpan, num, err := storage.MVCCDeleteRange(
ctx, readWriter, args.Key, args.EndKey,
h.MaxSpanRequestKeys, timestamp,
storage.MVCCWriteOptions{Txn: h.Txn, LocalTimestamp: cArgs.Now, Stats: cArgs.Stats},
returnKeys)
opts, returnKeys)
if err != nil {
return result.Result{}, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ func Increment(
reply := resp.(*kvpb.IncrementResponse)

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
EnableReplayProtection: h.AmbiguousReplayProtection,
}

var err error
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ func InitPut(
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
EnableReplayProtection: h.AmbiguousReplayProtection,
}

var err error
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ func Put(
}

opts := storage.MVCCWriteOptions{
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
Txn: h.Txn,
LocalTimestamp: cArgs.Now,
Stats: cArgs.Stats,
EnableReplayProtection: h.AmbiguousReplayProtection,
}

var err error
Expand Down
29 changes: 24 additions & 5 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1665,6 +1665,7 @@ func replayTransactionalWrite(
value roachpb.Value,
txn *roachpb.Transaction,
valueFn func(optionalValue) (roachpb.Value, error),
ambiguousReplayProtection bool,
) error {
var writtenValue optionalValue
var err error
Expand Down Expand Up @@ -1750,6 +1751,14 @@ func replayTransactionalWrite(
txn.ID, txn.Sequence, value.RawBytes, writtenValue.RawBytes)
}

// If ambiguous replay protection is enabled, a replay that changes the
// timestamp should fail, as this would break idempotency (see #103817).
if ambiguousReplayProtection && !txn.WriteTimestamp.Equal(meta.Txn.WriteTimestamp) {
return errors.Errorf("transaction %s with sequence %d prevented from changing "+
"write timestamp from %s to %s due to replay protection",
txn.ID, txn.Sequence, meta.Txn.WriteTimestamp, txn.WriteTimestamp)
}

return nil
}

Expand Down Expand Up @@ -1819,6 +1828,9 @@ func mvccPutInternal(
if !value.Timestamp.IsEmpty() {
return false, errors.Errorf("cannot have timestamp set in value")
}
if err := opts.validate(); err != nil {
return false, err
}

metaKey := MakeMVCCMetadataKey(key)
ok, origMetaKeySize, origMetaValSize, origRealKeyChanged, err :=
Expand Down Expand Up @@ -1918,7 +1930,7 @@ func mvccPutInternal(
// The transaction has executed at this sequence before. This is merely a
// replay of the transactional write. Assert that all is in order and return
// early.
return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn)
return false, replayTransactionalWrite(ctx, iter, meta, key, readTimestamp, value, opts.Txn, valueFn, opts.EnableReplayProtection)
}

// We're overwriting the intent that was present at this key, before we do
Expand Down Expand Up @@ -2306,7 +2318,6 @@ func MVCCIncrement(
newValue.InitChecksum(key)
return newValue, nil
}

err := mvccPutUsingIter(ctx, rw, iter, key, timestamp, noValue, valueFn, opts)

return newInt64Val, err
Expand Down Expand Up @@ -3897,9 +3908,17 @@ func buildScanIntents(data []byte) ([]roachpb.Intent, error) {
// MVCCWriteOptions bundles options for the MVCCPut and MVCCDelete families of functions.
type MVCCWriteOptions struct {
// See the comment on mvccPutInternal for details on these parameters.
Txn *roachpb.Transaction
LocalTimestamp hlc.ClockTimestamp
Stats *enginepb.MVCCStats
Txn *roachpb.Transaction
LocalTimestamp hlc.ClockTimestamp
Stats *enginepb.MVCCStats
EnableReplayProtection bool
}

func (opts *MVCCWriteOptions) validate() error {
if opts.EnableReplayProtection && opts.Txn == nil {
return errors.Errorf("cannot enable replay protection without a transaction")
}
return nil
}

// MVCCScanOptions bundles options for the MVCCScan family of functions.
Expand Down

0 comments on commit 546f346

Please sign in to comment.