Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/kvserver: allow the merge transaction to be pushed #60567

Merged
merged 2 commits into from
Feb 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,3 +1143,24 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp
}
return curMode
}

// ManualRefresh is part of the TxnSender interface.
func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error {
tc.mu.Lock()
defer tc.mu.Unlock()

// Hijack the pre-emptive refresh code path to perform the refresh but
// provide the force flag to ensure that the refresh occurs unconditionally.
var ba roachpb.BatchRequest
ba.Txn = tc.mu.txn.Clone()
const force = true
ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force)
if pErr != nil {
pErr = tc.updateStateLocked(ctx, ba, nil, pErr)
} else {
var br roachpb.BatchResponse
br.Txn = ba.Txn
pErr = tc.updateStateLocked(ctx, ba, &br, pErr)
}
return pErr.GoError()
}
167 changes: 167 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2400,3 +2400,170 @@ func TestPutsInStagingTxn(t *testing.T) {
// seen a batch with the STAGING status.
require.True(t, putInStagingSeen)
}

// TestTxnManualRefresh verifies that TxnCoordSender's ManualRefresh method
// works as expected.
func TestTxnManualRefresh(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Create some machinery to mock out the kvserver and allow the test to
// launch some requests from the client and then pass control flow of handling
// those requests back to the test.
type resp struct {
br *roachpb.BatchResponse
pErr *roachpb.Error
}
type req struct {
ba roachpb.BatchRequest
respCh chan resp
}
type testCase struct {
name string
run func(
ctx context.Context,
t *testing.T,
db *kv.DB,
clock *hlc.ManualClock,
reqCh <-chan req,
)
}
var cases = []testCase{
{
name: "no-op",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
require.NoError(t, txn.Commit(ctx))
},
},
{
name: "refresh occurs due to read",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.Put(ctx, "bar", "baz")
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Put)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
// Push the WriteTimestamp simulating an interaction with the
// timestamp cache.
br.Txn.WriteTimestamp =
br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0)
br.Add(&roachpb.PutResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.ManualRefresh(ctx)
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Refresh)
require.True(t, ok)
var br roachpb.BatchResponse
br.Txn = r.ba.Txn.Clone()
br.Add(&roachpb.RefreshResponse{})
r.respCh <- resp{br: &br}
}
require.NoError(t, <-errCh)

// Now a refresh should be a no-op which is indicated by the fact that
// this call does not block to send requests.
require.NoError(t, txn.ManualRefresh(ctx))
},
},
}
run := func(t *testing.T, tc testCase) {
stopper := stop.NewStopper()
manual := hlc.NewManualClock(123)
clock := hlc.NewClock(manual.UnixNano, time.Nanosecond)
ctx := context.Background()
defer stopper.Stop(ctx)

reqCh := make(chan req)
var senderFn kv.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (
*roachpb.BatchResponse, *roachpb.Error) {
r := req{
ba: ba,
respCh: make(chan resp),
}
select {
case reqCh <- r:
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
select {
case rr := <-r.respCh:
return rr.br, rr.pErr
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
tsf := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Clock: clock,
Stopper: stopper,
HeartbeatInterval: time.Hour,
},
senderFn,
)
db := kv.NewDB(ambient, tsf, clock, stopper)

cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()
tc.run(cancelCtx, t, db, manual, reqCh)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (sr *txnSpanRefresher) SendLocked(
}

// Attempt a refresh before sending the batch.
ba, pErr := sr.maybeRefreshPreemptively(ctx, ba)
ba, pErr := sr.maybeRefreshPreemptivelyLocked(ctx, ba, false)
if pErr != nil {
return nil, pErr
}
Expand Down Expand Up @@ -406,13 +406,15 @@ func (sr *txnSpanRefresher) splitEndTxnAndRetrySend(
return br, nil
}

// maybeRefreshPreemptively attempts to refresh a transaction's read timestamp
// maybeRefreshPreemptivelyLocked attempts to refresh a transaction's read timestamp
// eagerly. Doing so can take advantage of opportunities where the refresh is
// free or can avoid wasting work issuing a batch containing an EndTxn that will
// necessarily throw a serializable error. The method returns a batch with an
// updated transaction if the refresh is successful, or a retry error if not.
func (sr *txnSpanRefresher) maybeRefreshPreemptively(
ctx context.Context, ba roachpb.BatchRequest,
// If the force flag is true, the refresh will be attempted even if a refresh
// is not inevitable.
func (sr *txnSpanRefresher) maybeRefreshPreemptivelyLocked(
ctx context.Context, ba roachpb.BatchRequest, force bool,
) (roachpb.BatchRequest, *roachpb.Error) {
// If we know that the transaction will need a refresh at some point because
// its write timestamp has diverged from its read timestamp, consider doing
Expand Down Expand Up @@ -466,7 +468,7 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
refreshInevitable := hasET && args.(*roachpb.EndTxnRequest).Commit

// If neither condition is true, defer the refresh.
if !refreshFree && !refreshInevitable {
if !refreshFree && !refreshInevitable && !force {
return ba, nil
}

Expand Down
69 changes: 32 additions & 37 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) {
// transaction's only intent so far is on P's local range descriptor, and so the
// split transaction can happily commit.
//
// The merge transaction then continues, writing an intent on Q's local
// descriptor. Since the merge transaction is executing at an earlier timestamp
// than the split transaction, the intent is written "under" the updated
// descriptor written by the split transaction.
//
// In the past, the merge transaction would simply push its commit timestamp
// forward and proceed, even though, upon committing, it would discover that it
// was forbidden from committing with a pushed timestamp and abort instead. (For
// why merge transactions cannot forward their commit timestamps, see the
// discussion on the retry loop within AdminMerge.) This was problematic. Before
// the doomed merge transaction attempted to commit, it would send a Subsume
// request, launching a merge watcher goroutine on Q. This watcher goroutine
// could incorrectly think that the merge transaction committed. Why? To
// determine whether a merge has truly aborted, the watcher goroutine sends a
// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the
// Get request returns either nil or a descriptor for a different range, the
// merge is assumed to have committed. In this case, unfortunately, QEndKey is
// the Q's end key post-split. After all, the split has committed and updated
// Q's in-memory descriptor. The split transactions intents are cleaned up
// asynchronously, however, and since the watcher goroutine is not performing a
// consistent read it will not wait for the intents to be cleaned up. So
// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine
// will incorrectly infer that the merge committed. (Note that the watcher
// goroutine can't perform a consistent read, as that would look up the
// transaction record on Q and deadlock, since Q is blocked for merging.)
//
// The bug was fixed by updating Q's local descriptor with a conditional put
// instead of a put. This forces the merge transaction to fail early if writing
// the intent would require forwarding the commit timestamp. In other words,
// this ensures that the merge watcher goroutine is never launched if the RHS
// local descriptor is updated while the merge transaction is executing.
// The merge transaction then continues, reading and writing an intent on Q's
// local descriptor. The locking nature of the read request to Q's local
// descriptor ensures that the merge transaction will observe the post-split
// value for Q.
func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -1280,20 +1253,34 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {

var distSender *kvcoord.DistSender
var lhsDescKey atomic.Value
var lhsStartKey atomic.Value
var launchSplit int64
var mergeRetries int64
var mergePreSplit atomic.Value
var splitCommit atomic.Value
var mergeEndTxnTimestamp atomic.Value
testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if get := req.GetGet(); get != nil && get.KeyLocking != lock.None {
if v := lhsDescKey.Load(); v != nil && v.(roachpb.Key).Equal(get.Key) {
// If this is the first merge attempt, launch the split
// before the merge's first locking read succeeds.
if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) {
mergePreSplit.Store(ba.Txn.ReadTimestamp)
_, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c")))
return pErr
}
// Otherwise, record that the merge retried and proceed.
atomic.AddInt64(&mergeRetries, 1)
// Otherwise, proceed.
}
}
if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) {
splitCommit.Store(ba.Timestamp)
}
if endTxn := req.GetEndTxn(); endTxn != nil {
ct := endTxn.InternalCommitTrigger
startKey, _ := lhsStartKey.Load().(roachpb.RKey)
if ct != nil && ct.MergeTrigger != nil && startKey != nil &&
startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) {
mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp)
}
}
}
Expand Down Expand Up @@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
}
lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey))
atomic.StoreInt64(&launchSplit, 1)
lhsStartKey.Store(lhsDesc.StartKey)

mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil {
_, pErr := kv.SendWrapped(ctx, distSender, mergeArgs)
if pErr != nil {
t.Fatal(pErr)
}
if atomic.LoadInt64(&mergeRetries) == 0 {
t.Fatal("expected merge to retry at least once due to concurrent split")
mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp)
splitTS := splitCommit.Load().(hlc.Timestamp)
mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp)
if splitTS.LessEq(mergePreSplitTS) {
t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS)
}
if mergePostSplitTS.LessEq(splitTS) {
t.Fatalf("expected merge to finish after concurrent split, %v <= %v", mergePostSplitTS, splitTS)
}
}

Expand Down
24 changes: 16 additions & 8 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,6 @@ func (r *Replica) AdminMerge(
log.Event(ctx, "merge txn begins")
txn.SetDebugName(mergeTxnName)

// Observe the commit timestamp to force a client-side retry. See the
// comment on the retry loop after this closure for details.
//
// TODO(benesch): expose a proper API for preventing the fast path.
_ = txn.CommitTimestamp()

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
Expand Down Expand Up @@ -708,6 +702,18 @@ func (r *Replica) AdminMerge(
return err
}

// Refresh the transaction so that the transaction won't try to refresh
// its reads on the RHS after it is frozen.
if err := txn.ManualRefresh(ctx); err != nil {
return err
}

// Freeze the commit timestamp of the transaction to prevent future pushes
// due to high-priority reads from other transactions. Any attempt to
// refresh reads on the RHS would result in a stalled merge because the
// RHS will be frozen after the Subsume is sent.
_ = txn.CommitTimestamp()

// Intents have been placed, so the merge is now in its critical phase. Get
// a consistent view of the data from the right-hand range. If the merge
// commits, we'll write this data to the left-hand range in the merge
Expand Down Expand Up @@ -761,8 +767,10 @@ func (r *Replica) AdminMerge(
// we'll unlock the right-hand range, giving the next, fresh transaction a
// chance to succeed.
//
// Note that client.DB.Txn performs retries using the same transaction, so we
// have to use our own retry loop.
// A second reason to eschew kv.DB.Txn() is that the API to disable pipelining
// is finicky and only allows disabling pipelining before any operations have
// been sent, even in prior epochs. Calling DisablePipelining() on a restarted
// transaction yields an error.
for {
txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID())
err := runMergeTxn(txn)
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ func (m *MockTransactionalSender) GetSteppingMode(context.Context) SteppingMode
return SteppingDisabled
}

// ManualRefresh is part of the TxnSender interface.
func (m *MockTransactionalSender) ManualRefresh(ctx context.Context) error {
panic("unimplemented")
}

// MockTxnSenderFactory is a TxnSenderFactory producing MockTxnSenders.
type MockTxnSenderFactory struct {
senderFunc func(context.Context, *roachpb.Transaction, roachpb.BatchRequest) (
Expand Down
Loading