From 22554dbbb0bc585105fdc9bbf53dd18a0d4ae550 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Wed, 6 Jun 2018 18:07:44 -0400 Subject: [PATCH] kv: fix issues around failed 1PC txns The recent #25541 changed the way "tracking" (the heartbeat loop and intent collection) is initiated for transactions. It aimed to simplify things and put the burden on the client to decide when a txn needs tracking. This introduced a problem - the client.Txn was not initiating tracking when sending 1PC batches. However, tracking is needed for these transactions too: even though usually they'll succeed and so the TxnCoordSender state can be quickly destroyed, when they fail their intents and heartbeat loop need to be kept around just like for any other txn. This patch backtracks on the previous move to make it the client's responsibility to initiate tracking (it didn't stand the test of time): the client.Txn is no longer in charge of calling tcs.StartTracking(). Instead, the TCS does whatever needs to be done when it sees an EndTransaction. I also took the opportunity to spruce up comments on the TxnCoordSender. Release note: None --- pkg/internal/client/client_test.go | 14 ++- pkg/internal/client/sender.go | 30 ++---- pkg/internal/client/txn.go | 6 -- pkg/internal/client/txn_test.go | 7 +- pkg/kv/txn_coord_sender.go | 102 +++++++++++++------- pkg/kv/txn_coord_sender_test.go | 150 ++++++++++++++++++++--------- pkg/sql/txn_state_test.go | 6 +- pkg/storage/store_test.go | 2 - 8 files changed, 196 insertions(+), 121 deletions(-) diff --git a/pkg/internal/client/client_test.go b/pkg/internal/client/client_test.go index 28e4bdc04555..ca7f4c6f6a5b 100644 --- a/pkg/internal/client/client_test.go +++ b/pkg/internal/client/client_test.go @@ -799,15 +799,14 @@ func TestReadConsistencyTypes(t *testing.T) { // Mock out DistSender's sender function to check the read consistency for // outgoing BatchRequests and return an empty reply. factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender { - return client.TxnSenderAdapter{ - StartTrackingWrapped: func(context.Context) error { panic("unimplemented") }, - Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return client.TxnSenderFunc( + func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if ba.ReadConsistency != rc { return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency) } return ba.CreateReply(), nil }, - } + ) }) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) @@ -976,12 +975,11 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) { // Mock out sender function to check that created transactions // have the observed timestamp set for the configured node ID. factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender { - return client.TxnSenderAdapter{ - StartTrackingWrapped: func(context.Context) error { panic("unimplemented") }, - Wrapped: func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return client.TxnSenderFunc( + func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { return ba.CreateReply(), nil }, - } + ) }) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index aeddc164f016..107346c58a78 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -78,9 +78,6 @@ type TxnSender interface { // if this method is invoked multiple times, the most recent callback // is the only one which will be invoked. OnFinish(func(error)) - - // StartTracking starts a heartbeat loop and tracking of intents. - StartTracking(ctx context.Context) error } // TxnSenderFactory is the interface used to create new instances @@ -108,37 +105,28 @@ func (f SenderFunc) Send( return f(ctx, ba) } -// TxnSenderAdapter is an adapter to allow the use of ordinary functions as +// TxnSenderFunc is an adapter to allow the use of ordinary functions as // TxnSenders with GetMeta or AugmentMeta panicing with unimplemented. This is // a helper mechanism to facilitate testing. -type TxnSenderAdapter struct { - Wrapped func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) - StartTrackingWrapped func(context.Context) error -} +type TxnSenderFunc func( + context.Context, roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) // Send calls f(ctx, c). -func (f TxnSenderAdapter) Send( +func (f TxnSenderFunc) Send( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { - return f.Wrapped(ctx, ba) + return f(ctx, ba) } // GetMeta is part of the TxnSender interface. -func (f TxnSenderAdapter) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") } +func (f TxnSenderFunc) GetMeta() roachpb.TxnCoordMeta { panic("unimplemented") } // AugmentMeta is part of the TxnSender interface. -func (f TxnSenderAdapter) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") } +func (f TxnSenderFunc) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic("unimplemented") } // OnFinish is part of the TxnSender interface. -func (f TxnSenderAdapter) OnFinish(_ func(error)) { panic("unimplemented") } - -// StartTracking is part the TxnSender interface. -func (f TxnSenderAdapter) StartTracking(ctx context.Context) error { - if f.StartTrackingWrapped != nil { - return f.StartTrackingWrapped(ctx) - } - panic("unimplemented") -} +func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") } // TxnSenderFactoryFunc is an adapter to allow the use of ordinary functions // as TxnSenderFactories. This is a helper mechanism to facilitate testing. diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 18924d8764b0..e87e0e5a5b26 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -972,12 +972,6 @@ func (txn *Txn) Send( // begin transaction request before the first write command and update // transaction state accordingly. if needBeginTxn { - // Unless it's a 1PC, ask the TxnCoordSender to track the transaction. - if txn.mu.state == txnReadOnly && !haveEndTxn { - if err := txn.mu.sender.StartTracking(ctx); err != nil { - return roachpb.NewError(err) - } - } // We're about to send a BeginTxn, so move to the Writing state. txn.mu.state = txnWriting // From now on, all requests need to be checked against the AbortCache on diff --git a/pkg/internal/client/txn_test.go b/pkg/internal/client/txn_test.go index 1bc64b2e14b1..5c5484df1b26 100644 --- a/pkg/internal/client/txn_test.go +++ b/pkg/internal/client/txn_test.go @@ -87,9 +87,8 @@ func newTestTxnFactory( createReply func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error), ) TxnSenderFactoryFunc { return TxnSenderFactoryFunc(func(TxnType) TxnSender { - return TxnSenderAdapter{ - StartTrackingWrapped: func(context.Context) error { return nil }, - Wrapped: func(_ context.Context, ba roachpb.BatchRequest, + return TxnSenderFunc( + func(_ context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { if ba.UserPriority == 0 { ba.UserPriority = 1 @@ -137,7 +136,7 @@ func newTestTxnFactory( } return br, pErr }, - } + ) }) } diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index c0649a6df1c8..e5e0cac4350a 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -83,20 +83,45 @@ const ( aborted ) -// A TxnCoordSender is an implementation of client.Sender which wraps -// a lower-level Sender (either a storage.Stores or a DistSender) to -// which it sends commands. It acts as a man-in-the-middle, -// coordinating transaction state for clients. Unlike other senders, -// the TxnCoordSender is stateful and holds information about an -// ongoing transaction. Among other things, it records the intent -// spans of keys mutated by the transaction for later -// resolution. +// A TxnCoordSender is the production implementation of client.TxnSender. It is +// a Sender which wraps a lower-level Sender (a DistSender) to which it sends +// commands. It works on behalf of the client to keep a transaction's state +// (e.g. intents) and to perform periodic heartbeating of the transaction +// required when necessary. Unlike other senders, TxnCoordSender is not a +// singleton - an instance is created for every transaction by the +// TxnCoordSenderFactory. // -// After a transaction has begun writing, the TxnCoordSender may start -// sending periodic heartbeat messages to that transaction's txn -// record, to keep it live. Note that heartbeating is done only from -// the root transaction coordinator, in the event that multiple +// Among the functions it performs are: +// - Heartbeating of the transaction record. Note that heartbeating is done only +// from the root transaction coordinator, in the event that multiple // coordinators are active (i.e. in a distributed SQL flow). +// - Accumulating intent spans. +// - Attaching intent spans to EndTransaction requests, for intent cleanup. +// - Handles retriable errors by either bumping the transaction's epoch or, in +// case of TransactionAbortedErrors, cleaning up the transaction (in this case, +// the client.Txn is expected to create a new TxnCoordSender instance +// transparently for the higher-level client). +// - Ensures atomic execution for non-transactional (write) batches by transparently +// wrapping them in transactions when the DistSender is forced to split them for +// multiple ranges. For this reason, generally even non-transactional batches +// need to be sent through a TxnCoordSender. +// +// Since it is stateful, the TxnCoordSender needs to understand when a +// transaction is "finished" and the state can be destroyed. As such there's a +// contract that the client.Txn needs obey. Read-only transactions don't matter +// - they're stateless. For the others, once a BeginTransaction is sent by the +// client, the TxnCoordSender considers the transactions completed in the +// following situations: +// - A batch containing an EndTransactions (commit or rollback) succeeds. +// - A batch containing an EndTransaction(commit=false) succeeds or fails. I.e. +// nothing is expected to follow a rollback attempt. +// - A batch returns a TransactionAbortedError. As mentioned above, the client +// is expected to create a new TxnCoordSender for the next transaction attempt. +// +// Note that "1PC" batches (i.e. batches containing both a Begin and an +// EndTransaction) are no exception from the contract - if the batch fails, the +// client is expected to send a rollback (or perform another transaction attempt +// in case of retriable errors). type TxnCoordSender struct { mu struct { syncutil.Mutex @@ -127,7 +152,7 @@ type TxnCoordSender struct { // transaction was instantiated. firstUpdateNanos int64 // txnEnd is closed when the transaction is aborted or committed, - // terminating the associated heartbeat instance. + // terminating the heartbeat loop. txnEnd chan struct{} // state indicates the state of the transaction coordinator, which // may briefly diverge from the state of the transaction record if @@ -175,6 +200,9 @@ var ( metaCommitsRates = metric.Metadata{ Name: "txn.commits", Help: "Number of committed KV transactions (including 1PC)"} + // NOTE: The 1PC rate is arguably not accurate because it counts batches + // containing both BeginTransaction and EndTransaction without caring if the + // DistSender had to split it for touching multiple ranges. metaCommits1PCRates = metric.Metadata{ Name: "txn.commits1PC", Help: "Number of committed one-phase KV transactions"} @@ -412,6 +440,23 @@ func (tc *TxnCoordSender) Send( txnIDStr := txnID.String() sp.SetBaggageItem("txnID", txnIDStr) + _, hasBegin := ba.GetArg(roachpb.BeginTransaction) + if hasBegin { + // If there's a BeginTransaction, we need to start the heartbeat loop and + // intent tracking. + // Perhaps surprisingly, this needs to be done even if the batch has both + // a BeginTransaction and an EndTransaction. Although on batch success the + // heartbeat loop will be stopped right away, on error we might need both + // the intents and the heartbeat loop: + // - on retriable error, we need to keep around the intents for cleanup in + // subsequent epochs. + // - on non-retriable error, we need to keep around the intents as the + // client is expected to send an EndTransaction(commit=false) to cleanup. + if err := tc.startTracking(ctx); err != nil { + return nil, roachpb.NewError(err) + } + } + var et *roachpb.EndTransactionRequest var hasET bool { @@ -568,6 +613,11 @@ func (tc *TxnCoordSender) Send( if br.Txn.Status != roachpb.PENDING { tc.mu.Lock() tc.mu.meta.Txn = br.Txn.Clone() + _, hasBT := ba.GetArg(roachpb.BeginTransaction) + onePC := br.Txn.Status == roachpb.COMMITTED && hasBT + if onePC { + tc.metrics.Commits1PC.Inc(1) + } tc.cleanupTxnLocked(ctx, done) tc.mu.Unlock() } @@ -975,13 +1025,12 @@ func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context) { defer func() { tc.mu.Lock() if tc.mu.txnEnd != nil { - close(tc.mu.txnEnd) tc.mu.txnEnd = nil } duration, restarts, status := tc.finalTxnStatsLocked() tc.mu.tracking = false tc.mu.Unlock() - tc.updateStats(duration, restarts, status, false) + tc.updateStats(duration, restarts, status) }() var closer <-chan struct{} @@ -1183,8 +1232,8 @@ func (tc *TxnCoordSender) heartbeat(ctx context.Context) bool { return true } -// StartTracking is part of the client.TxnSender interface. -func (tc *TxnCoordSender) StartTracking(ctx context.Context) error { +// startTracking starts a heartbeat loop and tracking of intents. +func (tc *TxnCoordSender) startTracking(ctx context.Context) error { tc.mu.Lock() defer tc.mu.Unlock() @@ -1210,7 +1259,7 @@ func (tc *TxnCoordSender) StartTracking(ctx context.Context) error { // In principle, we can relax this as needed though. tc.cleanupTxnLocked(ctx, aborted) duration, restarts, status := tc.finalTxnStatsLocked() - tc.updateStats(duration, restarts, status, false /* onePC */) + tc.updateStats(duration, restarts, status) return err } return nil @@ -1376,16 +1425,6 @@ func (tc *TxnCoordSender) updateState( // intents blocking concurrent writers for extended periods of time. // See #3346. tc.appendAndCondenseIntentsLocked(ctx, ba, br) - } else { - // If this was a successful one phase commit, update stats - // directly as they won't otherwise be updated on heartbeat - // loop shutdown. - _, isBeginning := ba.GetArg(roachpb.BeginTransaction) - _, isEnding := ba.GetArg(roachpb.EndTransaction) - if pErr == nil && isBeginning && isEnding { - etArgs, ok := br.Responses[len(br.Responses)-1].GetInner().(*roachpb.EndTransactionResponse) - tc.updateStats(tc.clock.PhysicalNow()-startNS, 0, newTxn.Status, ok && etArgs.OnePhaseCommit) - } } // Update our record of this transaction, even on error. @@ -1443,9 +1482,7 @@ func (tc *TxnCoordSender) resendWithTxn( } // updateStats updates transaction metrics after a transaction finishes. -func (tc *TxnCoordSender) updateStats( - duration, restarts int64, status roachpb.TransactionStatus, onePC bool, -) { +func (tc *TxnCoordSender) updateStats(duration, restarts int64, status roachpb.TransactionStatus) { tc.metrics.Durations.RecordValue(duration) tc.metrics.Restarts.RecordValue(restarts) switch status { @@ -1455,8 +1492,5 @@ func (tc *TxnCoordSender) updateStats( tc.metrics.Abandons.Inc(1) case roachpb.COMMITTED: tc.metrics.Commits.Inc(1) - if onePC { - tc.metrics.Commits1PC.Inc(1) - } } } diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index 6c121bc2c28c..216652409007 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -1150,50 +1150,13 @@ func TestTxnMultipleCoord(t *testing.T) { } } -// TestTxnCoordSenderSingleRoundtripTxn checks that a batch which completely -// holds the writing portion of a Txn (including EndTransaction) does not -// launch a heartbeat goroutine at all. -func TestTxnCoordSenderSingleRoundtripTxn(t *testing.T) { - defer leaktest.AfterTest(t)() - stopper := stop.NewStopper() - manual := hlc.NewManualClock(123) - clock := hlc.NewClock(manual.UnixNano, 20*time.Nanosecond) - - var senderFn client.SenderFunc = func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - br := ba.CreateReply() - txnClone := ba.Txn.Clone() - br.Txn = &txnClone - br.Txn.Writing = true - return br, nil - } - ambient := log.AmbientContext{Tracer: tracing.NewTracer()} - factory := NewTxnCoordSenderFactory( - ambient, cluster.MakeTestingClusterSettings(), - senderFn, clock, false, stopper, MakeTxnMetrics(metric.TestSampleInterval), - ) - - // Stop the stopper manually, prior to trying the transaction. This has the - // effect of returning a NodeUnavailableError for any attempts at launching - // a heartbeat goroutine. - stopper.Stop(context.TODO()) - - var ba roachpb.BatchRequest - key := roachpb.Key("test") - ba.Add(&roachpb.BeginTransactionRequest{RequestHeader: roachpb.RequestHeader{Key: key}}) - ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: key}}) - ba.Add(&roachpb.EndTransactionRequest{}) - txn := roachpb.MakeTransaction("test", key, 0, 0, clock.Now(), 0) - tc := factory.New(client.RootTxn, &txn) - ba.Txn = &txn - _, pErr := tc.Send(context.Background(), ba) - if pErr != nil { - t.Fatal(pErr) - } -} - // TestTxnCoordSenderErrorWithIntent validates that if a transactional request // returns an error but also indicates a Writing transaction, the coordinator // tracks it just like a successful request. +// +// Note(andrei): This test was written at a time when the Writing status +// returned by the server mattered for the client. As of June 2018, that's no +// longer the case. The test doesn't hurt, though. func TestTxnCoordSenderErrorWithIntent(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() @@ -1452,7 +1415,7 @@ func TestTxnOnePhaseCommit(t *testing.T) { if !bytes.Equal(val, value) { t.Fatalf("expected: %s, got: %s", value, val) } - checkTxnMetrics(t, metrics, "commit 1PC txn", 1, 1 /* 1PC */, 0, 0, 0) + checkTxnMetrics(t, metrics, "commit 1PC txn", 1 /* commits */, 1 /* 1PC */, 0, 0, 0) } // createTimeoutDB returns a DB whose txns are considered abandoned by the @@ -1851,6 +1814,9 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { } } +// mockSender is a client.Sender implementation that passes requests to a list +// of provided matchers, in sequence. The first matcher that returns either a +// response or an error is used to provide the result for the request. type mockSender struct { matchers []matcher } @@ -1859,10 +1825,12 @@ var _ client.Sender = &mockSender{} type matcher func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) +// match adds a matcher to the list of matchers. func (s *mockSender) match(m matcher) { s.matchers = append(s.matchers, m) } +// Send implements the client.Sender interface. func (s *mockSender) Send( _ context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { @@ -1924,10 +1892,106 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) { if _, pErr := client.SendWrappedWith( ctx, txn, txnHeader, - &roachpb.EndTransactionRequest{Commit: false}); !testutils.IsPError(pErr, "injected err") { + &roachpb.EndTransactionRequest{Commit: false}, + ); !testutils.IsPError(pErr, "injected err") { + t.Fatal(pErr) + } + + testutils.SucceedsSoon(t, func() error { + if txn.Sender().(*TxnCoordSender).IsTracking() { + return fmt.Errorf("still tracking") + } + return nil + }) +} + +// Test that intent tracking and the heartbeat loop behave correctly for +// transactions that attempt to run a batch containing both a BeginTransaction +// and an EndTransaction. Since in case of an error it's not easy to determine +// whether any intents have been laid down (i.e. in case the batch was split by +// the DistSender and then there was mixed success for the sub-batches, or in +// case a retriable error is returned), the test verifies that the heartbeat +// loop is still going after the error and that the intents are properly tracked +// and attached to a subsequent EndTransaction. +// There was a time when the TxnCoordSender was cutting corners for these +// wannabe 1PC batches, but that proved to be a bad idea. +func TestOnePCErrorTracking(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ambient := log.AmbientContext{Tracer: tracing.NewTracer()} + sender := &mockSender{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + factory := NewTxnCoordSenderFactory( + ambient, + cluster.MakeTestingClusterSettings(), + sender, + clock, + false, /* linearizable */ + stopper, + MakeTxnMetrics(metric.TestSampleInterval), + ) + db := client.NewDB(factory, clock) + var key = roachpb.Key("a") + + // Register a matcher catching the commit attempt. + sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if et, ok := ba.GetArg(roachpb.EndTransaction); !ok { + return nil, nil + } else if !et.(*roachpb.EndTransactionRequest).Commit { + return nil, nil + } + return nil, roachpb.NewErrorf("injected err") + }) + // Register a matcher catching the rollback attempt. + sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + et, ok := ba.GetArg(roachpb.EndTransaction) + if !ok { + return nil, nil + } + etReq := et.(*roachpb.EndTransactionRequest) + if etReq.Commit { + return nil, nil + } + expIntents := []roachpb.Span{{Key: key}} + intents := etReq.IntentSpans + if !reflect.DeepEqual(intents, expIntents) { + return nil, roachpb.NewErrorf("expected intents %s, got: %s", expIntents, intents) + } + resp := ba.CreateReply() + // Set the response's txn to the Aborted status (as the server would). This + // will make the TxnCoordSender stop the heartbeat loop. + txnCopy := ba.Txn.Clone() + resp.Txn = &txnCopy + resp.Txn.Status = roachpb.ABORTED + return resp, nil + }) + + txn := client.NewTxn(db, roachpb.NodeID(1), client.RootTxn) + txnHeader := roachpb.Header{ + Txn: txn.Proto(), + } + b := txn.NewBatch() + b.Put(key, "test value") + if err := txn.CommitInBatch(ctx, b); !testutils.IsError(err, "injected err") { + t.Fatal(err) + } + + if !txn.Sender().(*TxnCoordSender).IsTracking() { + t.Fatalf("expected TxnCoordSender to be tracking after the write") + } + + // Now send a rollback and verify that the TxnCoordSender attaches the intent + // to it. + if _, pErr := client.SendWrappedWith( + ctx, txn, txnHeader, + &roachpb.EndTransactionRequest{Commit: false}, + ); pErr != nil { t.Fatal(pErr) } + // As always, check that the rollback we just sent stops the heartbeat loop. testutils.SucceedsSoon(t, func() error { if txn.Sender().(*TxnCoordSender).IsTracking() { return fmt.Errorf("still tracking") diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index c48f3fec95d6..fd328b1db426 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -57,11 +57,11 @@ func makeTestContext() testContext { manual := hlc.NewManualClock(123) clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender { - return client.TxnSenderAdapter{ - Wrapped: func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + return client.TxnSenderFunc( + func(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { return nil, nil }, - } + ) }) settings := cluster.MakeTestingClusterSettings() diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 12262eea6631..43325562f4d4 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -97,8 +97,6 @@ func (db *testSender) AugmentMeta(context.Context, roachpb.TxnCoordMeta) { panic func (db *testSender) OnFinish(func(error)) { panic("unimplemented") } -func (db *testSender) StartTracking(context.Context) error { panic("unimplemented") } - // Send forwards the call to the single store. This is a poor man's // version of kv.TxnCoordSender, but it serves the purposes of // supporting tests in this package. Transactions are not supported.