From 019d15736f8d9d7bef0f70bfb31550cc6807c83d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 12 May 2023 18:18:53 -0400 Subject: [PATCH 1/4] kv: decouple parallel commit auto-retries from refresh auto-retries Informs #100131. This commit decouples the concepts of a parallel commit auto-retry from a refresh auto-retry. This is important for weak isolation transactions which can tolerate write skew, where a failed parallel commit does not necessarily imply that the transaction needs to refresh its read spans in order to commit. Parallel commit auto-retries are performed when a parallel committing batch is successful in its execution but fails to qualify for the implicit commit condition. In such cases, the EndTxn request (and just the EndTxn request) is re-issued to move the transaction directly to the COMMITTED state. This only implies the need for a refresh for serializable transactions. For these transactions, a preemptive refresh will be performed by the txnSpanRefresher before the EndTxn is re-issued. Conversely, refresh auto-retries are performed when a batch fails to execute fully and returns an error. In such cases, the txnSpanRefresher will intercept the error, attempt to refresh, and then re-issue the entire original batch. Because we no longer make an attempt to determine which parts of a previously issued batch were successful (this was subtle and error-prone when it existed), we instead re-issue the entire batch and rely on idempotency. To make this change, this commit moves the txnCommitter interceptor above the txnSpanRefresher interceptor in the TxnCoordSender interceptor stack. This means that the txnCommitter no longer needs to guard against re-issued batches, because all retries are handled below it. Instead, the txnSpanRefresher now needs to learn about the STAGING transaction status. The txnSpanRefresher was already taught about the STAGING status on successful batches in ef773227, and is taught about the STAGING status on errors here. While the primary motivation of this change is to permit weak isolation transactions to perform parallel commit auto-retries without needing to refresh, it also has another benefit. Failed parallel commits no longer re-issue their entire batch on serialization failures. Instead, they just re-issue the EndTxn. As a result, this replaces and closes #93099. It also motivates release note below. Release note (performance improvement): Some large, long-running INSERT statements now perform less work during their commit phase and can run faster. --- .../kvcoord/dist_sender_server_test.go | 41 +++-- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 21 +-- .../kvcoord/txn_interceptor_committer.go | 153 +++++++++++------- .../kvcoord/txn_interceptor_committer_test.go | 114 +++++++------ .../kvcoord/txn_interceptor_pipeliner.go | 5 +- .../kvcoord/txn_interceptor_span_refresher.go | 12 ++ .../txn_interceptor_span_refresher_test.go | 96 +++++++++++ pkg/kv/kvclient/kvcoord/txn_metrics.go | 18 ++- 8 files changed, 328 insertions(+), 132 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index dd71af584971..6f27c607d0ac 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2060,6 +2060,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientRestart bool // client-side txn restart expServerRefresh bool // server-side refresh expOnePhaseCommit bool // 1PC commits + expParallelCommitAutoRetry bool // parallel commit auto-retries expFailure string // regexp pattern to match on error, if not empty } type testCase struct { @@ -3045,6 +3046,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expServerRefresh: true, expClientRefreshSuccess: false, expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: false, }, }, { @@ -3062,10 +3064,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The Put to "c" will succeed with a forwarded timestamp. However, the // txn has already staged on the other range at an earlier timestamp. As a // result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry and preemptive client-side + // refresh. allIsoLevels: &expect{ expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3149,6 +3153,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { expServerRefresh: true, expClientRefreshSuccess: false, expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: false, }, }, { @@ -3165,11 +3170,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The Put to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3184,12 +3190,13 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, priorReads: true, - // The Put to "a" will fail, failing the parallel commit and forcing a - // client-side refresh. + // The Put to "a" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. allIsoLevels: &expect{ expServerRefresh: false, expClientRefreshSuccess: true, expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, }, }, { @@ -3205,11 +3212,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, priorReads: true, // The Put to "c" will fail, failing the parallel commit and forcing a - // client-side refresh. + // parallel commit auto-retry and preemptive client-side refresh. allIsoLevels: &expect{ expServerRefresh: false, expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3387,11 +3395,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The cput to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3435,11 +3444,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { // The cput to "c" will succeed after a server-side refresh. However, the // txn has already staged on the other range at the pre-refresh timestamp. // As a result, it does not qualify for the implicit commit condition and - // requires a client-side refresh. + // requires a parallel commit auto-retry. allIsoLevels: &expect{ expServerRefresh: true, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, }, }, { @@ -3615,6 +3625,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess") require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts") require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC") + require.Equal(t, exp.expParallelCommitAutoRetry, metrics.ParallelCommitAutoRetries.Count() != 0, "TxnMetrics.ParallelCommitAutoRetries") } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 7e78838a5de6..2367dec634b3 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -163,8 +163,8 @@ type TxnCoordSender struct { txnHeartbeater txnSeqNumAllocator txnPipeliner - txnSpanRefresher txnCommitter + txnSpanRefresher txnMetricRecorder txnLockGatekeeper // not in interceptorStack array. } @@ -254,6 +254,7 @@ func newRootTxnCoordSender( tcs.interceptorAlloc.txnCommitter = txnCommitter{ st: tcf.st, stopper: tcs.stopper, + metrics: &tcs.metrics, mu: &tcs.mu.Mutex, } tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{ @@ -274,18 +275,20 @@ func newRootTxnCoordSender( // never generate transaction retry errors that could be avoided // with a refresh. &tcs.interceptorAlloc.txnPipeliner, + // The committer sits above the span refresher because it will + // never generate transaction retry errors that could be avoided + // with a refresh. However, it may re-issue parts of "successful" + // batches that failed to qualify for the parallel commit condition. + // These re-issued batches benefit from pre-emptive refreshes in the + // span refresher. + &tcs.interceptorAlloc.txnCommitter, // The span refresher may resend entire batches to avoid transaction // retries. Because of that, we need to be careful which interceptors // sit below it in the stack. + // The span refresher sits below the committer, so it must be prepared + // to see STAGING transaction protos in responses and errors. It should + // defer to the committer for how to handle those. &tcs.interceptorAlloc.txnSpanRefresher, - // The committer sits beneath the span refresher so that any - // retryable errors that it generates have a chance of being - // "refreshed away" without the need for a txn restart. Because the - // span refresher can re-issue batches, it needs to be careful about - // what parts of the batch it mutates. Any mutation needs to be - // idempotent and should avoid writing to memory when not changing - // it to avoid looking like a data race. - &tcs.interceptorAlloc.txnCommitter, // The metrics recorder sits at the bottom of the stack so that it // can observe all transformations performed by other interceptors. &tcs.interceptorAlloc.txnMetricRecorder, diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go index 21dfa1198c46..a408f2438a4c 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go @@ -89,12 +89,25 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting( // satisfied and the transaction is still in-progress (and could still be // committed or aborted at a later time). There are a number of reasons why // some of the requests in the final batch may have failed: -// - intent writes: these requests may fail to write an intent due to a logical -// error like a ConditionFailedError. They also could have succeeded at writing -// an intent but failed to write it at the desired timestamp because they ran -// into the timestamp cache or another committed value. In the first case, the -// txnCommitter will receive an error. In the second, it will generate one in -// needTxnRetryAfterStaging. +// +// - intent writes (error): these requests may fail to write an intent due to +// a logical error like a ConditionFailedError during evaluation. In these +// cases, the txnCommitter will receive an error and can conclude that +// intent was never written and so the implicit commit condition is not +// satisfied. The error is returned to the client. +// +// - intent writes (successful but pushed): these requests may also succeed at +// writing an intent but fail to write it at the desired (staging) timestamp +// because they run into the timestamp cache or another committed value +// (forms of contention). In these cases, the txnCommitter will receive a +// successful response, but can determine (isTxnCommitImplicit) that the +// transaction does not satisfy the implicit commit condition because one or +// more of its intents are written with a timestamp higher than the staging +// transaction record's. It will retry the transaction commit by re-issuing +// the EndTxn request (retryTxnCommitAfterFailedParallelCommit) to attempt +// to move the transaction directly to the explicitly committed state. This +// retry is called a "parallel commit auto-retry". +// // - query intents: these requests may fail because they discover that one of the // previously issued writes has failed; either because it never left an intent // or because it left one at too high of a timestamp. In this case, the request @@ -102,6 +115,7 @@ var parallelCommitsEnabled = settings.RegisterBoolSetting( // set. It will also prevent the write from ever succeeding in the future, which // ensures that the transaction will never suddenly become implicitly committed // at a later point due to the write eventually succeeding (e.g. after a replay). +// // - end txn: this request may fail with a TransactionRetryError for any number of // reasons, such as if the transaction's provisional commit timestamp has been // pushed past its read timestamp. In all of these cases, an error will be @@ -117,6 +131,7 @@ type txnCommitter struct { st *cluster.Settings stopper *stop.Stopper wrapped lockedSender + metrics *TxnMetrics mu sync.Locker } @@ -142,17 +157,11 @@ func (tc *txnCommitter) SendLocked( return tc.sendLockedWithElidedEndTxn(ctx, ba, et) } - // Assign the transaction's key to the Request's header if it isn't already - // set. This is the only place where EndTxnRequest.Key is assigned, but we - // could be dealing with a re-issued batch after a refresh. Remember, the - // committer is below the span refresh on the interceptor stack. - if et.Key == nil { - et.Key = ba.Txn.Key - } else if len(et.InFlightWrites) > 0 { - // Parallel commits should be disabled on retries by splitEndTxnAndRetrySend. - return nil, kvpb.NewError(errors.AssertionFailedf( - "re-issued EndTxn request with InFlightWrites: %v", et)) + // Assign the transaction's key to the Request's header. + if et.Key != nil { + return nil, kvpb.NewError(errors.AssertionFailedf("client must not assign Key to EndTxn")) } + et.Key = ba.Txn.Key // Determine whether the commit request can be run in parallel with the rest // of the requests in the batch. If not, move the in-flight writes currently @@ -212,17 +221,24 @@ func (tc *txnCommitter) SendLocked( return nil, kvpb.NewErrorf("unexpected response status without error: %v", br.Txn) } - // Determine whether the transaction needs to either retry or refresh. When - // the EndTxn request evaluated while STAGING the transaction record, it - // performed this check. However, the transaction proto may have changed due - // to writes evaluated concurrently with the EndTxn even if none of those - // writes returned an error. Remember that the transaction proto we see here - // could be a combination of protos from responses, all merged by - // DistSender. - if pErr := needTxnRetryAfterStaging(br); pErr != nil { - log.VEventf(ctx, 2, "parallel commit failed since some writes were pushed. "+ - "Synthesized err: %s", pErr) - return nil, pErr + // Determine whether the transaction satisfies the implicit commit condition. + // If not, it needs to retry the EndTxn request, and possibly also refresh if + // it is serializable. + implicitCommit, err := isTxnCommitImplicit(br) + if err != nil { + return nil, kvpb.NewError(err) + } + + // Retry the EndTxn request (and nothing else) if the transaction does not + // satisfy the implicit commit condition. This EndTxn request will not be + // in-flight concurrently with any other writes (they all succeeded), so it + // will move the transaction record directly to the COMMITTED state. + // + // Note that we leave the transaction record that we wrote in the STAGING + // state, which is not ideal. But as long as we continue heartbeating the + // txn record, it being PENDING or STAGING does not make a difference. + if !implicitCommit { + return tc.retryTxnCommitAfterFailedParallelCommit(ctx, ba, br) } // If the transaction doesn't need to retry then it is implicitly committed! @@ -384,42 +400,71 @@ func mergeIntoSpans(s []roachpb.Span, ws []roachpb.SequencedWrite) ([]roachpb.Sp return roachpb.MergeSpans(&m) } -// needTxnRetryAfterStaging determines whether the transaction needs to refresh -// (see txnSpanRefresher) or retry based on the batch response of a parallel -// commit attempt. -func needTxnRetryAfterStaging(br *kvpb.BatchResponse) *kvpb.Error { +// isTxnCommitImplicit determines whether the transaction has satisfied the +// implicit commit requirements. It is used to determine whether the transaction +// needs to retry its EndTxn based on the response to a parallel commit attempt. +func isTxnCommitImplicit(br *kvpb.BatchResponse) (bool, error) { if len(br.Responses) == 0 { - return kvpb.NewErrorf("no responses in BatchResponse: %v", br) + return false, errors.AssertionFailedf("no responses in BatchResponse: %v", br) } lastResp := br.Responses[len(br.Responses)-1].GetInner() etResp, ok := lastResp.(*kvpb.EndTxnResponse) if !ok { - return kvpb.NewErrorf("unexpected response in BatchResponse: %v", lastResp) + return false, errors.AssertionFailedf("unexpected response in BatchResponse: %v", lastResp) } if etResp.StagingTimestamp.IsEmpty() { - return kvpb.NewErrorf("empty StagingTimestamp in EndTxnResponse: %v", etResp) + return false, errors.AssertionFailedf("empty StagingTimestamp in EndTxnResponse: %v", etResp) } - if etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) { - // If the timestamp that the transaction record was staged at - // is less than the timestamp of the transaction in the batch - // response then one of the concurrent writes was pushed to - // a higher timestamp. This violates the "implicit commit" - // condition and neither the transaction coordinator nor any - // other concurrent actor will consider this transaction to - // be committed as is. - // Note that we leave the transaction record that we wrote in the STAGING - // state, which is not ideal. But as long as we continue heartbeating the - // txn record, it being PENDING or STAGING does not make a difference. - reason := kvpb.RETRY_SERIALIZABLE - if br.Txn.WriteTooOld { - reason = kvpb.RETRY_WRITE_TOO_OLD - } - err := kvpb.NewTransactionRetryError( - reason, "serializability failure concurrent with STAGING") - txn := cloneWithStatus(br.Txn, roachpb.PENDING) - return kvpb.NewErrorWithTxn(err, txn) + // If the timestamp that the transaction record was staged at is less than + // the timestamp of the transaction in the batch response then one of the + // concurrent writes was pushed to a higher timestamp. This violates the + // "implicit commit" condition and neither the transaction coordinator nor + // any other concurrent actor will consider this transaction to be committed + // as is. + failed := etResp.StagingTimestamp.Less(br.Txn.WriteTimestamp) + return !failed, nil +} + +// retryTxnCommitAfterFailedParallelCommit retries the batch's EndTxn request +// after the batch has previously succeeded (with the response br), but failed +// to qualify for the implicit commit condition. This EndTxn request will not be +// in-flight concurrently with any other writes (they all succeeded), so it will +// move the transaction record directly to the COMMITTED state. +// +// If successful, the response for the re-issued EndTxn request is stitched back +// together with the rest of the BatchResponse and returned. +func (tc *txnCommitter) retryTxnCommitAfterFailedParallelCommit( + ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse, +) (*kvpb.BatchResponse, *kvpb.Error) { + log.Eventf(ctx, "parallel commit failed; retrying EndTxn request") + tc.metrics.ParallelCommitAutoRetries.Inc(1) + + // Issue a batch containing only the EndTxn request. + etIdx := len(ba.Requests) - 1 + baSuffix := ba.ShallowCopy() + baSuffix.Requests = ba.Requests[etIdx:] + baSuffix.Txn = cloneWithStatus(br.Txn, roachpb.PENDING) + // Update the EndTxn request to move the in-flight writes currently attached + // to the EndTxn request to the LockSpans and clear the in-flight write set; + // the writes already succeeded and will not be in-flight concurrently with + // the EndTxn request. + { + et := baSuffix.Requests[0].GetEndTxn().ShallowCopy().(*kvpb.EndTxnRequest) + et.LockSpans, _ = mergeIntoSpans(et.LockSpans, et.InFlightWrites) + et.InFlightWrites = nil + baSuffix.Requests[0].Value.(*kvpb.RequestUnion_EndTxn).EndTxn = et } - return nil + brSuffix, pErr := tc.wrapped.SendLocked(ctx, baSuffix) + if pErr != nil { + return nil, pErr + } + + // Combine the responses. + br.Responses[etIdx] = kvpb.ResponseUnion{} + if err := br.Combine(ctx, brSuffix, []int{etIdx}, ba); err != nil { + return nil, kvpb.NewError(err) + } + return br, nil } // makeTxnCommitExplicitAsync launches an async task that attempts to move the diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go index a8c8585f9e23..eb58244b724b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" @@ -27,10 +28,12 @@ import ( func makeMockTxnCommitter() (txnCommitter, *mockLockedSender) { mockSender := &mockLockedSender{} + metrics := MakeTxnMetrics(metric.TestSampleInterval) return txnCommitter{ st: cluster.MakeTestingClusterSettings(), stopper: stop.NewStopper(), wrapped: mockSender, + metrics: &metrics, mu: new(syncutil.Mutex), }, mockSender } @@ -402,61 +405,78 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) { <-explicitCommitCh } -// TestTxnCommitterRetryAfterStaging verifies that txnCommitter returns a retry -// error when a write performed in parallel with staging a transaction is pushed -// to a timestamp above the staging timestamp. +// TestTxnCommitterRetryAfterStaging verifies that txnCommitter performs a +// parallel commit auto-retry when a write performed in parallel with staging a +// transaction is pushed to a timestamp above the staging timestamp. func TestTxnCommitterRetryAfterStaging(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - testutils.RunTrueAndFalse(t, "WriteTooOld", func(t *testing.T, writeTooOld bool) { - tc, mockSender := makeMockTxnCommitter() - defer tc.stopper.Stop(ctx) + tc, mockSender := makeMockTxnCommitter() + defer tc.stopper.Stop(ctx) - txn := makeTxnProto() - keyA := roachpb.Key("a") + txn := makeTxnProto() + keyA := roachpb.Key("a") - ba := &kvpb.BatchRequest{} - ba.Header = kvpb.Header{Txn: &txn} - putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} - etArgs := kvpb.EndTxnRequest{Commit: true} - putArgs.Sequence = 1 - etArgs.Sequence = 2 - etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} - ba.Add(&putArgs, &etArgs) + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyA}} + etArgs := kvpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}} + ba.Add(&putArgs, &etArgs) - mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { - require.Len(t, ba.Requests, 2) - require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) - require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + onFirstSend := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.Equal(t, roachpb.PENDING, ba.Txn.Status) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) - et := ba.Requests[1].GetInner().(*kvpb.EndTxnRequest) - require.True(t, et.Commit) - require.Len(t, et.InFlightWrites, 1) - require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0]) + et := ba.Requests[1].GetEndTxn() + require.True(t, et.Commit) + require.Nil(t, et.LockSpans) + require.Len(t, et.InFlightWrites, 1) + require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0]) - br := ba.CreateReply() - br.Txn = ba.Txn - br.Txn.Status = roachpb.STAGING - br.Responses[1].GetInner().(*kvpb.EndTxnResponse).StagingTimestamp = br.Txn.WriteTimestamp - - // Pretend the PutRequest was split and sent to a different Range. It - // could hit the timestamp cache, or a WriteTooOld error (which sets the - // WriteTooOld flag). The intent will be written but the response - // transaction's timestamp will be larger than the staging timestamp. - br.Txn.WriteTooOld = writeTooOld - br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0) - return br, nil - }) + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.STAGING + br.Responses[1].GetEndTxn().StagingTimestamp = br.Txn.WriteTimestamp + + // Pretend the PutRequest was split and sent to a different Range. It + // could hit the timestamp cache or a WriteTooOld error (which sets the + // WriteTooOld flag which is stripped by the txnSpanRefresher). The intent + // will be written but the response transaction's timestamp will be larger + // than the staging timestamp. + br.Txn.WriteTimestamp = br.Txn.WriteTimestamp.Add(1, 0) + return br, nil + } + sawRetry := false + onRetry := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + sawRetry = true + require.Len(t, ba.Requests, 1) + require.Equal(t, roachpb.PENDING, ba.Txn.Status) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) - br, pErr := tc.SendLocked(ctx, ba) - require.Nil(t, br) - require.NotNil(t, pErr) - require.IsType(t, &kvpb.TransactionRetryError{}, pErr.GetDetail()) - expReason := kvpb.RETRY_SERIALIZABLE - if writeTooOld { - expReason = kvpb.RETRY_WRITE_TOO_OLD - } - require.Equal(t, expReason, pErr.GetDetail().(*kvpb.TransactionRetryError).Reason) - }) + et := ba.Requests[0].GetEndTxn() + require.True(t, et.Commit) + require.Nil(t, et.InFlightWrites) + require.Len(t, et.LockSpans, 1) + require.Equal(t, roachpb.Span{Key: keyA}, et.LockSpans[0]) + + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.COMMITTED + return br, nil + } + mockSender.ChainMockSend(onFirstSend, onRetry) + + br, pErr := tc.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.Equal(t, roachpb.COMMITTED, br.Txn.Status) + require.True(t, sawRetry) + require.Equal(t, int64(1), tc.metrics.ParallelCommitAutoRetries.Count()) } diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index c4e5bdc94b29..0b01acf3e683 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/google/btree" ) @@ -363,10 +364,10 @@ func (tp *txnPipeliner) attachLocksToEndTxn( } et := args.(*kvpb.EndTxnRequest) if len(et.LockSpans) > 0 { - return ba, kvpb.NewErrorf("client must not pass intents to EndTxn") + return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass intents to EndTxn")) } if len(et.InFlightWrites) > 0 { - return ba, kvpb.NewErrorf("client must not pass in-flight writes to EndTxn") + return ba, kvpb.NewError(errors.AssertionFailedf("client must not pass in-flight writes to EndTxn")) } // Populate et.LockSpans and et.InFlightWrites. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index c7622c968f58..54248d6ddcf2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -300,6 +300,18 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend( refreshFrom := txn.ReadTimestamp refreshToTxn := txn.Clone() refreshToTxn.Refresh(refreshTS) + switch refreshToTxn.Status { + case roachpb.PENDING: + case roachpb.STAGING: + // If the batch resulted in an error but the EndTxn request succeeded, + // staging the transaction record in the process, downgrade the status + // back to PENDING. Even though the transaction record may have a status + // of STAGING, we know that the transaction failed to implicitly commit. + refreshToTxn.Status = roachpb.PENDING + default: + return nil, kvpb.NewError(errors.AssertionFailedf( + "unexpected txn status during refresh: %v", refreshToTxn)) + } log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshToTxn.ReadTimestamp, pErr) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index cd570842d7e6..08e5b231d68e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -350,6 +350,102 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { } } +// TestTxnSpanRefresherDowngradesStagingTxnStatus tests that the txnSpanRefresher +// tolerates retry errors with a STAGING transaction status. In such cases, it +// will downgrade the status to PENDING before refreshing and retrying, because +// the presence of an error proves that the transaction failed to implicitly +// commit. +func TestTxnSpanRefresherDowngradesStagingTxnStatus(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tsr, mockSender := makeMockTxnSpanRefresher() + + txn := makeTxnProto() + keyA, keyB := roachpb.Key("a"), roachpb.Key("b") + conflictTs := txn.WriteTimestamp.Add(15, 0) + + // Collect some refresh spans. + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + scanArgs := kvpb.ScanRequest{RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyB}} + ba.Add(&scanArgs) + + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) + require.False(t, tsr.refreshInvalid) + require.Zero(t, tsr.refreshedTimestamp) + + // Hook up a chain of mocking functions. + onPutAndEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[1].GetInner()) + require.True(t, ba.Requests[1].GetEndTxn().IsParallelCommit()) + + // Return a write-too-old error with a STAGING status, emulating a + // successful EndTxn request and a failed Put request. This mixed success + // state is possible if the requests were split across ranges. + pErrTxn := ba.Txn.Clone() + pErrTxn.Status = roachpb.STAGING + pErr := &kvpb.WriteTooOldError{ActualTimestamp: conflictTs} + return nil, kvpb.NewErrorWithTxn(pErr, pErrTxn) + } + onRefresh := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.Equal(t, conflictTs, ba.Txn.ReadTimestamp) + require.IsType(t, &kvpb.RefreshRangeRequest{}, ba.Requests[0].GetInner()) + + refReq := ba.Requests[0].GetRefreshRange() + require.Equal(t, scanArgs.Span(), refReq.Span()) + require.Equal(t, txn.ReadTimestamp, refReq.RefreshFrom) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + return br, nil + } + onPut := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + return br, nil + } + onEndTxn := func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Equal(t, roachpb.PENDING, ba.Txn.Status) // downgraded + require.Len(t, ba.Requests, 1) + require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) + require.False(t, ba.Requests[0].GetEndTxn().IsParallelCommit()) + + br = ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.COMMITTED + return br, nil + } + mockSender.ChainMockSend(onPutAndEndTxn, onRefresh, onPut, onEndTxn) + + // Send a request that will hit a write-too-old error while also returning a + // STAGING transaction status. + ba.Requests = nil + putArgs := kvpb.PutRequest{RequestHeader: kvpb.RequestHeader{Key: keyB}} + etArgs := kvpb.EndTxnRequest{Commit: true} + putArgs.Sequence = 1 + etArgs.Sequence = 2 + etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyB, Sequence: 1}} + ba.Add(&putArgs, &etArgs) + + br, pErr = tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.Equal(t, roachpb.COMMITTED, br.Txn.Status) +} + // TestTxnSpanRefresherMaxRefreshAttempts tests that the txnSpanRefresher // attempts some number of retries before giving up and passing retryable // errors back up the stack. diff --git a/pkg/kv/kvclient/kvcoord/txn_metrics.go b/pkg/kv/kvclient/kvcoord/txn_metrics.go index ac3c8f2fcd15..d5f2ff42e434 100644 --- a/pkg/kv/kvclient/kvcoord/txn_metrics.go +++ b/pkg/kv/kvclient/kvcoord/txn_metrics.go @@ -19,11 +19,12 @@ import ( // TxnMetrics holds all metrics relating to KV transactions. type TxnMetrics struct { - Aborts *metric.Counter - Commits *metric.Counter - Commits1PC *metric.Counter // Commits which finished in a single phase - ParallelCommits *metric.Counter // Commits which entered the STAGING state - CommitWaits *metric.Counter // Commits that waited for linearizability + Aborts *metric.Counter + Commits *metric.Counter + Commits1PC *metric.Counter // Commits which finished in a single phase + ParallelCommits *metric.Counter // Commits which entered the STAGING state + ParallelCommitAutoRetries *metric.Counter // Commits which were retried after entering the STAGING state + CommitWaits *metric.Counter // Commits that waited for linearizability ClientRefreshSuccess *metric.Counter ClientRefreshFail *metric.Counter @@ -82,6 +83,12 @@ var ( Measurement: "KV Transactions", Unit: metric.Unit_COUNT, } + metaParallelCommitAutoRetries = metric.Metadata{ + Name: "txn.parallelcommits.auto_retries", + Help: "Number of commit tries after successful failed parallel commit attempts", + Measurement: "Retries", + Unit: metric.Unit_COUNT, + } metaCommitWaitCount = metric.Metadata{ Name: "txn.commit_waits", Help: "Number of KV transactions that had to commit-wait on commit " + @@ -275,6 +282,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics { Commits: metric.NewCounter(metaCommitsRates), Commits1PC: metric.NewCounter(metaCommits1PCRates), ParallelCommits: metric.NewCounter(metaParallelCommitsRates), + ParallelCommitAutoRetries: metric.NewCounter(metaParallelCommitAutoRetries), CommitWaits: metric.NewCounter(metaCommitWaitCount), ClientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess), ClientRefreshFail: metric.NewCounter(metaClientRefreshFail), From ec5ac9f60d310c64b94397a5ec20dc7ed3a6fe83 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 15 May 2023 09:51:35 +0000 Subject: [PATCH 2/4] kvserver: add expiration lease escape hatch This patch adds `COCKROACH_DISABLE_EXPIRATION_LEASES_ONLY`, which can be used to hard-disable expiration-based leases, e.g. in cases where the lease extensions overload the cluster and prevent it from working, and thus prevent operators from disabling the setting. Epic: none Release note: None --- pkg/kv/kvserver/replica_range_lease.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 8439e8434a17..1b20dd632067 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -58,6 +58,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/growstack" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -90,6 +91,12 @@ var ExpirationLeasesOnly = settings.RegisterBoolSetting( util.ConstantWithMetamorphicTestBool("kv.expiration_leases_only.enabled", false), ) +// DisableExpirationLeasesOnly is an escape hatch for ExpirationLeasesOnly, +// which can be used to hard-disable expiration-based leases e.g. if clusters +// are unable to start back up due to the lease extension load. +var DisableExpirationLeasesOnly = envutil.EnvOrDefaultBool( + "COCKROACH_DISABLE_EXPIRATION_LEASES_ONLY", false) + // EagerLeaseAcquisitionConcurrency is the number of concurrent, eager lease // acquisitions made during Raft ticks, across all stores. Note that this does // not include expiration lease extensions, which are unbounded. @@ -824,7 +831,8 @@ func (r *Replica) requiresExpirationLeaseRLocked() bool { // expiration-based lease, either because it requires one or because // kv.expiration_leases_only.enabled is enabled. func (r *Replica) shouldUseExpirationLeaseRLocked() bool { - return ExpirationLeasesOnly.Get(&r.ClusterSettings().SV) || r.requiresExpirationLeaseRLocked() + return (ExpirationLeasesOnly.Get(&r.ClusterSettings().SV) && !DisableExpirationLeasesOnly) || + r.requiresExpirationLeaseRLocked() } // requestLeaseLocked executes a request to obtain or extend a lease From d59a11184381dca49021a5304fb7899e71cb8f4e Mon Sep 17 00:00:00 2001 From: adityamaru Date: Thu, 4 May 2023 18:42:12 -0400 Subject: [PATCH 3/4] backupccl,kvserver: log failed ExportRequest trace on client and server This change strives to improve observability around backups that fail because of timed out ExportRequests. Currently, there is very little indication of what the request was doing when the client cancelled the context after the pre-determined timeout window. With this change we now log the trace of the ExportRequest that failed. If the ExportRequest was served locally, then the trace will be part of the sender's tracing span. However, if the request was served on a remote node then the sender does not wait for the server side evaluation to notice the context cancellation. To work around this, we also print the trace on the server side if the request encountered a context cancellation and the associating tracing span is not a noop. This change also adds a private cluster setting `bulkio.backup.export_request_verbose_tracing` that if set to true will send all backup export requests with verbose tracing mode. To debug a backup failing with a timed out export request we can now: - SET CLUSTER SETTING bulkio.backup.export_request_verbose_tracing = true; - SET CLUSTER SETTING trace.snapshot.rate = '1m' Once the backup times out we can look at the logs for the server side and client side ExportRequest traces to then understand where we were stuck executing for so long. Fixes: #86047 Release note: None --- pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_processor.go | 29 ++++++++++++++++++-- pkg/kv/kvserver/batcheval/cmd_export.go | 9 ++++++ pkg/kv/sender.go | 8 +++--- pkg/server/node.go | 11 ++++++++ pkg/sql/sem/builtins/fingerprint_builtins.go | 12 ++++++-- 6 files changed, 62 insertions(+), 8 deletions(-) diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index acbecab0c4ec..428118cc68c7 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -150,6 +150,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 35d4cdc5fc86..98a9e3eb5d65 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" @@ -55,7 +57,7 @@ var ( time.Minute, settings.NonNegativeDuration, ).WithPublic() - delayPerAttmpt = settings.RegisterDurationSetting( + delayPerAttempt = settings.RegisterDurationSetting( settings.TenantWritable, "bulkio.backup.read_retry_delay", "amount of time since the read-as-of time, per-prior attempt, to wait before making another attempt", @@ -82,6 +84,13 @@ var ( "split backup data on timestamps when writing revision history", true, ) + + sendExportRequestWithVerboseTracing = settings.RegisterBoolSetting( + settings.TenantWritable, + "bulkio.backup.export_request_verbose_tracing", + "send each export request with a verbose tracing span", + util.ConstantWithMetamorphicTestBool("export_request_verbose_tracing", false), + ) ) const ( @@ -378,7 +387,7 @@ func runBackupProcessor( // We're okay with delaying this worker until then since we assume any // other work it could pull off the queue will likely want to delay to // a similar or later time anyway. - if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + if delay := delayPerAttempt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { timer.Reset(delay) log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) select { @@ -427,13 +436,22 @@ func runBackupProcessor( log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawResp kvpb.Response + var recording tracingpb.Recording var pErr *kvpb.Error requestSentAt := timeutil.Now() exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + sp := tracing.SpanFromContext(ctx) + opts := make([]tracing.SpanOption, 0) + opts = append(opts, tracing.WithParent(sp)) + if sendExportRequestWithVerboseTracing.Get(&clusterSettings.SV) { + opts = append(opts, tracing.WithRecording(tracingpb.RecordingVerbose)) + } + ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "backupccl.ExportRequest", opts...) rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.KV().NonTransactionalSender(), header, admissionHeader, req) + recording = exportSpan.FinishAndGetConfiguredRecording() if pErr != nil { return pErr.GoError() } @@ -453,6 +471,9 @@ func runBackupProcessor( // TimeoutError improves the opaque `context deadline exceeded` error // message so use that instead. if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { + if recording != nil { + log.Errorf(ctx, "failed export request for span %s\n trace:\n%s", span.span, recording) + } return errors.Wrap(exportRequestErr, "export request timeout") } // BatchTimestampBeforeGCError is returned if the ExportRequest @@ -467,6 +488,10 @@ func runBackupProcessor( continue } } + + if recording != nil { + log.Errorf(ctx, "failed export request %s\n trace:\n%s", span.span, recording) + } return errors.Wrapf(exportRequestErr, "exporting %s", span.span) } diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index bb004fbc49ab..fd4e245a92f9 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -235,6 +235,15 @@ func evalExport( return result.Result{}, maybeAnnotateExceedMaxSizeError(err) } } + + // Check if our ctx has been cancelled while we were constructing the SST. + // If it has been cancelled the client is no longer expecting a response. + select { + case <-ctx.Done(): + return result.Result{}, ctx.Err() + default: + } + data := destFile.Bytes() // NB: This should only happen in two cases: diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index 2767cae51716..b4b5e2d8ddb5 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -424,10 +424,10 @@ func SendWrappedWith( return SendWrappedWithAdmission(ctx, sender, h, kvpb.AdmissionHeader{}, args) } -// SendWrappedWithAdmission is a convenience function which wraps the request -// in a batch and sends it via the provided Sender and headers. It returns the -// unwrapped response or an error. It's valid to pass a `nil` context; an -// empty one is used in that case. +// SendWrappedWithAdmission is a convenience function which wraps the request in +// a batch and sends it via the provided Sender and headers. It returns the +// unwrapped response or an error. It's valid to pass a `nil` context; an empty +// one is used in that case. func SendWrappedWithAdmission( ctx context.Context, sender Sender, h kvpb.Header, ah kvpb.AdmissionHeader, args kvpb.Request, ) (kvpb.Response, *kvpb.Error) { diff --git a/pkg/server/node.go b/pkg/server/node.go index c319d1f3a582..269533ab8f7d 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1218,6 +1218,17 @@ func (n *Node) batchInternal( tracing.SpanFromContext(ctx).MaybeRecordStackHistory(tStart) } + // If the sender cancelled the context they may not wait around for the + // replica to notice the cancellation and return a response. For this reason, + // we log the server-side trace of the cancelled request to help debug what + // the request was doing at the time it noticed the cancellation. + if pErr != nil && errors.IsAny(pErr.GoError(), context.Canceled, context.DeadlineExceeded) { + if sp := tracing.SpanFromContext(ctx); sp != nil && !sp.IsNoop() { + log.Infof(ctx, "batch request %s failed with error: %s\ntrace:\n%s", args.String(), + pErr.GoError().Error(), sp.GetConfiguredRecording().String()) + } + } + n.metrics.callComplete(timeutil.Since(tStart), pErr) br.Error = pErr diff --git a/pkg/sql/sem/builtins/fingerprint_builtins.go b/pkg/sql/sem/builtins/fingerprint_builtins.go index 236a763e30fd..07069a890558 100644 --- a/pkg/sql/sem/builtins/fingerprint_builtins.go +++ b/pkg/sql/sem/builtins/fingerprint_builtins.go @@ -25,8 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" ) @@ -90,19 +92,25 @@ func fingerprint( ExportFingerprint: true, FingerprintOptions: kvpb.FingerprintOptions{StripIndexPrefixAndTimestamp: stripped}} var rawResp kvpb.Response + var recording tracingpb.Recording var pErr *kvpb.Error exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest fingerprint for span %s", roachpb.Span{Key: span.Key, EndKey: span.EndKey}), 5*time.Minute, func(ctx context.Context) error { - rawResp, pErr = kv.SendWrappedWithAdmission(ctx, - evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req) + sp := tracing.SpanFromContext(ctx) + ctx, exportSpan := sp.Tracer().StartSpanCtx(ctx, "fingerprint.ExportRequest", tracing.WithParent(sp)) + rawResp, pErr = kv.SendWrappedWithAdmission(ctx, evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req) + recording = exportSpan.FinishAndGetConfiguredRecording() if pErr != nil { return pErr.GoError() } return nil }) if exportRequestErr != nil { + if recording != nil { + log.Errorf(ctx, "failed export request trace:\n%s", recording) + } return nil, exportRequestErr } From 13d23f0c209a7409a4aaf7a0e44a1a6ed2a2968c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 15 May 2023 11:56:38 -0700 Subject: [PATCH 4/4] logictest: increase max-sql-memory from 192MiB to 256MiB We have seen some rare flakes where the validation (after all the test queries are done) hits "memory budget exceeded" on the root monitor. To avoid this, let's just bump the root pool a little. Release note: None --- pkg/sql/logictest/logic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 0ce2b4737c3e..8e2b30ef69a0 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -1326,7 +1326,7 @@ func (t *logicTest) newCluster( if serverArgs.MaxSQLMemoryLimit == 0 { // Specify a fixed memory limit (some test cases verify OOM conditions; // we don't want those to take long on large machines). - serverArgs.MaxSQLMemoryLimit = 192 * 1024 * 1024 + serverArgs.MaxSQLMemoryLimit = 256 * 1024 * 1024 } // We have some queries that bump into 100MB default temp storage limit // when run with fakedist-disk config, so we'll use a larger limit here.