kv: only use txnSpanRefresher.refreshedTimestamp for assertions
Related to cockroachdb#68051.

This is a partial reversion of d6ec977, which downgrades the role of
`txnSpanRefresher.refreshedTimestamp` back to being used as a sanity
check that we don't allow incoherent refresh spans into the refresh
footprint. We no longer use the field to determine where to refresh
from. Instead, we use the pre-refreshed BatchRequest.Txn.ReadTimestamp
to determine the lower bound of the refresh.

This avoids some awkward logic in txnSpanRefresher.SendLocked (e.g. the
logic needed in b9fb236). It also avoids the kinds of issues we saw when
trying to expand the use of manual refreshing in cockroachdb#68051.

Release note: None.
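
In code terms, the commit replaces an implicit refresh lower bound (read from interceptor state) with an explicit one passed by the caller, keeping `refreshedTimestamp` purely as an assertion aid. The following minimal Go sketch illustrates that contract. It is a simplified stand-in, not the actual CockroachDB API: the names Timestamp, refresher, tryRefresh, and rejectStaleSpans are illustrative only.

package main

import "fmt"

// Timestamp is a simplified stand-in for hlc.Timestamp.
type Timestamp int64

func (t Timestamp) Less(o Timestamp) bool { return t < o }

// refresher is a simplified stand-in for txnSpanRefresher.
type refresher struct {
	// refreshedTimestamp is only consulted for assertions; it no longer
	// determines where a refresh starts from.
	refreshedTimestamp Timestamp
}

// tryRefresh mirrors the new tryRefreshTxnSpans shape: the caller passes the
// pre-refresh read timestamp (refreshFrom) explicitly, rather than the
// refresher deriving it from its own state.
func (r *refresher) tryRefresh(refreshFrom, refreshTo Timestamp) {
	// ... issue refreshes covering (refreshFrom, refreshTo] ...
	if r.refreshedTimestamp.Less(refreshTo) {
		r.refreshedTimestamp = refreshTo // forward the tracked timestamp
	}
}

// rejectStaleSpans mirrors rejectRefreshSpansAtInvalidTimestamp: reads
// performed below the already-refreshed timestamp must not be added to the
// refresh footprint, since they were never verified at the new timestamp.
func (r *refresher) rejectStaleSpans(readTimestamp Timestamp) error {
	if readTimestamp.Less(r.refreshedTimestamp) {
		return fmt.Errorf("reads at %d below refreshed timestamp %d",
			readTimestamp, r.refreshedTimestamp)
	}
	return nil
}

func main() {
	r := &refresher{}
	r.tryRefresh(10, 20)                // refresh from ts 10 up to ts 20
	fmt.Println(r.rejectStaleSpans(20)) // <nil>: coherent
	fmt.Println(r.rejectStaleSpans(15)) // error: spans read at 15 are stale
}

The design point is that correctness no longer depends on the interceptor's bookkeeping staying perfectly in sync with the batch; the bookkeeping only needs to be good enough to catch incoherent refresh spans.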
nvanbenschoten committed Jun 9, 2022
1 parent 443f1dc commit e0ad9a7
Showing 2 changed files with 87 additions and 77 deletions.
pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go (140 changes: 75 additions & 65 deletions)

@@ -120,11 +120,10 @@ type txnSpanRefresher struct {
 	// set when we've failed to condense the refresh spans below the target memory
 	// limit.
 	refreshInvalid bool
-
-	// refreshedTimestamp keeps track of the largest timestamp that refreshed
-	// don't fail on (i.e. if we'll refresh, we'll refreshFrom timestamp onwards).
-	// After every epoch bump, it is initialized to the timestamp of the first
-	// batch. It is then bumped after every successful refresh.
+	// refreshedTimestamp keeps track of the largest timestamp that a transaction
+	// was able to refresh all of its refreshable spans to. It is updated under
+	// lock and used to ensure that concurrent requests don't cause the refresh
+	// spans to get out of sync. See rejectRefreshSpansAtInvalidTimestamp.
 	refreshedTimestamp hlc.Timestamp
 
 	// canAutoRetry is set if the txnSpanRefresher is allowed to auto-retry.
@@ -141,29 +140,6 @@ type txnSpanRefresher struct {
 func (sr *txnSpanRefresher) SendLocked(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, *roachpb.Error) {
-	batchReadTimestamp := ba.Txn.ReadTimestamp
-	if sr.refreshedTimestamp.IsEmpty() {
-		// This must be the first batch we're sending for this epoch. Future
-		// refreshes shouldn't check values below batchReadTimestamp, so initialize
-		// sr.refreshedTimestamp.
-		sr.refreshedTimestamp = batchReadTimestamp
-	} else if batchReadTimestamp.Less(sr.refreshedTimestamp) {
-		// sr.refreshedTimestamp might be ahead of batchReadTimestamp. We want to
-		// read at the latest refreshed timestamp, so bump the batch.
-		// batchReadTimestamp can be behind after a successful refresh, if the
-		// TxnCoordSender hasn't actually heard about the updated read timestamp.
-		// This can happen if a refresh succeeds, but then the retry of the batch
-		// that produced the timestamp fails without returning the update txn (for
-		// example, through a canceled ctx). The client should only be sending
-		// rollbacks in such cases.
-		ba.Txn.ReadTimestamp.Forward(sr.refreshedTimestamp)
-		ba.Txn.WriteTimestamp.Forward(sr.refreshedTimestamp)
-	} else if sr.refreshedTimestamp != batchReadTimestamp {
-		return nil, roachpb.NewError(errors.AssertionFailedf(
-			"unexpected batch read timestamp: %s. Expected refreshed timestamp: %s. ba: %s. txn: %s",
-			batchReadTimestamp, sr.refreshedTimestamp, ba, ba.Txn))
-	}
-
 	// Set the batch's CanForwardReadTimestamp flag.
 	ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
 
@@ -188,6 +164,9 @@ func (sr *txnSpanRefresher) SendLocked(
 	// Iterate over and aggregate refresh spans in the requests, qualified by
 	// possible resume spans in the responses.
 	if !sr.refreshInvalid {
+		if err := sr.rejectRefreshSpansAtInvalidTimestamp(br.Txn.ReadTimestamp); err != nil {
+			return nil, roachpb.NewError(err)
+		}
 		if err := sr.appendRefreshSpans(ctx, ba, br); err != nil {
 			return nil, roachpb.NewError(err)
 		}
@@ -286,7 +265,9 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
 			log.VEventf(ctx, 2, "not checking error for refresh; refresh attempts exhausted")
 		}
 	}
-	sr.forwardRefreshTimestampOnResponse(br, pErr)
+	if err := sr.forwardRefreshTimestampOnResponse(&ba, br, pErr); err != nil {
+		return nil, roachpb.NewError(err)
+	}
 	return br, pErr
 }
 
@@ -307,12 +288,14 @@ func (sr *txnSpanRefresher) maybeRefreshAndRetrySend(
 	if !ok {
 		return nil, pErr
 	}
-	refreshTxn := txn.Clone()
-	refreshTxn.Refresh(refreshTS)
-	log.VEventf(ctx, 2, "trying to refresh to %s because of %s", refreshTxn.ReadTimestamp, pErr)
+	refreshFrom := txn.ReadTimestamp
+	refreshToTxn := txn.Clone()
+	refreshToTxn.Refresh(refreshTS)
+	log.VEventf(ctx, 2, "trying to refresh to %s because of %s",
+		refreshToTxn.ReadTimestamp, pErr)
 
 	// Try refreshing the txn spans so we can retry.
-	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshTxn); refreshErr != nil {
+	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshFrom, refreshToTxn); refreshErr != nil {
 		log.Eventf(ctx, "refresh failed; propagating original retry error")
 		// TODO(lidor): we should add refreshErr info to the returned error. See issue #41057.
 		return nil, pErr
@@ -321,7 +304,7 @@
 	// We've refreshed all of the read spans successfully and bumped
 	// ba.Txn's timestamps. Attempt the request again.
 	log.Eventf(ctx, "refresh succeeded; retrying original request")
-	ba.UpdateTxn(refreshTxn)
+	ba.UpdateTxn(refreshToTxn)
 	sr.refreshAutoRetries.Inc(1)
 
 	// To prevent starvation of batches that are trying to commit, split off the
@@ -461,19 +444,20 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
 		return ba, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn, nil)
 	}
 
-	refreshTxn := ba.Txn.Clone()
-	refreshTxn.Refresh(ba.Txn.WriteTimestamp)
+	refreshFrom := ba.Txn.ReadTimestamp
+	refreshToTxn := ba.Txn.Clone()
+	refreshToTxn.Refresh(ba.Txn.WriteTimestamp)
 	log.VEventf(ctx, 2, "preemptively refreshing to timestamp %s before issuing %s",
-		refreshTxn.ReadTimestamp, ba)
+		refreshToTxn.ReadTimestamp, ba)
 
 	// Try refreshing the txn spans at a timestamp that will allow us to commit.
-	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshTxn); refreshErr != nil {
+	if refreshErr := sr.tryRefreshTxnSpans(ctx, refreshFrom, refreshToTxn); refreshErr != nil {
 		log.Eventf(ctx, "preemptive refresh failed; propagating retry error")
 		return roachpb.BatchRequest{}, newRetryErrorOnFailedPreemptiveRefresh(ba.Txn, refreshErr)
 	}
 
 	log.Eventf(ctx, "preemptive refresh succeeded")
-	ba.UpdateTxn(refreshTxn)
+	ba.UpdateTxn(refreshToTxn)
 	return ba, nil
 }
 
@@ -498,14 +482,13 @@ func newRetryErrorOnFailedPreemptiveRefresh(
 
 // tryRefreshTxnSpans sends Refresh and RefreshRange commands to all spans read
 // during the transaction to ensure that no writes were written more recently
-// than sr.refreshedTimestamp. All implicated timestamp caches are updated with
-// the final transaction timestamp. Returns whether the refresh was successful
-// or not.
+// than refreshFrom. All implicated timestamp caches are updated with the final
+// transaction timestamp. Returns whether the refresh was successful or not.
 //
 // The provided transaction should be a Clone() of the original transaction with
 // its ReadTimestamp adjusted by the Refresh() method.
 func (sr *txnSpanRefresher) tryRefreshTxnSpans(
-	ctx context.Context, refreshTxn *roachpb.Transaction,
+	ctx context.Context, refreshFrom hlc.Timestamp, refreshToTxn *roachpb.Transaction,
 ) (err *roachpb.Error) {
 	// Track the result of the refresh in metrics.
 	defer func() {
@@ -524,14 +507,14 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 		return roachpb.NewError(errors.AssertionFailedf("can't refresh txn spans; not valid"))
 	} else if sr.refreshFootprint.empty() {
 		log.VEvent(ctx, 2, "there are no txn spans to refresh")
-		sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp)
+		sr.forwardRefreshTimestampOnRefresh(refreshToTxn)
 		return nil
 	}
 
 	// Refresh all spans (merge first).
 	// TODO(nvanbenschoten): actually merge spans.
 	refreshSpanBa := roachpb.BatchRequest{}
-	refreshSpanBa.Txn = refreshTxn
+	refreshSpanBa.Txn = refreshToTxn
 	addRefreshes := func(refreshes *condensableSpanSet) {
 		// We're going to check writes between the previous refreshed timestamp, if
 		// any, and the timestamp we want to bump the transaction to. Note that if
@@ -549,17 +532,17 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 			if len(u.EndKey) == 0 {
 				req = &roachpb.RefreshRequest{
 					RequestHeader: roachpb.RequestHeaderFromSpan(u),
-					RefreshFrom:   sr.refreshedTimestamp,
+					RefreshFrom:   refreshFrom,
 				}
 			} else {
 				req = &roachpb.RefreshRangeRequest{
 					RequestHeader: roachpb.RequestHeaderFromSpan(u),
-					RefreshFrom:   sr.refreshedTimestamp,
+					RefreshFrom:   refreshFrom,
 				}
 			}
 			refreshSpanBa.Add(req)
 			log.VEventf(ctx, 2, "updating span %s @%s - @%s to avoid serializable restart",
-				req.Header().Span(), sr.refreshedTimestamp, refreshTxn.WriteTimestamp)
+				req.Header().Span(), refreshFrom, refreshToTxn.WriteTimestamp)
 		}
 	}
 	addRefreshes(&sr.refreshFootprint)
@@ -570,7 +553,7 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 		return batchErr
 	}
 
-	sr.refreshedTimestamp.Forward(refreshTxn.ReadTimestamp)
+	sr.forwardRefreshTimestampOnRefresh(refreshToTxn)
 	return nil
 }
 
@@ -579,15 +562,6 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
 func (sr *txnSpanRefresher) appendRefreshSpans(
 	ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
 ) error {
-	readTimestamp := br.Txn.ReadTimestamp
-	if readTimestamp.Less(sr.refreshedTimestamp) {
-		// This can happen with (illegal) concurrent txn use, but that's supposed to
-		// be detected by the gatekeeper interceptor.
-		return errors.AssertionFailedf("attempting to append refresh spans after the tracked"+
-			" timestamp has moved forward. batchTimestamp: %s refreshedTimestamp: %s ba: %s",
-			errors.Safe(readTimestamp), errors.Safe(sr.refreshedTimestamp), ba)
-	}
-
 	ba.RefreshSpanIterate(br, func(span roachpb.Span) {
 		if log.ExpensiveLogEnabled(ctx, 3) {
 			log.VEventf(ctx, 3, "recording span to refresh: %s", span.String())
@@ -622,21 +596,54 @@ func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.T
 	return sr.canForwardReadTimestamp(txn) && !sr.refreshInvalid && sr.refreshFootprint.empty()
 }
 
+// forwardRefreshTimestampOnRefresh updates the refresher's tracked
+// refreshedTimestamp under lock after a successful refresh. This in conjunction
+// with a check in rejectRefreshSpansAtInvalidTimestamp prevents a race where a
+// concurrent request may add new refresh spans only "verified" up to its batch
+// timestamp after we've refreshed past that timestamp.
+func (sr *txnSpanRefresher) forwardRefreshTimestampOnRefresh(refreshToTxn *roachpb.Transaction) {
+	sr.refreshedTimestamp.Forward(refreshToTxn.ReadTimestamp)
+}
+
 // forwardRefreshTimestampOnResponse updates the refresher's tracked
 // refreshedTimestamp to stay in sync with "server-side refreshes", where the
 // transaction's read timestamp is updated during the evaluation of a batch.
 func (sr *txnSpanRefresher) forwardRefreshTimestampOnResponse(
-	br *roachpb.BatchResponse, pErr *roachpb.Error,
-) {
-	var txn *roachpb.Transaction
+	ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
+) error {
+	baTxn := ba.Txn
+	var brTxn *roachpb.Transaction
 	if pErr != nil {
-		txn = pErr.GetTxn()
+		brTxn = pErr.GetTxn()
 	} else {
-		txn = br.Txn
+		brTxn = br.Txn
 	}
+	if baTxn == nil || brTxn == nil {
+		return nil
+	}
-	if txn != nil {
-		sr.refreshedTimestamp.Forward(txn.ReadTimestamp)
+	if baTxn.ReadTimestamp.Less(brTxn.ReadTimestamp) {
+		sr.refreshedTimestamp.Forward(brTxn.ReadTimestamp)
+	} else if brTxn.ReadTimestamp.Less(baTxn.ReadTimestamp) {
+		return errors.AssertionFailedf("received transaction in response with "+
+			"earlier read timestamp than in the request. ba.Txn: %s, br.Txn: %s", baTxn, brTxn)
 	}
+	return nil
 }
+
+// rejectRefreshSpansAtInvalidTimestamp returns an error if the timestamp at
+// which a set of reads was performed is below the largest timestamp that this
+// transaction has already refreshed to.
+func (sr *txnSpanRefresher) rejectRefreshSpansAtInvalidTimestamp(
+	readTimestamp hlc.Timestamp,
+) error {
+	if readTimestamp.Less(sr.refreshedTimestamp) {
+		// This can happen with (illegal) concurrent txn use, but that's supposed to
+		// be detected by the gatekeeper interceptor.
+		return errors.AssertionFailedf("attempting to append refresh spans after the tracked"+
+			" timestamp has moved forward. batchTimestamp: %s refreshedTimestamp: %s",
+			errors.Safe(readTimestamp), errors.Safe(sr.refreshedTimestamp))
+	}
+	return nil
+}
 
 // maxRefreshAttempts returns the configured number of times that a transaction
@@ -676,6 +683,9 @@ func (sr *txnSpanRefresher) importLeafFinalState(
 		sr.refreshInvalid = true
 		sr.refreshFootprint.clear()
 	} else if !sr.refreshInvalid {
+		if err := sr.rejectRefreshSpansAtInvalidTimestamp(tfs.Txn.ReadTimestamp); err != nil {
+			log.Fatalf(ctx, "%s", err)
+		}
 		sr.refreshFootprint.insert(tfs.RefreshSpans...)
 		sr.refreshFootprint.maybeCondense(ctx, sr.riGen, MaxTxnRefreshSpansBytes.Get(&sr.st.SV))
 	}
pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go (24 changes: 12 additions & 12 deletions)

@@ -82,7 +82,7 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
 		tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(3), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Scan with limit. Only the scanned keys are added to the refresh spans.
 	ba.Requests = nil
@@ -109,7 +109,7 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) {
 		tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(5), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 }
 
 // TestTxnSpanRefresherRefreshesTransactions tests that the txnSpanRefresher
@@ -221,7 +221,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
 
 	require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Hook up a chain of mocking functions.
 	onFirstSend := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -290,7 +290,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
 		} else {
 			require.Nil(t, br)
 			require.NotNil(t, pErr)
-			require.Equal(t, ba.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+			require.Zero(t, tsr.refreshedTimestamp)
 			require.Equal(t, int64(0), tsr.refreshSuccess.Count())
 			require.Equal(t, int64(0), tsr.refreshAutoRetries.Count())
 			// Note that we don't check the tsr.refreshFail metric here as tests
@@ -326,7 +326,7 @@ func TestTxnSpanRefresherMaxRefreshAttempts(t *testing.T) {
 	require.NotNil(t, br)
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Hook up a chain of mocking functions.
 	onPut := func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -712,7 +712,7 @@ func TestTxnSpanRefresherSplitEndTxnOnAutoRetry(t *testing.T) {
 	require.NotNil(t, br)
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, br.Txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	ba.Requests = nil
 	ba.Add(&putArgs, &etArgs)
@@ -878,7 +878,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
 
 	// Send another batch that pushes us above the limit. The tracked spans are
@@ -896,7 +896,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
 	require.False(t, tsr.refreshFootprint.condensed)
 	require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count())
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Exceed the limit again, this time with a non-adjacent span such that
 	// condensing needs to occur.
@@ -911,7 +911,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) {
 	require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyE}}, tsr.refreshFootprint.asSlice())
 	require.True(t, tsr.refreshFootprint.condensed)
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 	require.Equal(t, int64(1), tsr.refreshMemoryLimitExceeded.Count())
 	require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count())
 
@@ -1104,7 +1104,7 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
 	require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
 	require.Equal(t, int64(2), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Incrementing the transaction epoch clears the spans.
 	tsr.epochBumpedLocked()
@@ -1125,14 +1125,14 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
 	require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
 	require.True(t, tsr.refreshInvalid)
 	require.Equal(t, int64(0), tsr.refreshFootprint.bytes)
-	require.Equal(t, txn.ReadTimestamp, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 
 	// Incrementing the transaction epoch clears the invalid status.
 	tsr.epochBumpedLocked()
 
 	require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
 	require.False(t, tsr.refreshInvalid)
-	require.Equal(t, hlc.Timestamp{}, tsr.refreshedTimestamp)
+	require.Zero(t, tsr.refreshedTimestamp)
 }
 
 // TestTxnSpanRefresherSavepoint checks that the span refresher can savepoint
