Skip to content

Commit

Permalink
kv: decouple parallel commit auto-retries from refresh auto-retries
Browse files Browse the repository at this point in the history
Informs #100131.

This commit decouples the concepts of a parallel commit auto-retry from
a refresh auto-retry. This is important for weak isolation transactions
which can tolerate write skew, where a failed parallel commit does not
necessarily imply that the transaction needs to refresh its read spans
in order to commit.

Parallel commit auto-retries are performed when a parallel committing
batch is successful in its execution but fails to qualify for the
implicit commit condition. In such cases, the EndTxn request (and just
the EndTxn request) is re-issued to move the transaction directly to the
COMMITTED state. This only implies the need for a refresh for
serializable transactions. For these transactions, a preemptive refresh
will be performed by the txnSpanRefresher before the EndTxn is
re-issued.

Conversely, refresh auto-retries are performed when a batch fails to
execute fully and returns an error. In such cases, the txnSpanRefresher
will intercept the error, attempt to refresh, and then re-issue the
entire original batch. Because we no longer make an attempt to determine
which parts of a previously issued batch were successful (this was subtle
and error-prone when it existed), we instead re-issue the entire batch
and rely on idempotency.

To make this change, this commit moves the txnCommitter interceptor
above the txnSpanRefresher interceptor in the TxnCoordSender interceptor
stack. This means that the txnCommitter no longer needs to guard against
re-issued batches, because all retries are handled below it. Instead,
the txnSpanRefresher now needs to learn about the STAGING transaction
status. The txnSpanRefresher was already taught about the STAGING status
on successful batches in ef77322, and is taught about the STAGING
status on errors here.

While the primary motivation of this change is to permit weak isolation
transactions to perform parallel commit auto-retries without needing to
refresh, it also has another benefit. Failed parallel commits no longer
re-issue their entire batch on serialization failures. Instead, they
just re-issue the EndTxn. As a result, this replaces and closes #93099.
It also motivates the release note below.

Release note (performance improvement): Some large, long-running INSERT
statements now perform less work during their commit phase and can run
faster.
  • Loading branch information
nvanbenschoten committed May 12, 2023
1 parent eb7bd75 commit d697e6a
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 132 deletions.
40 changes: 25 additions & 15 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expClientRestart bool // client-side txn restart
expServerRefresh bool // server-side refresh
expOnePhaseCommit bool // 1PC commits
expParallelCommitAutoRetry bool // parallel commit auto-retries
expFailure string // regexp pattern to match on error, if not empty
}
type testCase struct {
Expand Down Expand Up @@ -3045,6 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expServerRefresh: true,
expClientRefreshSuccess: false,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: false,
},
},
{
Expand All @@ -3062,10 +3064,11 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// The Put to "c" will succeed with a forwarded timestamp. However, the
// txn has already staged on the other range at an earlier timestamp. As a
// result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
// requires a client-side refresh and parallel commit auto-retry.
allIsoLevels: &expect{
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
},
},
{
Expand Down Expand Up @@ -3149,6 +3152,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
expServerRefresh: true,
expClientRefreshSuccess: false,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: false,
},
},
{
Expand All @@ -3165,11 +3169,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// The Put to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
// requires a parallel commit auto-retry.
allIsoLevels: &expect{
expServerRefresh: true,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expClientRefreshSuccess: false,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
},
},
{
Expand All @@ -3184,12 +3189,13 @@ func TestTxnCoordSenderRetries(t *testing.T) {
return txn.CommitInBatch(ctx, b)
},
priorReads: true,
// The Put to "a" will fail, failing the parallel commit and forcing a
// client-side refresh.
// The Put to "a" will fail, failing the parallel commit with an error and
// forcing a client-side refresh and auto-retry of the full batch.
allIsoLevels: &expect{
expServerRefresh: false,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expParallelCommitAutoRetry: false,
},
},
{
Expand All @@ -3205,11 +3211,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
priorReads: true,
// The Put to "c" will fail, failing the parallel commit and forcing a
// client-side refresh.
// client-side refresh and parallel commit auto-retry.
allIsoLevels: &expect{
expServerRefresh: false,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
},
},
{
Expand Down Expand Up @@ -3387,11 +3394,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// The cput to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
// requires a parallel commit auto-retry.
allIsoLevels: &expect{
expServerRefresh: true,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expClientRefreshSuccess: false,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
},
},
{
Expand Down Expand Up @@ -3435,11 +3443,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// The cput to "c" will succeed after a server-side refresh. However, the
// txn has already staged on the other range at the pre-refresh timestamp.
// As a result, it does not qualify for the implicit commit condition and
// requires a client-side refresh.
// requires a parallel commit auto-retry.
allIsoLevels: &expect{
expServerRefresh: true,
expClientRefreshSuccess: true,
expClientAutoRetryAfterRefresh: true,
expClientRefreshSuccess: false,
expClientAutoRetryAfterRefresh: false,
expParallelCommitAutoRetry: true,
},
},
{
Expand Down Expand Up @@ -3615,6 +3624,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess")
require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
require.Equal(t, exp.expParallelCommitAutoRetry, metrics.ParallelCommitAutoRetries.Count() != 0, "TxnMetrics.ParallelCommitAutoRetries")
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ type TxnCoordSender struct {
txnHeartbeater
txnSeqNumAllocator
txnPipeliner
txnSpanRefresher
txnCommitter
txnSpanRefresher
txnMetricRecorder
txnLockGatekeeper // not in interceptorStack array.
}
Expand Down Expand Up @@ -254,6 +254,7 @@ func newRootTxnCoordSender(
tcs.interceptorAlloc.txnCommitter = txnCommitter{
st: tcf.st,
stopper: tcs.stopper,
metrics: &tcs.metrics,
mu: &tcs.mu.Mutex,
}
tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{
Expand All @@ -274,18 +275,20 @@ func newRootTxnCoordSender(
// never generate transaction retry errors that could be avoided
// with a refresh.
&tcs.interceptorAlloc.txnPipeliner,
// The committer sits above the span refresher because it will
// never generate transaction retry errors that could be avoided
// with a refresh. However, it may re-issue parts of "successful"
// batches that failed to qualify for the parallel commit condition.
// These re-issued batches benefit from pre-emptive refreshes in the
// span refresher.
&tcs.interceptorAlloc.txnCommitter,
// The span refresher may resend entire batches to avoid transaction
// retries. Because of that, we need to be careful which interceptors
// sit below it in the stack.
// The span refresher sits below the committer, so it must be prepared
// to see STAGING transaction protos in responses and errors. It should
// defer to the committer for how to handle those.
&tcs.interceptorAlloc.txnSpanRefresher,
// The committer sits beneath the span refresher so that any
// retryable errors that it generates have a chance of being
// "refreshed away" without the need for a txn restart. Because the
// span refresher can re-issue batches, it needs to be careful about
// what parts of the batch it mutates. Any mutation needs to be
// idempotent and should avoid writing to memory when not changing
// it to avoid looking like a data race.
&tcs.interceptorAlloc.txnCommitter,
// The metrics recorder sits at the bottom of the stack so that it
// can observe all transformations performed by other interceptors.
&tcs.interceptorAlloc.txnMetricRecorder,
Expand Down
153 changes: 99 additions & 54 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,33 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting(
// satisfied and the transaction is still in-progress (and could still be
// committed or aborted at a later time). There are a number of reasons why
// some of the requests in the final batch may have failed:
// - intent writes: these requests may fail to write an intent due to a logical
// error like a ConditionFailedError. They also could have succeeded at writing
// an intent but failed to write it at the desired timestamp because they ran
// into the timestamp cache or another committed value. In the first case, the
// txnCommitter will receive an error. In the second, it will generate one in
// needTxnRetryAfterStaging.
//
// - intent writes (error): these requests may fail to write an intent due to
// a logical error like a ConditionFailedError during evaluation. In these
// cases, the txnCommitter will receive an error and can conclude that the
// intent was never written and so the implicit commit condition is not
// satisfied. The error is returned to the client.
//
// - intent writes (successful but pushed): these requests may also succeed at
// writing an intent but fail to write it at the desired (staging) timestamp
// because they run into the timestamp cache or another committed value
// (forms of contention). In these cases, the txnCommitter will receive a
// successful response, but can determine (isTxnCommitImplicit) that the
// transaction does not satisfy the implicit commit condition because one or
// more of its intents are written with a timestamp higher than the staging
// transaction record's. It will retry the transaction commit by re-issuing
// the EndTxn request (retryTxnCommitAfterFailedParallelCommit) to attempt
// to move the transaction directly to the explicitly committed state. This
// retry is called a "parallel commit auto-retry".
//
// - query intents: these requests may fail because they discover that one of the
// previously issued writes has failed; either because it never left an intent
// or because it left one at too high of a timestamp. In this case, the request
// will return an error because the requests all have the ErrorIfMissing option
// set. It will also prevent the write from ever succeeding in the future, which
// ensures that the transaction will never suddenly become implicitly committed
// at a later point due to the write eventually succeeding (e.g. after a replay).
//
// - end txn: this request may fail with a TransactionRetryError for any number of
// reasons, such as if the transaction's provisional commit timestamp has been
// pushed past its read timestamp. In all of these cases, an error will be
Expand All @@ -117,6 +131,7 @@ type txnCommitter struct {
st *cluster.Settings
stopper *stop.Stopper
wrapped lockedSender
metrics *TxnMetrics
mu sync.Locker
}

Expand All @@ -142,17 +157,11 @@ func (tc *txnCommitter) SendLocked(
return tc.sendLockedWithElidedEndTxn(ctx, ba, et)
}

// Assign the transaction's key to the Request's header if it isn't already
// set. This is the only place where EndTxnRequest.Key is assigned, but we
// could be dealing with a re-issued batch after a refresh. Remember, the
// committer is below the span refresh on the interceptor stack.
if et.Key == nil {
et.Key = ba.Txn.Key
} else if len(et.InFlightWrites) > 0 {
// Parallel commits should be disabled on retries by splitEndTxnAndRetrySend.
return nil, kvpb.NewError(errors.AssertionFailedf(
"re-issued EndTxn request with InFlightWrites: %v", et))
// Assign the transaction's key to the Request's header.
if et.Key != nil {
return nil, kvpb.NewError(errors.AssertionFailedf("client must not assign Key to EndTxn"))
}
et.Key = ba.Txn.Key

// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
Expand Down Expand Up @@ -212,17 +221,24 @@ func (tc *txnCommitter) SendLocked(
return nil, kvpb.NewErrorf("unexpected response status without error: %v", br.Txn)
}

// Determine whether the transaction needs to either retry or refresh. When
// the EndTxn request evaluated while STAGING the transaction record, it
// performed this check. However, the transaction proto may have changed due
// to writes evaluated concurrently with the EndTxn even if none of those
// writes returned an error. Remember that the transaction proto we see here
// could be a combination of protos from responses, all merged by
// DistSender.
if pErr := needTxnRetryAfterStaging(br); pErr != nil {
log.VEventf(ctx, 2, "parallel commit failed since some writes were pushed. "+
"Synthesized err: %s", pErr)
return nil, pErr
// Determine whether the transaction satisfies the implicit commit condition.
// If not, it needs to retry the EndTxn request, and possibly also refresh if
// it is serializable.
implicitCommit, err := isTxnCommitImplicit(br)
if err != nil {
return nil, kvpb.NewError(err)
}

// Retry the EndTxn request (and nothing else) if the transaction does not
// satisfy the implicit commit condition. This EndTxn request will not be
// in-flight concurrently with any other writes (they all succeeded), so it
// will move the transaction record directly to the COMMITTED state.
//
// Note that we leave the transaction record that we wrote in the STAGING
// state, which is not ideal. But as long as we continue heartbeating the
// txn record, it being PENDING or STAGING does not make a difference.
if !implicitCommit {
return tc.retryTxnCommitAfterFailedParallelCommit(ctx, ba, br)
}

// If the transaction doesn't need to retry then it is implicitly committed!
Expand Down Expand Up @@ -384,42 +400,71 @@ func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Sp
return roachpb.MergeSpans(&m)
}

// needTxnRetryAfterStaging determines whether the transaction needs to refresh
// (see txnSpanRefresher) or retry based on the batch response of a parallel
// commit attempt.
func needTxnRetryAfterStaging(br *kvpb.BatchResponse) *kvpb.Error {
// isTxnCommitImplicit determines whether the transaction has satisfied the
// implicit commit requirements. It is used to determine whether the transaction
// needs to retry its EndTxn based on the response to a parallel commit attempt.
func isTxnCommitImplicit(br *kvpb.BatchResponse) (bool, error) {
if len(br.Responses) == 0 {
return kvpb.NewErrorf("no responses in BatchResponse: %v", br)
return false, errors.AssertionFailedf("no responses in BatchResponse: %v", br)
}
lastResp := br.Responses[len(br.Responses)-1].GetInner()
etResp, ok := lastResp.(*kvpb.EndTxnResponse)
if !ok {
return kvpb.NewErrorf("unexpected response in BatchResponse: %v", lastResp)
return false, errors.AssertionFailedf("unexpected response in BatchResponse: %v", lastResp)
}
if etResp.StagingTimestamp.IsEmpty() {
return kvpb.NewErrorf("empty StagingTimestamp in EndTxnResponse: %v", etResp)
return false, errors.AssertionFailedf("empty StagingTimestamp in EndTxnResponse: %v", etResp)
}
if etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) {
// If the timestamp that the transaction record was staged at
// is less than the timestamp of the transaction in the batch
// response then one of the concurrent writes was pushed to
// a higher timestamp. This violates the "implicit commit"
// condition and neither the transaction coordinator nor any
// other concurrent actor will consider this transaction to
// be committed as is.
// Note that we leave the transaction record that we wrote in the STAGING
// state, which is not ideal. But as long as we continue heartbeating the
// txn record, it being PENDING or STAGING does not make a difference.
reason := kvpb.RETRY_SERIALIZABLE
if br.Txn.WriteTooOld {
reason = kvpb.RETRY_WRITE_TOO_OLD
}
err := kvpb.NewTransactionRetryError(
reason, "serializability failure concurrent with STAGING")
txn := cloneWithStatus(br.Txn, roachpb.PENDING)
return kvpb.NewErrorWithTxn(err, txn)
// If the timestamp that the transaction record was staged at is less than
// the timestamp of the transaction in the batch response then one of the
// concurrent writes was pushed to a higher timestamp. This violates the
// "implicit commit" condition and neither the transaction coordinator nor
// any other concurrent actor will consider this transaction to be committed
// as is.
failed := etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp)
return !failed, nil
}

// retryTxnCommitAfterFailedParallelCommit retries the batch's EndTxn request
// after the batch has previously succeeded (with the response br), but failed
// to qualify for the implicit commit condition. This EndTxn request will not be
// in-flight concurrently with any other writes (they all succeeded), so it will
// move the transaction record directly to the COMMITTED state.
//
// If successful, the response for the re-issued EndTxn request is stitched back
// together with the rest of the BatchResponse and returned.
func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit(
ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse,
) (*kvpb.BatchResponse, *kvpb.Error) {
log.Eventf(ctx, "parallel commit failed; retrying EndTxn request")
tc.metrics.ParallelCommitAutoRetries.Inc(1)

// Issue a batch containing only the EndTxn request.
etIdx := len(ba.Requests) - 1
baSuffix := ba.ShallowCopy()
baSuffix.Requests = ba.Requests[etIdx:]
baSuffix.Txn = cloneWithStatus(br.Txn, roachpb.PENDING)
// Update the EndTxn request to move the in-flight writes currently attached
// to the EndTxn request to the LockSpans and clear the in-flight write set;
// the writes already succeeded and will not be in-flight concurrently with
// the EndTxn request.
{
et := baSuffix.Requests[0].GetEndTxn().ShallowCopy().(*kvpb.EndTxnRequest)
et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites)
et.InFlightWrites = nil
baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et
}
return nil
brSuffix, pErr := tc.wrapped.SendLocked(ctx, baSuffix)
if pErr != nil {
return nil, pErr
}

// Combine the responses.
br.Responses[etIdx] = kvpb.ResponseUnion{}
if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil {
return nil, kvpb.NewError(err)
}
return br, nil
}

// makeTxnCommitExplicitAsync launches an async task that attempts to move the
Expand Down
Loading

0 comments on commit d697e6a

Please sign in to comment.