Skip to content

Commit

Permalink
kv: remove EndTxn.CanCommitAtHigherTimestamp flag
Browse files Browse the repository at this point in the history
The field was replaced with the more general BatchRequest.CanForwardReadTimestamp
in 972915d. This commit completes the migration to remove the old flag, which is
possible now that we're building towards v20.2.
  • Loading branch information
nvanbenschoten committed Aug 21, 2020
1 parent 904b7cb commit 93d5eb9
Show file tree
Hide file tree
Showing 19 changed files with 737 additions and 1,049 deletions.
29 changes: 0 additions & 29 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 0 additions & 21 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 0 additions & 23 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,17 +637,6 @@ func splitBatchAndCheckForRefreshSpans(
}()
if hasRefreshSpans {
ba.CanForwardReadTimestamp = false

// If the final part contains an EndTxn request, unset its
// CanCommitAtHigherTimestamp flag as well.
lastPart := parts[len(parts)-1]
if et := lastPart[len(lastPart)-1].GetEndTxn(); et != nil {
etCopy := *et
etCopy.CanCommitAtHigherTimestamp = false
lastPart = append([]roachpb.RequestUnion(nil), lastPart...)
lastPart[len(lastPart)-1].MustSetInner(&etCopy)
parts[len(parts)-1] = lastPart
}
}
}
return parts
Expand All @@ -670,18 +659,6 @@ func unsetCanForwardReadTimestampFlag(ctx context.Context, ba *roachpb.BatchRequ
if roachpb.NeedsRefresh(req.GetInner()) {
// Unset the flag.
ba.CanForwardReadTimestamp = false

// We would need to also unset the CanCommitAtHigherTimestamp flag
// on any EndTxn request in the batch, but it turns out that because
// we call this function when a batch is split across ranges, we'd
// already have bailed if the EndTxn wasn't a parallel commit — and
// if it was a parallel commit then we must not have any requests
// that need to refresh (see txnCommitter.canCommitInParallel).
// Assert this for our own sanity.
if _, ok := ba.GetArg(roachpb.EndTxn); ok {
log.Fatalf(ctx, "batch unexpected contained requests "+
"that need to refresh and an EndTxn request: %s", ba.String())
}
return
}
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,8 @@ func makeTxnCommitExplicitLocked(
// Construct a new batch with just an EndTxn request.
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn, CanForwardReadTimestamp: canFwdRTS}
et := roachpb.EndTxnRequest{Commit: true}
et := roachpb.EndTxnRequest{Commit: true, LockSpans: lockSpans}
et.Key = txn.Key
et.LockSpans = lockSpans
et.CanCommitAtHigherTimestamp = canFwdRTS
ba.Add(&et)

_, pErr := s.SendLocked(ctx, ba)
Expand Down
8 changes: 2 additions & 6 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,9 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {
etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}
ba.Add(&putArgs, &etArgs)

// Set the CanForwardReadTimestamp and CanCommitAtHigherTimestamp flags so
// we can make sure that these are propagated to the async explicit commit
// task.
// Set the CanForwardReadTimestamp flag so we can make sure that these are
// propagated to the async explicit commit task.
ba.Header.CanForwardReadTimestamp = true
etArgs.CanCommitAtHigherTimestamp = true

explicitCommitCh := make(chan struct{})
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
Expand All @@ -369,7 +367,6 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.CanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 1)
require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0])

Expand All @@ -388,7 +385,6 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.CanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func (m *mockLockedSender) MockSend(
m.mockFn = fn
}

// Reset resets the mockLockedSender mocking function to a no-op.
func (m *mockLockedSender) Reset() {
m.mockFn = nil
}

// ChainMockSend sets a series of mocking functions on the mockLockedSender.
// The provided mocking functions are set in the order that they are provided
// and a given mocking function is set after the previous one has been called.
Expand Down
18 changes: 11 additions & 7 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,6 @@ func (sr *txnSpanRefresher) SendLocked(
// Set the batch's CanForwardReadTimestamp flag.
canFwdRTS := sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
ba.CanForwardReadTimestamp = canFwdRTS
if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
et := rArgs.(*roachpb.EndTxnRequest)
et.CanCommitAtHigherTimestamp = canFwdRTS
}

maxAttempts := maxTxnRefreshAttempts
if knob := sr.knobs.MaxTxnRefreshAttempts; knob != 0 {
Expand Down Expand Up @@ -429,9 +425,17 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
// forward its read timestamp without refreshing any read spans. This allows
// for the "server-side refresh" optimization, where batches are re-evaluated
// at a higher read-timestamp without returning to transaction coordinator.
// forward its read timestamp without refreshing any read spans. This allows for
// the "server-side refresh" optimization, where batches are re-evaluated at a
// higher read-timestamp without returning to transaction coordinator.
//
// This requires that the transaction has encountered no spans which require
// refreshing at the forwarded timestamp and that the transaction's timestamp
// has not leaked. If either of those conditions are true, a client-side refresh
// is required.
//
// Note that when deciding whether a transaction can be bumped to a particular
// timestamp, the transaction's deadling must also be taken into account.
func (sr *txnSpanRefresher) canForwardReadTimestampWithoutRefresh(txn *roachpb.Transaction) bool {
return sr.canAutoRetry && !sr.refreshInvalid && sr.refreshFootprint.empty() && !txn.CommitTimestampFixed
}
Expand Down
144 changes: 0 additions & 144 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,150 +612,6 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp tests that the
// txnSpanRefresher assigns the CanCommitAtHigherTimestamp flag on EndTxn
// requests, along with the CanForwardReadTimestamp on Batch headers.
func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send an EndTxn request. Should set CanCommitAtHigherTimestamp and
// CanForwardReadTimestamp flags.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send an EndTxn request for a transaction with a fixed commit timestamp.
// Should NOT set CanCommitAtHigherTimestamp and CanForwardReadTimestamp
// flags.
txnFixed := txn.Clone()
txnFixed.CommitTimestampFixed = true
var baFixed roachpb.BatchRequest
baFixed.Header = roachpb.Header{Txn: txnFixed}
baFixed.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, baFixed)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send a batch below the limit to collect refresh spans.
ba.Requests = nil
scanArgs := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}
ba.Add(&scanArgs)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)

// Send another EndTxn request. Should NOT set CanCommitAtHigherTimestamp
// and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another batch.
ba.Requests = nil
scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}}
ba.Add(&scanArgs2)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another EndTxn request. Still should NOT set
// CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Increment the transaction's epoch and send another EndTxn request. Should
// set CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

tsr.epochBumpedLocked()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherEpochIncrement tests that a txnSpanRefresher's refresh
// spans and span validity status are reset on an epoch increment.
func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
Expand Down
Loading

0 comments on commit 93d5eb9

Please sign in to comment.