Skip to content

Commit

Permalink
Revert "kv: remove EndTxn.CanCommitAtHigherTimestamp flag"
Browse files Browse the repository at this point in the history
Fixes #53198.

This reverts commit 93d5eb9.

In #53198, we saw that v20.1 nodes still rely on v20.1 nodes properly
setting this flag. That means that we can't remove it yet. Instead,
all we can do is stop consulting it on the server so that we can
remove it in v21.1.
  • Loading branch information
nvanbenschoten committed Aug 21, 2020
1 parent 86165b9 commit 4189938
Show file tree
Hide file tree
Showing 10 changed files with 923 additions and 627 deletions.
29 changes: 29 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,17 @@ func splitBatchAndCheckForRefreshSpans(
}()
if hasRefreshSpans {
ba.CanForwardReadTimestamp = false

// If the final part contains an EndTxn request, unset its
// CanCommitAtHigherTimestamp flag as well.
lastPart := parts[len(parts)-1]
if et := lastPart[len(lastPart)-1].GetEndTxn(); et != nil {
etCopy := *et
etCopy.CanCommitAtHigherTimestamp = false
lastPart = append([]roachpb.RequestUnion(nil), lastPart...)
lastPart[len(lastPart)-1].MustSetInner(&etCopy)
parts[len(parts)-1] = lastPart
}
}
}
return parts
Expand All @@ -662,6 +673,18 @@ func unsetCanForwardReadTimestampFlag(ctx context.Context, ba *roachpb.BatchRequ
if roachpb.NeedsRefresh(req.GetInner()) {
// Unset the flag.
ba.CanForwardReadTimestamp = false

// We would need to also unset the CanCommitAtHigherTimestamp flag
// on any EndTxn request in the batch, but it turns out that because
// we call this function when a batch is split across ranges, we'd
// already have bailed if the EndTxn wasn't a parallel commit — and
// if it was a parallel commit then we must not have any requests
// that need to refresh (see txnCommitter.canCommitInParallel).
// Assert this for our own sanity.
if _, ok := ba.GetArg(roachpb.EndTxn); ok {
log.Fatalf(ctx, "batch unexpected contained requests "+
"that need to refresh and an EndTxn request: %s", ba.String())
}
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,10 @@ func makeTxnCommitExplicitLocked(
// Construct a new batch with just an EndTxn request.
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: txn, CanForwardReadTimestamp: canFwdRTS}
et := roachpb.EndTxnRequest{Commit: true, LockSpans: lockSpans}
et := roachpb.EndTxnRequest{Commit: true}
et.Key = txn.Key
et.LockSpans = lockSpans
et.CanCommitAtHigherTimestamp = canFwdRTS
ba.Add(&et)

_, pErr := s.SendLocked(ctx, ba)
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,11 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {
etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}
ba.Add(&putArgs, &etArgs)

// Set the CanForwardReadTimestamp flag so we can make sure that these are
// propagated to the async explicit commit task.
// Set the CanForwardReadTimestamp and CanCommitAtHigherTimestamp flags so
// we can make sure that these are propagated to the async explicit commit
// task.
ba.Header.CanForwardReadTimestamp = true
etArgs.CanCommitAtHigherTimestamp = true

explicitCommitCh := make(chan struct{})
mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
Expand All @@ -367,6 +369,7 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.CanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 1)
require.Equal(t, roachpb.SequencedWrite{Key: keyA, Sequence: 1}, et.InFlightWrites[0])

Expand All @@ -385,6 +388,7 @@ func TestTxnCommitterAsyncExplicitCommitTask(t *testing.T) {

et := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.CanCommitAtHigherTimestamp)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (m *mockLockedSender) MockSend(
m.mockFn = fn
}

// Reset resets the mockLockedSender mocking function to a no-op.
func (m *mockLockedSender) Reset() {
m.mockFn = nil
}

// ChainMockSend sets a series of mocking functions on the mockLockedSender.
// The provided mocking functions are set in the order that they are provided
// and a given mocking function is set after the previous one has been called.
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (sr *txnSpanRefresher) SendLocked(

// Set the batch's CanForwardReadTimestamp flag.
ba.CanForwardReadTimestamp = sr.canForwardReadTimestampWithoutRefresh(ba.Txn)
if rArgs, hasET := ba.GetArg(roachpb.EndTxn); hasET {
et := rArgs.(*roachpb.EndTxnRequest)
et.CanCommitAtHigherTimestamp = ba.CanForwardReadTimestamp
}

// Attempt a refresh before sending the batch.
ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
Expand Down
144 changes: 144 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,150 @@ func TestTxnSpanRefresherAssignsCanForwardReadTimestamp(t *testing.T) {
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp tests that the
// txnSpanRefresher assigns the CanCommitAtHigherTimestamp flag on EndTxn
// requests, along with the CanForwardReadTimestamp on Batch headers.
func TestTxnSpanRefresherAssignsCanCommitAtHigherTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
keyC, keyD := roachpb.Key("c"), roachpb.Key("d")

// Send an EndTxn request. Should set CanCommitAtHigherTimestamp and
// CanForwardReadTimestamp flags.
var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br := ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send an EndTxn request for a transaction with a fixed commit timestamp.
// Should NOT set CanCommitAtHigherTimestamp and CanForwardReadTimestamp
// flags.
txnFixed := txn.Clone()
txnFixed.CommitTimestampFixed = true
var baFixed roachpb.BatchRequest
baFixed.Header = roachpb.Header{Txn: txnFixed}
baFixed.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, baFixed)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send a batch below the limit to collect refresh spans.
ba.Requests = nil
scanArgs := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyA, EndKey: keyB}}
ba.Add(&scanArgs)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)

// Send another EndTxn request. Should NOT set CanCommitAtHigherTimestamp
// and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another batch.
ba.Requests = nil
scanArgs2 := roachpb.ScanRequest{RequestHeader: roachpb.RequestHeader{Key: keyC, EndKey: keyD}}
ba.Add(&scanArgs2)

mockSender.Reset()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send another EndTxn request. Still should NOT set
// CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.False(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.False(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Increment the transaction's epoch and send another EndTxn request. Should
// set CanCommitAtHigherTimestamp and CanForwardReadTimestamp flags.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())
require.True(t, ba.Requests[0].GetEndTxn().CanCommitAtHigherTimestamp)

br = ba.CreateReply()
br.Txn = ba.Txn
return br, nil
})

tsr.epochBumpedLocked()
br, pErr = tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, []roachpb.Span(nil), tsr.refreshFootprint.asSlice())
require.False(t, tsr.refreshInvalid)
}

// TestTxnSpanRefresherEpochIncrement tests that a txnSpanRefresher's refresh
// spans and span validity status are reset on an epoch increment.
func TestTxnSpanRefresherEpochIncrement(t *testing.T) {
Expand Down
Loading

0 comments on commit 4189938

Please sign in to comment.