From 91352871d42632e696c09688ec10f50a54f790df Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Feb 2021 01:25:24 -0500 Subject: [PATCH 1/2] kv/kvclient: add ManualRefresh support for transactions This commit adds support for client-initiated refreshes of transactions. The implementation is somewhat simplistic in that it hijacks existing logic that occurs during the sending of a request. This makese the implementation more uniform with the rest of the client library at the cost of being somewhat awkward and implicit from a code-reading perspective. The motivation for this change is to provide the necessary tools to allow the merge transaction to get pushed. The adoption follows in the next commit. Release note: None --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 21 +++ .../kvclient/kvcoord/txn_coord_sender_test.go | 167 ++++++++++++++++++ .../kvcoord/txn_interceptor_span_refresher.go | 12 +- pkg/kv/mock_transactional_sender.go | 5 + pkg/kv/sender.go | 14 ++ pkg/kv/txn.go | 19 +- 6 files changed, 230 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 0b1f597d36ad..5a028e0617d2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1143,3 +1143,24 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp } return curMode } + +// ManualRefresh is part of the TxnSender interface. +func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error { + tc.mu.Lock() + defer tc.mu.Unlock() + + // Hijack the pre-emptive refresh code path to perform the refresh but + // provide the force flag to ensure that the refresh occurs unconditionally. + var ba roachpb.BatchRequest + ba.Txn = tc.mu.txn.Clone() + const force = true + ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force) + if pErr != nil { + pErr = tc.updateStateLocked(ctx, ba, nil, pErr) + } else { + var br roachpb.BatchResponse + br.Txn = ba.Txn + pErr = tc.updateStateLocked(ctx, ba, &br, pErr) + } + return pErr.GoError() +} diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 0a828f0bb85e..a40da6634ef9 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2400,3 +2400,170 @@ func TestPutsInStagingTxn(t *testing.T) { // seen a batch with the STAGING status. require.True(t, putInStagingSeen) } + +// TestTxnManualRefresh verifies that TxnCoordSender's ManualRefresh method +// works as expected. +func TestTxnManualRefresh(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Create some machinery to mock out the kvserver and allow the test to + // launch some requests from the client and then pass control flow of handling + // those requests back to the test. + type resp struct { + br *roachpb.BatchResponse + pErr *roachpb.Error + } + type req struct { + ba roachpb.BatchRequest + respCh chan resp + } + type testCase struct { + name string + run func( + ctx context.Context, + t *testing.T, + db *kv.DB, + clock *hlc.ManualClock, + reqCh <-chan req, + ) + } + var cases = []testCase{ + { + name: "no-op", + run: func( + ctx context.Context, t *testing.T, db *kv.DB, + clock *hlc.ManualClock, reqCh <-chan req, + ) { + txn := db.NewTxn(ctx, "test") + errCh := make(chan error) + go func() { + _, err := txn.Get(ctx, "foo") + errCh <- err + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Get) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn + br.Add(&roachpb.GetResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + // Now a refresh should be a no-op which is indicated by the fact that + // this call does not block to send requests. + require.NoError(t, txn.ManualRefresh(ctx)) + require.NoError(t, txn.Commit(ctx)) + }, + }, + { + name: "refresh occurs due to read", + run: func( + ctx context.Context, t *testing.T, db *kv.DB, + clock *hlc.ManualClock, reqCh <-chan req, + ) { + txn := db.NewTxn(ctx, "test") + errCh := make(chan error) + go func() { + _, err := txn.Get(ctx, "foo") + errCh <- err + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Get) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn + br.Add(&roachpb.GetResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.Put(ctx, "bar", "baz") + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Put) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn.Clone() + // Push the WriteTimestamp simulating an interaction with the + // timestamp cache. + br.Txn.WriteTimestamp = + br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0) + br.Add(&roachpb.PutResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.ManualRefresh(ctx) + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Refresh) + require.True(t, ok) + var br roachpb.BatchResponse + br.Txn = r.ba.Txn.Clone() + br.Add(&roachpb.RefreshResponse{}) + r.respCh <- resp{br: &br} + } + require.NoError(t, <-errCh) + + // Now a refresh should be a no-op which is indicated by the fact that + // this call does not block to send requests. + require.NoError(t, txn.ManualRefresh(ctx)) + }, + }, + } + run := func(t *testing.T, tc testCase) { + stopper := stop.NewStopper() + manual := hlc.NewManualClock(123) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + ctx := context.Background() + defer stopper.Stop(ctx) + + reqCh := make(chan req) + var senderFn kv.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) ( + *roachpb.BatchResponse, *roachpb.Error) { + r := req{ + ba: ba, + respCh: make(chan resp), + } + select { + case reqCh <- r: + case <-ctx.Done(): + return nil, roachpb.NewError(ctx.Err()) + } + select { + case rr := <-r.respCh: + return rr.br, rr.pErr + case <-ctx.Done(): + return nil, roachpb.NewError(ctx.Err()) + } + } + ambient := log.AmbientContext{Tracer: tracing.NewTracer()} + tsf := NewTxnCoordSenderFactory( + TxnCoordSenderFactoryConfig{ + AmbientCtx: ambient, + Clock: clock, + Stopper: stopper, + HeartbeatInterval: time.Hour, + }, + senderFn, + ) + db := kv.NewDB(ambient, tsf, clock, stopper) + + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + tc.run(cancelCtx, t, db, manual, reqCh) + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 5df5b8969f15..f584238df43f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -193,7 +193,7 @@ func (sr *txnSpanRefresher) SendLocked( } // Attempt a refresh before sending the batch. - ba, pErr := sr.maybeRefreshPreemptively(ctx, ba) + ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false) if pErr != nil { return nil, pErr } @@ -406,13 +406,15 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend( return br, nil } -// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp +// maybeRefreshPreemptivelyLocked attempts to refresh a transaction's read timestamp // eagerly. Doing so can take advantage of opportunities where the refresh is // free or can avoid wasting work issuing a batch containing an EndTxn that will // necessarily throw a serializable error. The method returns a batch with an // updated transaction if the refresh is successful, or a retry error if not. -func (sr *txnSpanRefresher) maybeRefreshPreemptively( - ctx context.Context, ba roachpb.BatchRequest, +// If the force flag is true, the refresh will be attempted even if a refresh +// is not inevitable. +func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked( + ctx context.Context, ba roachpb.BatchRequest, force bool, ) (roachpb.BatchRequest, *roachpb.Error) { // If we know that the transaction will need a refresh at some point because // its write timestamp has diverged from its read timestamp, consider doing @@ -466,7 +468,7 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively( refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit // If neither condition is true, defer the refresh. - if !refreshFree && !refreshInevitable { + if !refreshFree && !refreshInevitable && !force { return ba, nil } diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go index 86c4e5299d74..1a27c205172a 100644 --- a/pkg/kv/mock_transactional_sender.go +++ b/pkg/kv/mock_transactional_sender.go @@ -198,6 +198,11 @@ func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode return SteppingDisabled } +// ManualRefresh is part of the TxnSender interface. +func (m *MockTransactionalSender) ManualRefresh(ctx context.Context) error { + panic("unimplemented") +} + // MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders. type MockTxnSenderFactory struct { senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) ( diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index fe20e2559d56..856fbc42e3b9 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -290,6 +290,20 @@ type TxnSender interface { // GetSteppingMode accompanies ConfigureStepping. It is provided // for use in tests and assertion checks. GetSteppingMode(ctx context.Context) (curMode SteppingMode) + + // ManualRefresh attempts to refresh a transactions read timestamp up to its + // provisional commit timestamp. In the case that the two are already the + // same, it is a no-op. The reason one might want to do that is to ensure + // that a transaction can commit without experiencing another push. + // + // A transaction which has proven all of its intents and has been fully + // refreshed and does not perform any additional reads or writes that does not + // contend with any other transactions will not be pushed further. This + // method's reason for existence is to ensure that range merge requests can + // be pushed but then can later commit without the possibility of needing to + // refresh reads performed on the RHS after the RHS has been subsumed but + // before the merge transaction completed. + ManualRefresh(ctx context.Context) error } // SteppingMode is the argument type to ConfigureStepping. diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 071ae667b1bc..e09f39ea6117 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -334,9 +334,7 @@ func (txn *Txn) SetSystemConfigTrigger(forSystemTenant bool) error { } // DisablePipelining instructs the transaction not to pipeline requests. It -// should rarely be necessary to call this method. It is only recommended for -// transactions that need extremely precise control over the request ordering, -// like the transaction that merges ranges together. +// should rarely be necessary to call this method. // // DisablePipelining must be called before any operations are performed on the // transaction. @@ -1297,3 +1295,18 @@ func (txn *Txn) ReleaseSavepoint(ctx context.Context, s SavepointToken) error { defer txn.mu.Unlock() return txn.mu.sender.ReleaseSavepoint(ctx, s) } + +// ManualRefresh forces a refresh of the read timestamp of a transaction to +// match that of its write timestamp. It is only recommended for transactions +// that need extremely precise control over the request ordering, like the +// transaction that merges ranges together. When combined with +// DisablePipelining, this feature allows the range merge transaction to +// prove that it will not be pushed between sending its SubsumeRequest and +// committing. This enables that request to be pushed at earlier points in +// its lifecycle. +func (txn *Txn) ManualRefresh(ctx context.Context) error { + txn.mu.Lock() + sender := txn.mu.sender + txn.mu.Unlock() + return sender.ManualRefresh(ctx) +} From 3ff8b3597d9fa13788f5ac18d5f9afdab8133559 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 14 Feb 2021 01:27:37 -0500 Subject: [PATCH 2/2] kv/kvserver: allow the merge transaction to be pushed Historically we have not allowed the merge transaction to be pushed. The reason we disabled this was because of the hazard due to attempting to refresh reads on the RHS of the merge after the SubsumeRequest has been sent. The `SubsumeRequest` effectively freezes the RHS until the merge commits or aborts. In order to side-step this hazard, this change ensures that nothing should prevent the merge transaction from either committing or aborting. Release note: None --- pkg/kv/kvserver/client_merge_test.go | 69 +++++++++++++--------------- pkg/kv/kvserver/replica_command.go | 24 ++++++---- 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index a80926110b14..488048d39bed 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) { // transaction's only intent so far is on P's local range descriptor, and so the // split transaction can happily commit. // -// The merge transaction then continues, writing an intent on Q's local -// descriptor. Since the merge transaction is executing at an earlier timestamp -// than the split transaction, the intent is written "under" the updated -// descriptor written by the split transaction. -// -// In the past, the merge transaction would simply push its commit timestamp -// forward and proceed, even though, upon committing, it would discover that it -// was forbidden from committing with a pushed timestamp and abort instead. (For -// why merge transactions cannot forward their commit timestamps, see the -// discussion on the retry loop within AdminMerge.) This was problematic. Before -// the doomed merge transaction attempted to commit, it would send a Subsume -// request, launching a merge watcher goroutine on Q. This watcher goroutine -// could incorrectly think that the merge transaction committed. Why? To -// determine whether a merge has truly aborted, the watcher goroutine sends a -// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the -// Get request returns either nil or a descriptor for a different range, the -// merge is assumed to have committed. In this case, unfortunately, QEndKey is -// the Q's end key post-split. After all, the split has committed and updated -// Q's in-memory descriptor. The split transactions intents are cleaned up -// asynchronously, however, and since the watcher goroutine is not performing a -// consistent read it will not wait for the intents to be cleaned up. So -// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine -// will incorrectly infer that the merge committed. (Note that the watcher -// goroutine can't perform a consistent read, as that would look up the -// transaction record on Q and deadlock, since Q is blocked for merging.) -// -// The bug was fixed by updating Q's local descriptor with a conditional put -// instead of a put. This forces the merge transaction to fail early if writing -// the intent would require forwarding the commit timestamp. In other words, -// this ensures that the merge watcher goroutine is never launched if the RHS -// local descriptor is updated while the merge transaction is executing. +// The merge transaction then continues, reading and writing an intent on Q's +// local descriptor. The locking nature of the read request to Q's local +// descriptor ensures that the merge transaction will observe the post-split +// value for Q. func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1280,8 +1253,11 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { var distSender *kvcoord.DistSender var lhsDescKey atomic.Value + var lhsStartKey atomic.Value var launchSplit int64 - var mergeRetries int64 + var mergePreSplit atomic.Value + var splitCommit atomic.Value + var mergeEndTxnTimestamp atomic.Value testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { for _, req := range ba.Requests { if get := req.GetGet(); get != nil && get.KeyLocking != lock.None { @@ -1289,11 +1265,22 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { // If this is the first merge attempt, launch the split // before the merge's first locking read succeeds. if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) { + mergePreSplit.Store(ba.Txn.ReadTimestamp) _, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c"))) return pErr } - // Otherwise, record that the merge retried and proceed. - atomic.AddInt64(&mergeRetries, 1) + // Otherwise, proceed. + } + } + if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) { + splitCommit.Store(ba.Timestamp) + } + if endTxn := req.GetEndTxn(); endTxn != nil { + ct := endTxn.InternalCommitTrigger + startKey, _ := lhsStartKey.Load().(roachpb.RKey) + if ct != nil && ct.MergeTrigger != nil && startKey != nil && + startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) { + mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp) } } } @@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { } lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey)) atomic.StoreInt64(&launchSplit, 1) + lhsStartKey.Store(lhsDesc.StartKey) mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) - if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil { + _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs) + if pErr != nil { t.Fatal(pErr) } - if atomic.LoadInt64(&mergeRetries) == 0 { - t.Fatal("expected merge to retry at least once due to concurrent split") + mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp) + splitTS := splitCommit.Load().(hlc.Timestamp) + mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp) + if splitTS.LessEq(mergePreSplitTS) { + t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS) + } + if mergePostSplitTS.LessEq(splitTS) { + t.Fatalf("expected merge to finish after concurrent split, %v <= %v", mergePostSplitTS, splitTS) } } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b3e2870fd6f0..053186a5faec 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -564,12 +564,6 @@ func (r *Replica) AdminMerge( log.Event(ctx, "merge txn begins") txn.SetDebugName(mergeTxnName) - // Observe the commit timestamp to force a client-side retry. See the - // comment on the retry loop after this closure for details. - // - // TODO(benesch): expose a proper API for preventing the fast path. - _ = txn.CommitTimestamp() - // Pipelining might send QueryIntent requests to the RHS after the RHS has // noticed the merge and started blocking all traffic. This causes the merge // transaction to deadlock. Just turn pipelining off; the structure of the @@ -708,6 +702,18 @@ func (r *Replica) AdminMerge( return err } + // Refresh the transaction so that the transaction won't try to refresh + // its reads on the RHS after it is frozen. + if err := txn.ManualRefresh(ctx); err != nil { + return err + } + + // Freeze the commit timestamp of the transaction to prevent future pushes + // due to high-priority reads from other transactions. Any attempt to + // refresh reads on the RHS would result in a stalled merge because the + // RHS will be frozen after the Subsume is sent. + _ = txn.CommitTimestamp() + // Intents have been placed, so the merge is now in its critical phase. Get // a consistent view of the data from the right-hand range. If the merge // commits, we'll write this data to the left-hand range in the merge @@ -761,8 +767,10 @@ func (r *Replica) AdminMerge( // we'll unlock the right-hand range, giving the next, fresh transaction a // chance to succeed. // - // Note that client.DB.Txn performs retries using the same transaction, so we - // have to use our own retry loop. + // A second reason to eschew kv.DB.Txn() is that the API to disable pipelining + // is finicky and only allows disabling pipelining before any operations have + // been sent, even in prior epochs. Calling DisablePipelining() on a restarted + // transaction yields an error. for { txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID()) err := runMergeTxn(txn)