From 0edc56ab84ded2967fab82d3661aac6e5ce8be63 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Feb 2021 01:25:24 -0500 Subject: [PATCH] kv/kvclient: add ForceRefresh support for transactions This commit adds support for client-initiated refreshes of transactions. The implementation is somewhat simplistic in that it hijacks existing logic that occurs during the sending of a request. This makese the implementation more uniform with the rest of the client library at the cost of being somewhat awkward and implicit from a code-reading perspective. The motivation for this change is to provide the necessary tools to allow the merge transaction to get pushed. The adoption follows in the next commit. Release note: None --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 21 +++ .../kvclient/kvcoord/txn_coord_sender_test.go | 163 ++++++++++++++++++ .../kvcoord/txn_interceptor_span_refresher.go | 12 +- pkg/kv/mock_transactional_sender.go | 5 + pkg/kv/sender.go | 14 ++ pkg/kv/txn.go | 19 +- 6 files changed, 226 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 0b1f597d36ad..9ead0ef015f4 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1143,3 +1143,24 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp } return curMode } + +// ForceRefreshTransaction is part of the TxnSender interface. +func (tc *TxnCoordSender) ForceRefreshTransaction(ctx context.Context) error { + tc.mu.Lock() + defer tc.mu.Unlock() + + // Hijack the pre-emptive refresh code path to perform the refresh but + // provide the force flag to ensure that the refresh occurs unconditionally. + var ba roachpb.BatchRequest + ba.Txn = tc.mu.txn.Clone() + const force = true + ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force) + if pErr != nil { + pErr = tc.updateStateLocked(ctx, ba, nil, pErr) + } else { + var br roachpb.BatchResponse + br.Txn = ba.Txn + pErr = tc.updateStateLocked(ctx, ba, &br, pErr) + } + return pErr.GoError() +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 0a828f0bb85e..b044eb994c1d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2400,3 +2400,166 @@ func TestPutsInStagingTxn(t *testing.T) { // seen a batch with the STAGING status. require.True(t, putInStagingSeen) } + +// TestTxnForceRefresh verifies that TxnCoordSender's ForceRefresh method +// works as expected. +func TestTxnForceRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Create some machinery to mock out the kvserver and allow the test to + // launch some requests from the client and then pass control flow of handling + // those requests back to the test. + type resp struct { + br *roachpb.BatchResponse + pErr *roachpb.Error + } + type req struct { + ba roachpb.BatchRequest + respCh chan resp + } + type testCase struct { + name string + run func( + ctx context.Context, + t *testing.T, + db *kv.DB, + clock *hlc.ManualClock, + reqCh <-chan req, + ) + } + var cases = []testCase{ + { + name: "no-op", + run: func( + ctx context.Context, t *testing.T, db *kv.DB, + clock *hlc.ManualClock, reqCh <-chan req, + ) { + txn := db.NewTxn(ctx, "test") + errCh := make(chan error) + go func() { + _, err := txn.Get(ctx, "foo") + errCh <- err + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Get) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn + br.Add(&roachpb.GetResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + require.NoError(t, txn.ForceRefresh(ctx)) + require.NoError(t, txn.Commit(ctx)) + }, + }, + { + name: "refresh occurs due to read", + run: func( + ctx context.Context, t *testing.T, db *kv.DB, + clock *hlc.ManualClock, reqCh <-chan req, + ) { + txn := db.NewTxn(ctx, "test") + errCh := make(chan error) + go func() { + _, err := txn.Get(ctx, "foo") + errCh <- err + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Get) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn + br.Add(&roachpb.GetResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.Put(ctx, "bar", "baz") + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Put) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn.Clone() + // Push the WriteTimestamp simulating an interaction with the + // timestamp cache. + br.Txn.WriteTimestamp = + br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0) + br.Add(&roachpb.PutResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.ForceRefresh(ctx) + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Refresh) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn.Clone() + br.Add(&roachpb.RefreshResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + // Now a refresh should be a no-op. + require.NoError(t, txn.ForceRefresh(ctx)) + }, + }, + } + run := func(t *testing.T, tc testCase) { + stopper := stop.NewStopper() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + ctx := context.Background() + defer stopper.Stop(ctx) + + reqCh := make(chan req) + var senderFn kv.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) ( + *roachpb.BatchResponse, *roachpb.Error) { + r := req{ + ba: ba, + respCh: make(chan resp), + } + select { + case reqCh <- r: + case <-ctx.Done(): + return nil, roachpb.NewError(ctx.Err()) + } + select { + case rr := <-r.respCh: + return rr.br, rr.pErr + case <-ctx.Done(): + return nil, roachpb.NewError(ctx.Err()) + } + } + ambient := log.AmbientContext{Tracer: tracing.NewTracer()} + tsf := NewTxnCoordSenderFactory( + TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + Clock: clock, + Stopper: stopper, + HeartbeatInterval: time.Hour, + }, + senderFn, + ) + db := kv.NewDB(ambient, tsf, clock, stopper) + + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + tc.run(cancelCtx, t, db, manual, reqCh) + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 5df5b8969f15..f584238df43f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -193,7 +193,7 @@ func (sr *txnSpanRefresher) SendLocked( } // Attempt a refresh before sending the batch. - ba, pErr := sr.maybeRefreshPreemptively(ctx, ba) + ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false) if pErr != nil { return nil, pErr } @@ -406,13 +406,15 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend( return br, nil } -// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp +// maybeRefreshPreemptivelyLocked attempts to refresh a transaction's read timestamp // eagerly. Doing so can take advantage of opportunities where the refresh is // free or can avoid wasting work issuing a batch containing an EndTxn that will // necessarily throw a serializable error. The method returns a batch with an // updated transaction if the refresh is successful, or a retry error if not. -func (sr *txnSpanRefresher) maybeRefreshPreemptively( - ctx context.Context, ba roachpb.BatchRequest, +// If the force flag is true, the refresh will be attempted even if a refresh +// is not inevitable. +func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked( + ctx context.Context, ba roachpb.BatchRequest, force bool, ) (roachpb.BatchRequest, *roachpb.Error) { // If we know that the transaction will need a refresh at some point because // its write timestamp has diverged from its read timestamp, consider doing @@ -466,7 +468,7 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively( refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit // If neither condition is true, defer the refresh. - if !refreshFree && !refreshInevitable { + if !refreshFree && !refreshInevitable && !force { return ba, nil } diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 86c4e5299d74..4d43f05f3862 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -198,6 +198,11 @@ func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode return SteppingDisabled } +// ForceRefreshTransaction is part of the TxnSender interface. +func (m *MockTransactionalSender) ForceRefreshTransaction(ctx context.Context) error { + panic("unimplemented") +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index fe20e2559d56..96e1a686bbe6 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -290,6 +290,20 @@ type TxnSender interface { // GetSteppingMode accompanies ConfigureStepping. It is provided // for use in tests and assertion checks. GetSteppingMode(ctx context.Context) (curMode SteppingMode) + + // ForceRefreshTransaction attempts to refresh a transactions read timestamp + // up to its provisional commit timestamp. In the case that the two are + // already the same, it is a no-op. The reason one might want to do that is + // to ensure that a transaction can commit without experiencing another push. + // + // A transaction which has proven all of its intents and has been fully + // refreshed and does not perform any additional reads or writes that does not + // contend with any other transactions will not be pushed further. This + // method's reason for existence is to ensure that range merge requests can + // be pushed but then can later commit without the possibility of needing to + // refresh reads performed on the RHS after the RHS has been subsumed but + // before the merge transaction completed. + ForceRefreshTransaction(ctx context.Context) error } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 071ae667b1bc..1b43afc916a2 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -334,9 +334,7 @@ func (txn *Txn) SetSystemConfigTrigger(forSystemTenant bool) error { } // DisablePipelining instructs the transaction not to pipeline requests. It -// should rarely be necessary to call this method. It is only recommended for -// transactions that need extremely precise control over the request ordering, -// like the transaction that merges ranges together. +// should rarely be necessary to call this method. // // DisablePipelining must be called before any operations are performed on the // transaction. @@ -1297,3 +1295,18 @@ func (txn *Txn) ReleaseSavepoint(ctx context.Context, s SavepointToken) error { defer txn.mu.Unlock() return txn.mu.sender.ReleaseSavepoint(ctx, s) } + +// ForceRefresh forces a refresh of the read timestamp of a transaction to +// match that of its write timestamp. It is only recommended for transactions +// that need extremely precise control over the request ordering, like the +// transaction that merges ranges together. When combined with +// DisablePipelining, this feature allows the range merge transaction to +// prove that it will not be pushed between sending its SubsumeRequest and +// committing. This enables that request to be pushed at earlier points in +// its lifecycle. +func (txn *Txn) ForceRefresh(ctx context.Context) error { + txn.mu.Lock() + sender := txn.mu.sender + txn.mu.Unlock() + return sender.ForceRefreshTransaction(ctx) +}