Skip to content

Commit

Permalink
roachpb: remove EndTxn's DeprecatedCanCommitAtHigherTimestamp field
Browse files Browse the repository at this point in the history
The field was replaced with the more general BatchRequest.CanForwardReadTimestamp
in 972915d. This commit completes the migration to remove the old flag.

We attempted this before, in 93d5eb9, but had to back that out in 4189938 because
we still needed compatibility with v20.1 nodes at the time. That is no longer the
case.
  • Loading branch information
nvanbenschoten committed Apr 29, 2021
1 parent 974b144 commit ee23325
Show file tree
Hide file tree
Showing 8 changed files with 525 additions and 792 deletions.
24 changes: 0 additions & 24 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,17 +659,6 @@ func splitBatchAndCheckForRefreshSpans(
}()
if hasRefreshSpans {
ba.CanForwardReadTimestamp = false

// If the final part contains an EndTxn request, unset its
// DeprecatedCanCommitAtHigherTimestamp flag as well.
lastPart := parts[len(parts)-1]
if et := lastPart[len(lastPart)-1].GetEndTxn(); et != nil {
etCopy := *et
etCopy.DeprecatedCanCommitAtHigherTimestamp = false
lastPart = append([]roachpb.RequestUnion(nil), lastPart...)
lastPart[len(lastPart)-1].MustSetInner(&etCopy)
parts[len(parts)-1] = lastPart
}
}
}
return parts
Expand All @@ -692,19 +681,6 @@ func unsetCanForwardReadTimestampFlag(ctx context.Context, ba *roachpb.BatchRequ
if roachpb.NeedsRefresh(req.GetInner()) {
// Unset the flag.
ba.CanForwardReadTimestamp = false

// We would need to also unset the DeprecatedCanCommitAtHigherTimestamp
// flag on any EndTxn request in the batch, but it turns out that
// because we call this function when a batch is split across
// ranges, we'd already have bailed if the EndTxn wasn't a parallel
// commit — and if it was a parallel commit then we must not have
// any requests that need to refresh (see
// txnCommitter.canCommitInParallel). Assert this for our own
// sanity.
if _, ok := ba.GetArg(roachpb.EndTxn); ok {
log.Fatalf(ctx, "batch unexpected contained requests "+
"that need to refresh and an EndTxn request: %s", ba.String())
}
return
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,6 @@ func makeTxnCommitExplicitLocked(
et := roachpb.EndTxnRequest{Commit: true}
et.Key = txn.Key
et.LockSpans = lockSpans
et.DeprecatedCanCommitAtHigherTimestamp = canFwdRTS
ba.Add(&et)

_, pErr := s.SendLocked(ctx, ba)
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,9 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {
etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}
ba.Add(&putArgs, &etArgs)

// Set the CanForwardReadTimestamp and DeprecatedCanCommitAtHigherTimestamp
// flags so we can make sure that these are propagated to the async explicit
// commit task.
// Set the CanForwardReadTimestamp flag so we can make sure that these are
// propagated to the async explicit commit task.
ba.Header.CanForwardReadTimestamp = true
etArgs.DeprecatedCanCommitAtHigherTimestamp = true

explicitCommitCh := make(chan struct{})
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
Expand All @@ -369,7 +367,6 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.DeprecatedCanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 1)
require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0])

Expand All @@ -388,7 +385,6 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.DeprecatedCanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (m *mockLockedSender) MockSend(
m.mockFn = fn
}

// Reset resets the mockLockedSender mocking function to a no-op.
func (m *mockLockedSender) Reset() {
m.mockFn = nil
}

// ChainMockSend sets a series of mocking functions on the mockLockedSender.
// The provided mocking functions are set in the order that they are provided
// and a given mocking function is set after the previous one has been called.
Expand Down
27 changes: 0 additions & 27 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,33 +164,6 @@ func (sr *txnSpanRefresher) SendLocked(

// Set the batch's CanForwardReadTimestamp flag.
ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
et := rArgs.(*roachpb.EndTxnRequest)
// Assign the EndTxn's DeprecatedCanCommitAtHigherTimestamp flag if it
// isn't already set correctly. We don't write blindly because we could
// be dealing with a re-issued batch from splitEndTxnAndRetrySend after
// a refresh and we don't want to mutate previously issued requests or
// we risk a data race (checked by raceTransport). In these cases, we
// need to clone the EndTxn request first before mutating.
//
// We know this is a re-issued batch if the flag is already set and we
// need to unset it. We aren't able to detect the case where the flag is
// not set and we now need to set it to true, but such cases don't
// happen in practice (i.e. we'll never begin setting the flag after a
// refresh).
//
// TODO(nvanbenschoten): this is ugly. If we weren't about to delete
// this field, we'd want to do something better. Just delete this ASAP.
if et.DeprecatedCanCommitAtHigherTimestamp != ba.CanForwardReadTimestamp {
isReissue := et.DeprecatedCanCommitAtHigherTimestamp
if isReissue {
etCpy := *et
ba.Requests[len(ba.Requests)-1].MustSetInner(&etCpy)
et = &etCpy
}
et.DeprecatedCanCommitAtHigherTimestamp = ba.CanForwardReadTimestamp
}
}

// Attempt a refresh before sending the batch.
ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false)
Expand Down
145 changes: 0 additions & 145 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,151 +1069,6 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp tests that the
// txnSpanRefresher assigns the CanCommitAtHigherTimestamp flag on EndTxn
// requests, along with the CanForwardReadTimestamp on Batch headers.
func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send an EndTxn request. Should set DeprecatedCanCommitAtHigherTimestamp
// and CanForwardReadTimestamp flags.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().DeprecatedCanCommitAtHigherTimestamp)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send an EndTxn request for a transaction with a fixed commit timestamp.
// Should NOT set DeprecatedCanCommitAtHigherTimestamp and
// CanForwardReadTimestamp flags.
txnFixed := txn.Clone()
txnFixed.CommitTimestampFixed = true
var baFixed roachpb.BatchRequest
baFixed.Header = roachpb.Header{Txn: txnFixed}
baFixed.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().DeprecatedCanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, baFixed)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send a batch below the limit to collect refresh spans.
ba.Requests = nil
scanArgs := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}
ba.Add(&scanArgs)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)

// Send another EndTxn request. Should NOT set
// DeprecatedCanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().DeprecatedCanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another batch.
ba.Requests = nil
scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}}
ba.Add(&scanArgs2)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another EndTxn request. Still should NOT set
// DeprecatedCanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().DeprecatedCanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Increment the transaction's epoch and send another EndTxn request. Should
// set DeprecatedCanCommitAtHigherTimestamp and CanForwardReadTimestamp
// flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().DeprecatedCanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

tsr.epochBumpedLocked()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherEpochIncrement tests that a txnSpanRefresher's refresh
// spans and span validity status are reset on an epoch increment.
func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
Expand Down
Loading

0 comments on commit ee23325

Please sign in to comment.