From e0ad9a72e5b0e4e3d813e0361fef5bd8180f8210 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 9 Jun 2022 00:23:34 -0400
Subject: [PATCH] kv: only use txnSpanRefresher.refreshedTimestamp for assertions

Related to #68051.

This is a partial reversion of d6ec977 which downgrades the role of
`txnSpanRefresher.refreshedTimestamp` back to being used as a sanity check
that we don't allow incoherent refresh spans into the refresh footprint. We
no longer use the field to determine where to refresh from. Instead, we use
the pre-refreshed BatchRequest.Txn.ReadTimestamp to determine the
lower-bound of the refresh.

This avoids some awkward logic in txnSpanRefresher.SendLocked (e.g. the
logic needed in b9fb236). It also avoids the kinds of issues we saw when
trying to expand the use of manual refreshing in #68051.

Release note: None.
---
 .../kvcoord/txn_interceptor_span_refresher.go | 140 ++++++++++--------
 .../txn_interceptor_span_refresher_test.go    |  24 +--
 2 files changed, 87 insertions(+), 77 deletions(-)

diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index a014397bd7a4..579af29bc0b6 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -120,11 +120,10 @@ type txnSpanRefresher struct {
 	// set when we've failed to condense the refresh spans below the target memory
 	// limit.
 	refreshInvalid bool
-
-	// refreshedTimestamp keeps track of the largest timestamp that refreshed
-	// don't fail on (i.e. if we'll refresh, we'll refreshFrom timestamp onwards).
-	// After every epoch bump, it is initialized to the timestamp of the first
-	// batch. It is then bumped after every successful refresh.
+	// refreshedTimestamp keeps track of the largest timestamp that a transaction
+	// was able to refresh all of its refreshable spans to. It is updated under
+	// lock and used to ensure that concurrent requests don't cause the refresh
+	// spans to get out of sync. See rejectRefreshSpansAtInvalidTimestamp.
 	refreshedTimestamp hlc.Timestamp
 
 	// canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry.
@@ -141,29 +140,6 @@ type txnSpanRefresher struct {
 func (sr *txnSpanRefresher) SendLocked(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
-	batchReadTimestamp := ba.Txn.ReadTimestamp
-	if sr.refreshedTimestamp.IsEmpty() {
-		// This must be the first batch we're sending for this epoch. Future
-		// refreshes shouldn't check values below batchReadTimestamp, so initialize
-		// sr.refreshedTimestamp.
-		sr.refreshedTimestamp = batchReadTimestamp
-	} else if batchReadTimestamp.Less(sr.refreshedTimestamp) {
-		// sr.refreshedTimestamp might be ahead of batchReadTimestamp. We want to
-		// read at the latest refreshed timestamp, so bump the batch.
-		// batchReadTimestamp can be behind after a successful refresh, if the
-		// TxnCoordSender hasn't actually heard about the updated read timestamp.
-		// This can happen if a refresh succeeds, but then the retry of the batch
-		// that produced the timestamp fails without returning the update txn (for
-		// example, through a canceled ctx). The client should only be sending
-		// rollbacks in such cases.
-		ba.Txn.ReadTimestamp.Forward(sr.refreshedTimestamp)
-		ba.Txn.WriteTimestamp.Forward(sr.refreshedTimestamp)
-	} else if sr.refreshedTimestamp != batchReadTimestamp {
-		return nil, roachpb.NewError(errors.AssertionFailedf(
-			"unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s. txn: %s",
-			batchReadTimestamp, sr.refreshedTimestamp, ba, ba.Txn))
-	}
-
 	// Set the batch's CanForwardReadTimestamp flag.
 	ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
 
@@ -188,6 +164,9 @@ func (sr *txnSpanRefresher) SendLocked(
 	// Iterate over and aggregate refresh spans in the requests, qualified by
 	// possible resume spans in the responses.
 	if !sr.refreshInvalid {
+		if err := sr.rejectRefreshSpansAtInvalidTimestamp(br.Txn.ReadTimestamp); err != nil {
+			return nil, roachpb.NewError(err)
+		}
 		if err := sr.appendRefreshSpans(ctx, ba, br); err != nil {
 			return nil, roachpb.NewError(err)
 		}
@@ -286,7 +265,9 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
 			log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted")
 		}
 	}
-	sr.forwardRefreshTimestampOnResponse(br, pErr)
+	if err := sr.forwardRefreshTimestampOnResponse(&ba, br, pErr); err != nil {
+		return nil, roachpb.NewError(err)
+	}
 	return br, pErr
 }
 
@@ -307,12 +288,14 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 	if !ok {
 		return nil, pErr
 	}
-	refreshTxn := txn.Clone()
-	refreshTxn.Refresh(refreshTS)
-	log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)
+	refreshFrom := txn.ReadTimestamp
+	refreshToTxn := txn.Clone()
+	refreshToTxn.Refresh(refreshTS)
+	log.VEventf(ctx, 2, "trying to refresh to %s because of %s",
+		refreshToTxn.ReadTimestamp, pErr)
 
 	// Try refreshing the txn spans so we can retry.
-	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshTxn); refreshErr != nil {
+	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshFrom, refreshToTxn); refreshErr != nil {
 		log.Eventf(ctx, "refresh failed; propagating original retry error")
 		// TODO(lidor): we should add refreshErr info to the returned error. See issue #41057.
 		return nil, pErr
@@ -321,7 +304,7 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 	// We've refreshed all of the read spans successfully and bumped
 	// ba.Txn's timestamps. Attempt the request again.
 	log.Eventf(ctx, "refresh succeeded; retrying original request")
-	ba.UpdateTxn(refreshTxn)
+	ba.UpdateTxn(refreshToTxn)
 	sr.refreshAutoRetries.Inc(1)
 
 	// To prevent starvation of batches that are trying to commit, split off the
@@ -461,19 +444,20 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
 		return ba, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn, nil)
 	}
 
-	refreshTxn := ba.Txn.Clone()
-	refreshTxn.Refresh(ba.Txn.WriteTimestamp)
+	refreshFrom := ba.Txn.ReadTimestamp
+	refreshToTxn := ba.Txn.Clone()
+	refreshToTxn.Refresh(ba.Txn.WriteTimestamp)
 	log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
-		refreshTxn.ReadTimestamp, ba)
+		refreshToTxn.ReadTimestamp, ba)
 
 	// Try refreshing the txn spans at a timestamp that will allow us to commit.
-	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshTxn); refreshErr != nil {
+	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshFrom, refreshToTxn); refreshErr != nil {
 		log.Eventf(ctx, "preemptive refresh failed; propagating retry error")
 		return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn, refreshErr)
 	}
 
 	log.Eventf(ctx, "preemptive refresh succeeded")
-	ba.UpdateTxn(refreshTxn)
+	ba.UpdateTxn(refreshToTxn)
 	return ba, nil
 }
 
@@ -498,14 +482,13 @@ func newRetryErrorOnFailedPreemptiveRefresh(
 
 // tryRefreshTxnSpans sends Refresh and RefreshRange commands to all spans read
 // during the transaction to ensure that no writes were written more recently
-// than sr.refreshedTimestamp. All implicated timestamp caches are updated with
-// the final transaction timestamp. Returns whether the refresh was successful
-// or not.
+// than refreshFrom. All implicated timestamp caches are updated with the final
+// transaction timestamp. Returns whether the refresh was successful or not.
 //
 // The provided transaction should be a Clone() of the original transaction with
 // its ReadTimestamp adjusted by the Refresh() method.
 func (sr *txnSpanRefresher) tryRefreshTxnSpans(
-	ctx context.Context, refreshTxn *roachpb.Transaction,
+	ctx context.Context, refreshFrom hlc.Timestamp, refreshToTxn *roachpb.Transaction,
 ) (err *roachpb.Error) {
 	// Track the result of the refresh in metrics.
 	defer func() {
@@ -524,14 +507,14 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 		return roachpb.NewError(errors.AssertionFailedf("can't refresh txn spans; not valid"))
 	} else if sr.refreshFootprint.empty() {
 		log.VEvent(ctx, 2, "there are no txn spans to refresh")
-		sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp)
+		sr.forwardRefreshTimestampOnRefresh(refreshToTxn)
 		return nil
 	}
 
 	// Refresh all spans (merge first).
 	// TODO(nvanbenschoten): actually merge spans.
 	refreshSpanBa := roachpb.BatchRequest{}
-	refreshSpanBa.Txn = refreshTxn
+	refreshSpanBa.Txn = refreshToTxn
 	addRefreshes := func(refreshes *condensableSpanSet) {
 		// We're going to check writes between the previous refreshed timestamp, if
 		// any, and the timestamp we want to bump the transaction to. Note that if
@@ -549,17 +532,17 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 			if len(u.EndKey) == 0 {
 				req = &roachpb.RefreshRequest{
 					RequestHeader: roachpb.RequestHeaderFromSpan(u),
-					RefreshFrom:   sr.refreshedTimestamp,
+					RefreshFrom:   refreshFrom,
 				}
 			} else {
 				req = &roachpb.RefreshRangeRequest{
 					RequestHeader: roachpb.RequestHeaderFromSpan(u),
-					RefreshFrom:   sr.refreshedTimestamp,
+					RefreshFrom:   refreshFrom,
 				}
 			}
 			refreshSpanBa.Add(req)
 			log.VEventf(ctx, 2, "updating span %s @%s - @%s to avoid serializable restart",
-				req.Header().Span(), sr.refreshedTimestamp, refreshTxn.WriteTimestamp)
+				req.Header().Span(), refreshFrom, refreshToTxn.WriteTimestamp)
 		}
 	}
 	addRefreshes(&sr.refreshFootprint)
@@ -570,7 +553,7 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 		return batchErr
 	}
 
-	sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp)
+	sr.forwardRefreshTimestampOnRefresh(refreshToTxn)
 	return nil
 }
 
@@ -579,15 +562,6 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
 	ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
 ) error {
-	readTimestamp := br.Txn.ReadTimestamp
-	if readTimestamp.Less(sr.refreshedTimestamp) {
-		// This can happen with (illegal) concurrent txn use, but that's supposed to
-		// be detected by the gatekeeper interceptor.
-		return errors.AssertionFailedf("attempting to append refresh spans after the tracked"+
-			" timestamp has moved forward. batchTimestamp: %s refreshedTimestamp: %s ba: %s",
-			errors.Safe(readTimestamp), errors.Safe(sr.refreshedTimestamp), ba)
-	}
-
 	ba.RefreshSpanIterate(br, func(span roachpb.Span) {
 		if log.ExpensiveLogEnabled(ctx, 3) {
 			log.VEventf(ctx, 3, "recording span to refresh: %s", span.String())
 		}
@@ -622,21 +596,54 @@ func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.T
 	return sr.canForwardReadTimestamp(txn) && !sr.refreshInvalid && sr.refreshFootprint.empty()
 }
 
+// forwardRefreshTimestampOnRefresh updates the refresher's tracked
+// refreshedTimestamp under lock after a successful refresh. This in conjunction
+// with a check in rejectRefreshSpansAtInvalidTimestamp prevents a race where a
+// concurrent request may add new refresh spans only "verified" up to its batch
+// timestamp after we've refreshed past that timestamp.
+func (sr *txnSpanRefresher) forwardRefreshTimestampOnRefresh(refreshToTxn *roachpb.Transaction) {
+	sr.refreshedTimestamp.Forward(refreshToTxn.ReadTimestamp)
+}
+
 // forwardRefreshTimestampOnResponse updates the refresher's tracked
 // refreshedTimestamp to stay in sync with "server-side refreshes", where the
 // transaction's read timestamp is updated during the evaluation of a batch.
 func (sr *txnSpanRefresher) forwardRefreshTimestampOnResponse(
-	br *roachpb.BatchResponse, pErr *roachpb.Error,
-) {
-	var txn *roachpb.Transaction
+	ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
+) error {
+	baTxn := ba.Txn
+	var brTxn *roachpb.Transaction
 	if pErr != nil {
-		txn = pErr.GetTxn()
+		brTxn = pErr.GetTxn()
 	} else {
-		txn = br.Txn
+		brTxn = br.Txn
+	}
+	if baTxn == nil || brTxn == nil {
+		return nil
 	}
-	if txn != nil {
-		sr.refreshedTimestamp.Forward(txn.ReadTimestamp)
+	if baTxn.ReadTimestamp.Less(brTxn.ReadTimestamp) {
+		sr.refreshedTimestamp.Forward(brTxn.ReadTimestamp)
+	} else if brTxn.ReadTimestamp.Less(baTxn.ReadTimestamp) {
+		return errors.AssertionFailedf("received transaction in response with "+
+			"earlier read timestamp than in the request. ba.Txn: %s, br.Txn: %s", baTxn, brTxn)
 	}
+	return nil
+}
+
+// rejectRefreshSpansAtInvalidTimestamp returns an error if the timestamp at
+// which a set of reads was performed is below the largest timestamp that this
+// transaction has already refreshed to.
+func (sr *txnSpanRefresher) rejectRefreshSpansAtInvalidTimestamp(
+	readTimestamp hlc.Timestamp,
+) error {
+	if readTimestamp.Less(sr.refreshedTimestamp) {
+		// This can happen with (illegal) concurrent txn use, but that's supposed to
+		// be detected by the gatekeeper interceptor.
+		return errors.AssertionFailedf("attempting to append refresh spans after the tracked"+
+			" timestamp has moved forward. batchTimestamp: %s refreshedTimestamp: %s",
+			errors.Safe(readTimestamp), errors.Safe(sr.refreshedTimestamp))
+	}
+	return nil
 }
 
 // maxRefreshAttempts returns the configured number of times that a transaction
@@ -676,6 +683,9 @@ func (sr *txnSpanRefresher) importLeafFinalState(
 		sr.refreshInvalid = true
 		sr.refreshFootprint.clear()
 	} else if !sr.refreshInvalid {
+		if err := sr.rejectRefreshSpansAtInvalidTimestamp(tfs.Txn.ReadTimestamp); err != nil {
+			log.Fatalf(ctx, "%s", err)
+		}
 		sr.refreshFootprint.insert(tfs.RefreshSpans...)
 		sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV))
 	}
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
index 7ae3a41ac2be..166aad8b9fe2 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
@@ -82,7 +82,7 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
 		tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(3), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Scan with limit. Only the scanned keys are added to the refresh spans.
 	ba.Requests = nil
@@ -109,7 +109,7 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
 		tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(5), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 }
 
 // TestTxnSpanRefresherRefreshesTransactions tests that the txnSpanRefresher
@@ -221,7 +221,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
 			require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()},
 				tsr.refreshFootprint.asSlice())
 			require.False(t, tsr.refreshInvalid)
-			require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+			require.Zero(t, tsr.refreshedTimestamp)
 
 			// Hook up a chain of mocking functions.
 			onFirstSend := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -290,7 +290,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
 			} else {
 				require.Nil(t, br)
 				require.NotNil(t, pErr)
-				require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+				require.Zero(t, tsr.refreshedTimestamp)
 				require.Equal(t, int64(0), tsr.refreshSuccess.Count())
 				require.Equal(t, int64(0), tsr.refreshAutoRetries.Count())
 				// Note that we don't check the tsr.refreshFail metric here as tests
@@ -326,7 +326,7 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) {
 	require.NotNil(t, br)
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Hook up a chain of mocking functions.
 	onPut := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -712,7 +712,7 @@ func TestTxnSpanRefresherSplitEndTxnOnAutoRetry(t *testing.T) {
 		require.NotNil(t, br)
 		require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 		require.False(t, tsr.refreshInvalid)
-		require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+		require.Zero(t, tsr.refreshedTimestamp)
 
 		ba.Requests = nil
 		ba.Add(&putArgs, &etArgs)
@@ -878,7 +878,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
 
 	// Send another batch that pushes us above the limit. The tracked spans are
@@ -896,7 +896,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
 	require.False(t, tsr.refreshFootprint.condensed)
 	require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count())
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Exceed the limit again, this time with a non-adjacent span such that
 	// condensing needs to occur.
@@ -911,7 +911,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 	require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyE}}, tsr.refreshFootprint.asSlice())
 	require.True(t, tsr.refreshFootprint.condensed)
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 	require.Equal(t, int64(1), tsr.refreshMemoryLimitExceeded.Count())
 	require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count())
 
@@ -1104,7 +1104,7 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Incrementing the transaction epoch clears the spans.
 	tsr.epochBumpedLocked()
@@ -1125,14 +1125,14 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
 	require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
 	require.True(t, tsr.refreshInvalid)
 	require.Equal(t, int64(0), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Incrementing the transaction epoch clears the invalid status.
 	tsr.epochBumpedLocked()
 
 	require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 }
 
 // TestTxnSpanRefresherSavepoint checks that the span refresher can savepoint