From 12eec9aaa8f14fd345ea936a9b410cb534aff1b3 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Thu, 14 Jun 2018 20:11:05 -0400 Subject: [PATCH] kv, client: don't send non-txn requests through the TxnCoordSender any more We were sending them through the TCS because the TCS was in charge of wrapping them in a Txn and retrying if the batch spanned ranges (because batches need to be atomic and you can only get that cross-range in txns). But that's nasty. The TCS is littered with checks about whether a request is transactional or not, and the code to do the wrapped retry did not belong there anyway. This patch moves the wrapping/retry into a new Sender under the client.DB. Now non-txn requests go through that and then straight to the DistSender. Release note: None --- pkg/ccl/backupccl/backup.go | 4 +- pkg/ccl/backupccl/restore.go | 6 +- pkg/ccl/changefeedccl/changefeed.go | 2 +- pkg/ccl/importccl/sst_writer_proc.go | 2 +- pkg/ccl/storageccl/bench_test.go | 2 +- pkg/ccl/storageccl/export_test.go | 4 +- pkg/internal/client/client_test.go | 18 ++-- pkg/internal/client/db.go | 85 ++++++++++++--- pkg/internal/client/sender.go | 42 ++++++-- pkg/internal/client/txn.go | 4 +- pkg/kv/txn_coord_sender.go | 131 +++++++++++------------- pkg/kv/txn_coord_sender_server_test.go | 2 +- pkg/kv/txn_coord_sender_test.go | 4 +- pkg/kv/txn_interceptor_batch_wrapper.go | 91 ---------------- pkg/server/server_test.go | 70 +++++++------ pkg/server/testserver.go | 6 +- pkg/sql/scatter.go | 2 +- pkg/storage/client_raft_test.go | 2 +- pkg/storage/client_replica_test.go | 8 +- pkg/storage/client_split_test.go | 8 +- pkg/storage/replica_gc_queue.go | 2 +- pkg/storage/replica_test.go | 4 +- pkg/storage/store_test.go | 25 ++++- 23 files changed, 253 insertions(+), 271 deletions(-) delete mode 100644 pkg/kv/txn_interceptor_batch_wrapper.go diff --git a/pkg/ccl/backupccl/backup.go b/pkg/ccl/backupccl/backup.go index 4ae888cd600c..545c4aba9071 100644 --- a/pkg/ccl/backupccl/backup.go +++ 
b/pkg/ccl/backupccl/backup.go @@ -676,7 +676,7 @@ func backup( StartTime: span.start, MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter), } - rawRes, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req) + rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) if pErr != nil { return pErr.GoError() } @@ -1198,7 +1198,7 @@ func getAllRevisions( MVCCFilter: roachpb.MVCCFilter_All, ReturnSST: true, } - resp, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req) + resp, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) if pErr != nil { return nil, pErr.GoError() } diff --git a/pkg/ccl/backupccl/restore.go b/pkg/ccl/backupccl/restore.go index a591e342b27a..b12f971843c7 100644 --- a/pkg/ccl/backupccl/restore.go +++ b/pkg/ccl/backupccl/restore.go @@ -800,7 +800,7 @@ func splitAndScatter( scatterReq := &roachpb.AdminScatterRequest{ RequestHeader: roachpb.RequestHeaderFromSpan(chunkSpan), } - if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil { + if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil { // TODO(dan): Unfortunately, Scatter is still too unreliable to // fail the RESTORE when Scatter fails. I'm uncomfortable that // this could break entirely and not start failing the tests, @@ -844,7 +844,7 @@ func splitAndScatter( scatterReq := &roachpb.AdminScatterRequest{ RequestHeader: roachpb.RequestHeaderFromSpan(newSpan), } - if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil { + if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil { // TODO(dan): Unfortunately, Scatter is still too unreliable to // fail the RESTORE when Scatter fails. 
I'm uncomfortable that // this could break entirely and not start failing the tests, @@ -1126,7 +1126,7 @@ func restore( defer tracing.FinishSpan(importSpan) defer func() { <-importsSem }() - importRes, pErr := client.SendWrapped(ctx, db.GetSender(), importRequest) + importRes, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), importRequest) if pErr != nil { return pErr.GoError() } diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 67a0f989caf4..858c7f179908 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -139,7 +139,7 @@ func runChangefeedFlow( func exportRequestPoll( execCfg *sql.ExecutorConfig, details jobspb.ChangefeedDetails, progress jobspb.ChangefeedProgress, ) func(context.Context) (changedKVs, error) { - sender := execCfg.DB.GetSender() + sender := execCfg.DB.NonTransactionalSender() var spans []roachpb.Span for _, tableDesc := range details.TableDescs { spans = append(spans, tableDesc.PrimaryIndexSpan()) diff --git a/pkg/ccl/importccl/sst_writer_proc.go b/pkg/ccl/importccl/sst_writer_proc.go index 67733a15c851..f25ed099d8f1 100644 --- a/pkg/ccl/importccl/sst_writer_proc.go +++ b/pkg/ccl/importccl/sst_writer_proc.go @@ -182,7 +182,7 @@ func (sp *sstWriter) Run(ctx context.Context, wg *sync.WaitGroup) { scatterReq := &roachpb.AdminScatterRequest{ RequestHeader: roachpb.RequestHeaderFromSpan(sst.span), } - if _, pErr := client.SendWrapped(ctx, sp.db.GetSender(), scatterReq); pErr != nil { + if _, pErr := client.SendWrapped(ctx, sp.db.NonTransactionalSender(), scatterReq); pErr != nil { // TODO(dan): Unfortunately, Scatter is still too unreliable to // fail the IMPORT when Scatter fails. 
I'm uncomfortable that // this could break entirely and not start failing the tests, diff --git a/pkg/ccl/storageccl/bench_test.go b/pkg/ccl/storageccl/bench_test.go index 9c1357022f0c..58c6709607da 100644 --- a/pkg/ccl/storageccl/bench_test.go +++ b/pkg/ccl/storageccl/bench_test.go @@ -204,7 +204,7 @@ func BenchmarkImport(b *testing.B) { Files: files, Rekeys: rekeys, } - res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req) + res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req) if pErr != nil { b.Fatalf("%+v", pErr.GoError()) } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index 0d2ead58d1be..e09eb8ddf2ab 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -51,7 +51,7 @@ func TestExportCmd(t *testing.T) { MVCCFilter: mvccFilter, ReturnSST: true, } - res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req) + res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req) if pErr != nil { t.Fatalf("%+v", pErr) } @@ -185,7 +185,7 @@ func TestExportGCThreshold(t *testing.T) { RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey}, StartTime: hlc.Timestamp{WallTime: -1}, } - _, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req) + _, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req) if !testutils.IsPError(pErr, "must be after replica GC threshold") { t.Fatalf(`expected "must be after replica GC threshold" error got: %+v`, pErr) } diff --git a/pkg/internal/client/client_test.go b/pkg/internal/client/client_test.go index b70feeac7849..31b2102e4759 100644 --- a/pkg/internal/client/client_test.go +++ b/pkg/internal/client/client_test.go @@ -798,16 +798,14 @@ func TestReadConsistencyTypes(t *testing.T) { t.Run(rc.String(), func(t *testing.T) { // Mock out DistSender's sender function to check the read consistency for // outgoing BatchRequests and return an empty reply. 
- factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender { - return client.TxnSenderFunc( - func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { - if ba.ReadConsistency != rc { - return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency) - } - return ba.CreateReply(), nil - }, - ) - }) + factory := client.NonTransactionalFactoryFunc( + func(_ context.Context, ba roachpb.BatchRequest, + ) (*roachpb.BatchResponse, *roachpb.Error) { + if ba.ReadConsistency != rc { + return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency) + } + return ba.CreateReply(), nil + }) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock) diff --git a/pkg/internal/client/db.go b/pkg/internal/client/db.go index 8da4bccb76c4..61f7ed4a2e54 100644 --- a/pkg/internal/client/db.go +++ b/pkg/internal/client/db.go @@ -187,6 +187,58 @@ func DefaultDBContext() DBContext { } } +// CrossRangeTxnWrapperSender is a Sender whose purpose is to wrap +// non-transactional requests that span ranges into a transaction so they can +// execute atomically. +// +// TODO(andrei, bdarnell): This is a wart. Our semantics are that batches are +// atomic, but there's only historical reason for that. We should disallow +// non-transactional batches and scans, forcing people to use transactions +// instead. And then this Sender can go away. +type CrossRangeTxnWrapperSender struct { + db *DB + wrapped Sender +} + +var _ Sender = &CrossRangeTxnWrapperSender{} + +// Send implements the Sender interface. 
+func (s *CrossRangeTxnWrapperSender) Send( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + if ba.Txn != nil { + log.Fatalf(ctx, "CrossRangeTxnWrapperSender can't handle transactional requests") + } + + br, pErr := s.wrapped.Send(ctx, ba) + if _, ok := pErr.GetDetail().(*roachpb.OpRequiresTxnError); !ok { + return br, pErr + } + + err := s.db.Txn(ctx, func(ctx context.Context, txn *Txn) error { + txn.SetDebugName("auto-wrap") + b := txn.NewBatch() + b.Header = ba.Header + for _, arg := range ba.Requests { + req := arg.GetInner().ShallowCopy() + b.AddRawRequest(req) + } + err := txn.CommitInBatch(ctx, b) + br = b.RawResponse() + return err + }) + if err != nil { + return nil, roachpb.NewError(err) + } + br.Txn = nil // hide the evidence + return br, nil +} + +// Wrapped returns the wrapped sender. +func (s *CrossRangeTxnWrapperSender) Wrapped() Sender { + return s.wrapped +} + // DB is a database handle to a single cockroach cluster. A DB is safe for // concurrent use by multiple goroutines. type DB struct { @@ -195,24 +247,18 @@ type DB struct { factory TxnSenderFactory clock *hlc.Clock ctx DBContext + // crs is the sender used for non-transactional requests. + crs CrossRangeTxnWrapperSender } -// GetSender returns a Sender that can be used to send requests through. -// Note that a new Sender created; it is not shared. +// NonTransactionalSender returns a Sender that can be used for sending +// non-transactional requests. The Sender is capable of transparently wrapping +// non-transactional requests that span ranges in transactions. // -// The Sender returned should not be used for sending transactional requests. -// Use db.Txn() or db.NewTxn() for that. -func (db *DB) GetSender() Sender { - // We pass nil for the txn here because we don't have a txn on hand. 
- // That's why this method says to not use the Sender for transactional - // requests, plus the fact that if a Sender is used directly, the caller needs - // to be mindful of the need to start a heartbeat loop when writing. - // - // Note that even non-transactional requests need to go through a - // TxnCoordSender because batches that get split need to be wrapped in - // transactions (and the TxnCoordSender handles that). So we can't simply - // return the wrapped handler here. - return db.factory.New(RootTxn, nil /* txn */) +// The Sender returned should not be used for sending transactional requests - +// it bypasses the TxnCoordSender. Use db.Txn() or db.NewTxn() for transactions. +func (db *DB) NonTransactionalSender() Sender { + return &db.crs } // GetFactory returns the DB's TxnSenderFactory. @@ -232,12 +278,17 @@ func NewDBWithContext( if actx.Tracer == nil { panic("no tracer set in AmbientCtx") } - return &DB{ + db := &DB{ AmbientContext: actx, factory: factory, clock: clock, ctx: ctx, + crs: CrossRangeTxnWrapperSender{ + wrapped: factory.NonTransactionalSender(), + }, } + db.crs.db = db + return db } // Get retrieves the value for a key, returning the retrieved key/value or an @@ -545,7 +596,7 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err func (db *DB) send( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { - return db.sendUsingSender(ctx, ba, db.GetSender()) + return db.sendUsingSender(ctx, ba, db.NonTransactionalSender()) } // sendUsingSender uses the specified sender to send the batch request. diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index 107346c58a78..252003494a02 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -83,15 +83,15 @@ type TxnSender interface { // TxnSenderFactory is the interface used to create new instances // of TxnSender. 
type TxnSenderFactory interface { - // New returns a new instance of TxnSender. + // TransactionalSender returns a sender to be used for transactional requests. // typ specifies whether the sender is the root or one of potentially many // child "leaf" nodes in a tree of transaction objects, as is created during a // DistSQL flow. - // txn is the transaction whose requests this sender will carry. It can be nil - // if the sender will not be used for transactional requests. - New(typ TxnType, txn *roachpb.Transaction) TxnSender - // WrappedSender returns the TxnSenderFactory's wrapped Sender. - WrappedSender() Sender + // txn is the transaction whose requests this sender will carry. + TransactionalSender(typ TxnType, txn *roachpb.Transaction) TxnSender + // NonTransactionalSender returns a sender to be used for non-transactional + // requests. Generally this is a sender that TransactionalSender() wraps. + NonTransactionalSender() Sender } // SenderFunc is an adapter to allow the use of ordinary functions @@ -132,14 +132,34 @@ func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") } // as TxnSenderFactories. This is a helper mechanism to facilitate testing. type TxnSenderFactoryFunc func(TxnType) TxnSender -// New calls f(). -func (f TxnSenderFactoryFunc) New(typ TxnType, _ *roachpb.Transaction) TxnSender { +var _ TxnSenderFactory = TxnSenderFactoryFunc(nil) + +// TransactionalSender is part of TxnSenderFactory. +func (f TxnSenderFactoryFunc) TransactionalSender(typ TxnType, _ *roachpb.Transaction) TxnSender { return f(typ) } -// WrappedSender is not implemented for TxnSenderFactoryFunc. -func (f TxnSenderFactoryFunc) WrappedSender() Sender { - panic("unimplemented") +// NonTransactionalSender is part of TxnSenderFactory. +func (f TxnSenderFactoryFunc) NonTransactionalSender() Sender { + return nil +} + +// NonTransactionalFactoryFunc is a TxnSenderFactory that cannot, in fact, +// create any transactional senders, only non-transactional ones. 
+type NonTransactionalFactoryFunc SenderFunc + +var _ TxnSenderFactory = NonTransactionalFactoryFunc(nil) + +// TransactionalSender is part of the TxnSenderFactory. +func (f NonTransactionalFactoryFunc) TransactionalSender( + typ TxnType, _ *roachpb.Transaction, +) TxnSender { + panic("not supported ") +} + +// NonTransactionalSender is part of the TxnSenderFactory. +func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender { + return SenderFunc(f) } // SendWrappedWith is a convenience function which wraps the request in a batch diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index d1923d2182dc..a1407d1e9313 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -176,7 +176,7 @@ func NewTxnWithProto( proto.AssertInitialized(context.TODO()) txn := &Txn{db: db, typ: typ, gatewayNodeID: gatewayNodeID} txn.mu.Proto = proto - txn.mu.sender = db.factory.New(typ, &proto) + txn.mu.sender = db.factory.TransactionalSender(typ, &proto) return txn } @@ -1258,7 +1258,7 @@ func (txn *Txn) updateStateOnRetryableErrLocked( txn.mu.state = txnReadOnly // Create a new txn sender. - txn.mu.sender = txn.db.factory.New(txn.typ, newTxn) + txn.mu.sender = txn.db.factory.TransactionalSender(txn.typ, newTxn) } else { // Update the transaction proto with the one to be used for the next // attempt. The txn inside pErr was correctly prepared for this by diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index 21a7ea37a16b..fa8f5db1425c 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -142,11 +142,10 @@ type TxnCoordSender struct { // is embedded in the interceptorAlloc struct, so the entire stack is // allocated together with TxnCoordSender without any additional heap // allocations necessary. - interceptorStack [3]txnInterceptor + interceptorStack [2]txnInterceptor interceptorAlloc struct { txnIntentCollector txnSpanRefresher - txnBatchWrapper txnLockGatekeeper // not in interceptorStack array. 
} @@ -355,14 +354,15 @@ func NewTxnCoordSenderFactory( return tcf } -// New is part of the TxnSenderFactory interface. -func (tcf *TxnCoordSenderFactory) New( +// TransactionalSender is part of the TxnSenderFactory interface. +func (tcf *TxnCoordSenderFactory) TransactionalSender( typ client.TxnType, txn *roachpb.Transaction, ) client.TxnSender { tcs := &TxnCoordSender{ typ: typ, TxnCoordSenderFactory: tcf, } + tcs.mu.txn = txn.Clone() // Create a stack of request/response interceptors. All of the objects in // this stack are pre-allocated on the TxnCoordSender struct, so this just @@ -388,9 +388,6 @@ func (tcf *TxnCoordSenderFactory) New( canAutoRetry: typ == client.RootTxn, autoRetryCounter: tcs.metrics.AutoRetries, } - tcs.interceptorAlloc.txnBatchWrapper = txnBatchWrapper{ - tcf: tcf, - } tcs.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{ mu: &tcs.mu, wrapped: tcs.wrapped, @@ -398,7 +395,6 @@ func (tcf *TxnCoordSenderFactory) New( tcs.interceptorStack = [...]txnInterceptor{ &tcs.interceptorAlloc.txnIntentCollector, &tcs.interceptorAlloc.txnSpanRefresher, - &tcs.interceptorAlloc.txnBatchWrapper, } for i, reqInt := range tcs.interceptorStack { if i < len(tcs.interceptorStack)-1 { @@ -408,21 +404,11 @@ func (tcf *TxnCoordSenderFactory) New( } } - // If a transaction was passed in bind the TxnCoordSender to it. - // TODO(andrei): Ideally, if a transaction is not passed it, we should take - // that to mean that a TxnCoordSender is not needed and we should return the - // wrapped sender directly. However, there are tests that pass nil and still - // send transactional requests. That's why the TxnCoordSender is still - // littered with code handling the case where it is not yet bound to a - // transaction. - if txn != nil { - tcs.mu.txn = txn.Clone() - } return tcs } -// WrappedSender is part of the TxnSenderFactory interface. 
-func (tcf *TxnCoordSenderFactory) WrappedSender() client.Sender { +// NonTransactionalSender is part of the TxnSenderFactory interface. +func (tcf *TxnCoordSenderFactory) NonTransactionalSender() client.Sender { return tcf.wrapped } @@ -481,6 +467,11 @@ func (tc *TxnCoordSender) OnFinish(onFinishFn func(error)) { func (tc *TxnCoordSender) Send( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { + + if ba.Txn == nil { + log.Fatalf(ctx, "can't send non-transactional request through a TxnCoordSender: %s", ba) + } + ctx = tc.AnnotateCtx(ctx) // Start new or pick up active trace. From here on, there's always an active @@ -496,62 +487,61 @@ func (tc *TxnCoordSender) Send( tc.mu.Lock() defer tc.mu.Unlock() - if ba.Txn != nil { - if tc.mu.txn.ID == (uuid.UUID{}) { - log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender") - } - ctx = log.WithLogTag(ctx, "txn", uuid.ShortStringer(ba.Txn.ID)) - if log.V(2) { - ctx = log.WithLogTag(ctx, "ts", ba.Txn.Timestamp) - } + if tc.mu.txn.ID == (uuid.UUID{}) { + log.Fatalf(ctx, "cannot send transactional request through unbound TxnCoordSender") + } - // If this request is part of a transaction... - if err := tc.validateTxnForBatchLocked(ctx, &ba); err != nil { - return nil, roachpb.NewError(err) - } + ctx = log.WithLogTag(ctx, "txn", uuid.ShortStringer(ba.Txn.ID)) + if log.V(2) { + ctx = log.WithLogTag(ctx, "ts", ba.Txn.Timestamp) + } - // Associate the txnID with the trace. - txnIDStr := ba.Txn.ID.String() - sp.SetBaggageItem("txnID", txnIDStr) - - _, hasBegin := ba.GetArg(roachpb.BeginTransaction) - if hasBegin { - // If there's a BeginTransaction, we need to start the heartbeat loop. - // Perhaps surprisingly, this needs to be done even if the batch has both - // a BeginTransaction and an EndTransaction. 
Although on batch success the - // heartbeat loop will be stopped right away, on retriable errors we need the - // heartbeat loop: the intents and txn record should be kept in place just - // like for non-1PC txns. - if err := tc.startHeartbeatLoopLocked(ctx); err != nil { - return nil, roachpb.NewError(err) - } - } + // If this request is part of a transaction... + if err := tc.validateTxnForBatchLocked(ctx, &ba); err != nil { + return nil, roachpb.NewError(err) + } - // Copy a few fields from the request's txn. This is technically only - // required during the first send, as these fields are set before - // the first send and can't change afterwards. Keeping these fields in - // sync between the TxnCoordSender and the client.Txn is needed because, - // when the TxnCoordSender synthesizes TransactionAbortedErrors, it - // creates a new proto that it passes to the client.Txn and then these - // fields are used when creating that proto that will then be used for the - // client.Txn. On subsequent retries of the transaction, it's important - // for the values of these fields to have been preserved because that - // makes future calls to txn.SetIsolation() and such no-ops. - // If this makes no sense it's because the TxnCoordSender having a copy of - // the Transaction proto generally makes no sense. - tc.mu.txn.Name = ba.Txn.Name - tc.mu.txn.Isolation = ba.Txn.Isolation - tc.mu.txn.Priority = ba.Txn.Priority - - if pErr := tc.maybeRejectClientLocked(ctx, ba.Txn.ID, &ba); pErr != nil { - return nil, pErr + // Associate the txnID with the trace. + txnIDStr := ba.Txn.ID.String() + sp.SetBaggageItem("txnID", txnIDStr) + + _, hasBegin := ba.GetArg(roachpb.BeginTransaction) + if hasBegin { + // If there's a BeginTransaction, we need to start the heartbeat loop. + // Perhaps surprisingly, this needs to be done even if the batch has both + // a BeginTransaction and an EndTransaction. 
Although on batch success the + // heartbeat loop will be stopped right away, on retriable errors we need the + // heartbeat loop: the intents and txn record should be kept in place just + // like for non-1PC txns. + if err := tc.startHeartbeatLoopLocked(ctx); err != nil { + return nil, roachpb.NewError(err) } + } - // Update the command count. - tc.mu.commandCount += int32(len(ba.Requests)) + // Copy a few fields from the request's txn. This is technically only + // required during the first send, as these fields are set before + // the first send and can't change afterwards. Keeping these fields in + // sync between the TxnCoordSender and the client.Txn is needed because, + // when the TxnCoordSender synthesizes TransactionAbortedErrors, it + // creates a new proto that it passes to the client.Txn and then these + // fields are used when creating that proto that will then be used for the + // client.Txn. On subsequent retries of the transaction, it's important + // for the values of these fields to have been preserved because that + // makes future calls to txn.SetIsolation() and such no-ops. + // If this makes no sense it's because the TxnCoordSender having a copy of + // the Transaction proto generally makes no sense. + tc.mu.txn.Name = ba.Txn.Name + tc.mu.txn.Isolation = ba.Txn.Isolation + tc.mu.txn.Priority = ba.Txn.Priority + + if pErr := tc.maybeRejectClientLocked(ctx, ba.Txn.ID, &ba); pErr != nil { + return nil, pErr } + // Update the command count. + tc.mu.commandCount += int32(len(ba.Requests)) + // Send the command through the txnInterceptor stack. br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba) pErr = tc.updateStateLocked(ctx, startNS, ba, br, pErr) @@ -937,11 +927,6 @@ func (tc *TxnCoordSender) updateStateLocked( pErr *roachpb.Error, ) *roachpb.Error { - if ba.Txn == nil { - // Not a transactional request. 
- return pErr - } - txnID := ba.Txn.ID var newTxn roachpb.Transaction if pErr == nil { diff --git a/pkg/kv/txn_coord_sender_server_test.go b/pkg/kv/txn_coord_sender_server_test.go index 648a8882b4bc..92dfc08ad71f 100644 --- a/pkg/kv/txn_coord_sender_server_test.go +++ b/pkg/kv/txn_coord_sender_server_test.go @@ -101,7 +101,7 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) { // Now check that the intent on key2 has been cleared. We'll do a // READ_UNCOMMITTED Get for that. testutils.SucceedsSoon(t, func() error { - reply, err := client.SendWrappedWith(ctx, origDB.GetSender(), roachpb.Header{ + reply, err := client.SendWrappedWith(ctx, origDB.NonTransactionalSender(), roachpb.Header{ ReadConsistency: roachpb.READ_UNCOMMITTED, }, roachpb.NewGet(key2)) if err != nil { diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index ee4f1f4b5645..8e5b4cbe6553 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -1030,7 +1030,7 @@ func TestTxnCoordSenderErrorWithIntent(t *testing.T) { ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: key}}) ba.Add(&roachpb.EndTransactionRequest{}) txn := roachpb.MakeTransaction("test", key, 0, 0, clock.Now(), 0) - tc := factory.New(client.RootTxn, &txn) + tc := factory.TransactionalSender(client.RootTxn, &txn) defer teardownHeartbeat(tc.(*TxnCoordSender)) ba.Txn = &txn _, pErr := tc.Send(context.Background(), ba) @@ -1701,7 +1701,7 @@ func TestIntentTrackingBeforeBeginTransaction(t *testing.T) { clock.Now(), clock.MaxOffset().Nanoseconds(), ) - tcs := factory.New(client.RootTxn, &txn) + tcs := factory.TransactionalSender(client.RootTxn, &txn) txnHeader := roachpb.Header{ Txn: &txn, } diff --git a/pkg/kv/txn_interceptor_batch_wrapper.go b/pkg/kv/txn_interceptor_batch_wrapper.go deleted file mode 100644 index 0073c3ad780d..000000000000 --- a/pkg/kv/txn_interceptor_batch_wrapper.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2018 The Cockroach 
Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package kv - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/internal/client" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/util/log" -) - -// txnBatchWrapper is a txnInterceptor that catches requests that produced -// OpRequiresTxnErrors and re-runs them in the context of a transaction. -// -// TODO(tschottdorf): this handling is somewhat awkward but unless we want to -// give this error back to the client, our options are limited. We'll have to -// run the whole thing for them, or any restart will still end up at the client -// which will not be prepared to be handed a Txn. -// TODO(andrei): if we lifted the retry loop for non-transaction requests that -// hit OpRequiresTxnErrors into client.DB then we wouldn't have to send them -// through a TxnCoordSender at all. This would allow us to get rid of this -// interceptor. -type txnBatchWrapper struct { - wrapped lockedSender - tcf *TxnCoordSenderFactory -} - -// SendLocked implements the lockedSender interface. -func (bw *txnBatchWrapper) SendLocked( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - // Send through wrapped lockedSender. Unlocks while sending then re-locks. 
- br, pErr := bw.wrapped.SendLocked(ctx, ba) - if _, ok := pErr.GetDetail().(*roachpb.OpRequiresTxnError); !ok { - return br, pErr - } - - // Run a one-off transaction with that single command. - log.VEventf(ctx, 2, "%s: auto-wrapping in txn and re-executing", ba) - // TODO(bdarnell): need to be able to pass other parts of DBContext - // through here. - dbCtx := client.DefaultDBContext() - dbCtx.UserPriority = ba.UserPriority - dbCtx.Stopper = bw.tcf.stopper - tmpDB := client.NewDBWithContext(bw.tcf.AmbientContext, bw.tcf, bw.tcf.clock, dbCtx) - err := tmpDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { - txn.SetDebugName("auto-wrap") - b := txn.NewBatch() - b.Header = ba.Header - for _, arg := range ba.Requests { - req := arg.GetInner().ShallowCopy() - b.AddRawRequest(req) - } - err := txn.CommitInBatch(ctx, b) - br = b.RawResponse() - return err - }) - if err != nil { - return nil, roachpb.NewError(err) - } - br.Txn = nil // hide the evidence - return br, nil -} - -// setWrapped implements the txnInterceptor interface. -func (bw *txnBatchWrapper) setWrapped(wrapped lockedSender) { bw.wrapped = wrapped } - -// populateMetaLocked implements the txnInterceptor interface. -func (*txnBatchWrapper) populateMetaLocked(meta *roachpb.TxnCoordMeta) {} - -// augmentMetaLocked implements the txnInterceptor interface. -func (*txnBatchWrapper) augmentMetaLocked(meta roachpb.TxnCoordMeta) {} - -// epochBumpedLocked implements the txnInterceptor interface. -func (*txnBatchWrapper) epochBumpedLocked() {} - -// closeLocked implements the txnInterceptor interface. 
-func (*txnBatchWrapper) closeLocked() {} diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 19ae5d6b0b38..d0e05b0f5b6c 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -139,7 +139,7 @@ func TestServerStartClock(t *testing.T) { RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")}, } if _, err := client.SendWrapped( - context.Background(), s.DB().GetSender(), get, + context.Background(), s.DB().NonTransactionalSender(), get, ); err != nil { t.Fatal(err) } @@ -285,7 +285,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.TODO()) ts := s.(*TestServer) - tds := db.GetSender() + tds := db.NonTransactionalSender() if err := ts.node.storeCfg.DB.AdminSplit(context.TODO(), "m", "m"); err != nil { t.Fatal(err) @@ -376,43 +376,47 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) { } for i, tc := range testCases { - s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.TODO()) - ts := s.(*TestServer) - tds := db.GetSender() - - for _, sk := range tc.splitKeys { - if err := ts.node.storeCfg.DB.AdminSplit(context.TODO(), sk, sk); err != nil { - t.Fatal(err) - } - } - - for _, k := range tc.keys { - put := roachpb.NewPut(k, roachpb.MakeValueFromBytes(k)) - if _, err := client.SendWrapped(context.Background(), tds, put); err != nil { - t.Fatal(err) + t.Run("", func(t *testing.T) { + ctx := context.Background() + s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + ts := s.(*TestServer) + tds := db.NonTransactionalSender() + + for _, sk := range tc.splitKeys { + if err := ts.node.storeCfg.DB.AdminSplit(ctx, sk, sk); err != nil { + t.Fatal(err) + } } - } - // Try every possible ScanRequest startKey. - for start := 0; start < len(tc.keys); start++ { - // Try every possible maxResults, from 1 to beyond the size of key array. 
- for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ { - scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next()) - reply, err := client.SendWrappedWith( - context.Background(), tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan, - ) - if err != nil { + for _, k := range tc.keys { + put := roachpb.NewPut(k, roachpb.MakeValueFromBytes(k)) + if _, err := client.SendWrapped(ctx, tds, put); err != nil { t.Fatal(err) } - rows := reply.(*roachpb.ScanResponse).Rows - if start+maxResults <= len(tc.keys) && len(rows) != maxResults { - t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows)) - } else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 { - t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows)) + } + + // Try every possible ScanRequest startKey. + for start := 0; start < len(tc.keys); start++ { + // Try every possible maxResults, from 1 to beyond the size of key array. 
+ for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ { + scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next()) + reply, err := client.SendWrappedWith( + ctx, tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan, + ) + if err != nil { + t.Fatal(err) + } + rows := reply.(*roachpb.ScanResponse).Rows + if start+maxResults <= len(tc.keys) && len(rows) != maxResults { + t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows)) + } else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 { + t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows)) + } } } - } + + }) } } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 907d5cc4dcba..ee97ecb05410 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -619,7 +619,7 @@ func (ts *TestServer) GetFirstStoreID() roachpb.StoreID { // LookupRange returns the descriptor of the range containing key. func (ts *TestServer) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error) { - rs, _, err := client.RangeLookup(context.Background(), ts.DB().GetSender(), + rs, _, err := client.RangeLookup(context.Background(), ts.DB().NonTransactionalSender(), key, roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */) if err != nil { return roachpb.RangeDescriptor{}, errors.Errorf( @@ -649,7 +649,7 @@ func (ts *TestServer) SplitRange( }, SplitKey: splitKey, } - _, pErr := client.SendWrapped(ctx, ts.DB().GetSender(), &splitReq) + _, pErr := client.SendWrapped(ctx, ts.DB().NonTransactionalSender(), &splitReq) if pErr != nil { return roachpb.RangeDescriptor{}, roachpb.RangeDescriptor{}, errors.Errorf( @@ -735,7 +735,7 @@ func (ts *TestServer) GetRangeLease( } leaseResp, pErr := client.SendWrappedWith( ctx, - ts.DB().GetSender(), + ts.DB().NonTransactionalSender(), roachpb.Header{ // INCONSISTENT read, since we want to make sure that the node used to // send this is the one that 
processes the command, for the hint to diff --git a/pkg/sql/scatter.go b/pkg/sql/scatter.go index 8acd183506d8..3c36b23f0197 100644 --- a/pkg/sql/scatter.go +++ b/pkg/sql/scatter.go @@ -137,7 +137,7 @@ func (n *scatterNode) startExec(params runParams) error { RequestHeader: roachpb.RequestHeader{Key: n.run.span.Key, EndKey: n.run.span.EndKey}, RandomizeLeases: true, } - res, pErr := client.SendWrapped(params.ctx, db.GetSender(), req) + res, pErr := client.SendWrapped(params.ctx, db.NonTransactionalSender(), req) if pErr != nil { return pErr.GoError() } diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 29f15f76ba25..abc1bcfabde9 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -1344,7 +1344,7 @@ func getRangeMetadata( // want to do a consistent read here. This is important when we are // considering one of the metadata ranges: we must not do an // inconsistent lookup in our own copy of the range. - sender := mtc.dbs[0].GetSender() + sender := mtc.dbs[0].NonTransactionalSender() rs, _, err := client.RangeLookup(context.TODO(), sender, key.AsRawKey(), roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */) if err != nil { diff --git a/pkg/storage/client_replica_test.go b/pkg/storage/client_replica_test.go index 947c4425f7ff..eb814baedd9c 100644 --- a/pkg/storage/client_replica_test.go +++ b/pkg/storage/client_replica_test.go @@ -1173,7 +1173,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) { Key: key, }, } - if _, pErr := client.SendWrappedWith(context.Background(), s.DB().GetSender(), + if _, pErr := client.SendWrappedWith(context.Background(), s.DB().NonTransactionalSender(), roachpb.Header{UserPriority: 42}, &getReq); pErr != nil { errChan <- pErr.GoError() @@ -1214,7 +1214,7 @@ func TestLeaseExtensionNotBlockedByRead(t *testing.T) { }, PrevLease: curLease, } - if _, pErr := client.SendWrapped(context.Background(), s.DB().GetSender(), &leaseReq); pErr != nil { + if _, pErr := 
client.SendWrapped(context.Background(), s.DB().NonTransactionalSender(), &leaseReq); pErr != nil { t.Fatal(pErr) } // Unblock the read. @@ -1234,7 +1234,7 @@ func LeaseInfo( Key: rangeDesc.StartKey.AsRawKey(), }, } - reply, pErr := client.SendWrappedWith(context.Background(), db.GetSender(), roachpb.Header{ + reply, pErr := client.SendWrappedWith(context.Background(), db.NonTransactionalSender(), roachpb.Header{ ReadConsistency: readConsistency, }, leaseInfoReq) if pErr != nil { @@ -1366,7 +1366,7 @@ func TestErrorHandlingForNonKVCommand(t *testing.T) { } _, pErr := client.SendWrappedWith( context.Background(), - s.DB().GetSender(), + s.DB().NonTransactionalSender(), roachpb.Header{UserPriority: 42}, &leaseReq, ) diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 2321275eda14..63c8bfbf2e8a 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -2709,13 +2709,13 @@ func TestRangeLookupAfterMeta2Split(t *testing.T) { // will require a scan that continues into the next meta2 range. 
const tableID = keys.MinUserDescID + 1 // 51 splitReq := adminSplitArgs(keys.MakeTablePrefix(tableID - 3 /* 48 */)) - if _, pErr := client.SendWrapped(ctx, s.DB().GetSender(), splitReq); pErr != nil { + if _, pErr := client.SendWrapped(ctx, s.DB().NonTransactionalSender(), splitReq); pErr != nil { t.Fatal(pErr) } metaKey := keys.RangeMetaKey(keys.MakeTablePrefix(tableID)).AsRawKey() splitReq = adminSplitArgs(metaKey) - if _, pErr := client.SendWrapped(ctx, s.DB().GetSender(), splitReq); pErr != nil { + if _, pErr := client.SendWrapped(ctx, s.DB().NonTransactionalSender(), splitReq); pErr != nil { t.Fatal(pErr) } @@ -2741,7 +2741,7 @@ func TestRangeLookupAfterMeta2Split(t *testing.T) { } else { lookupReq = &roachpb.ScanRequest{RequestHeader: header} } - if _, err := client.SendWrapped(ctx, s.DB().GetSender(), lookupReq); err != nil { + if _, err := client.SendWrapped(ctx, s.DB().NonTransactionalSender(), lookupReq); err != nil { t.Fatalf("%T %v", err.GoError(), err) } }) @@ -2947,7 +2947,7 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) { // Clear the range descriptor cache so that any future requests will first // need to perform a RangeLookup. - store.DB().GetFactory().WrappedSender().(*kv.DistSender).RangeDescriptorCache().Clear() + store.DB().NonTransactionalSender().(*client.CrossRangeTxnWrapperSender).Wrapped().(*kv.DistSender).RangeDescriptorCache().Clear() // Now send a request, forcing the RangeLookup. Since the lookup is // inconsistent, there's no WriteIntentError, but we'll try to resolve any diff --git a/pkg/storage/replica_gc_queue.go b/pkg/storage/replica_gc_queue.go index feef83080fed..16657363daca 100644 --- a/pkg/storage/replica_gc_queue.go +++ b/pkg/storage/replica_gc_queue.go @@ -196,7 +196,7 @@ func (rgcq *replicaGCQueue) process( // want to do a consistent read here. This is important when we are // considering one of the metadata ranges: we must not do an inconsistent // lookup in our own copy of the range. 
- rs, _, err := client.RangeLookup(ctx, rgcq.db.GetSender(), desc.StartKey.AsRawKey(), + rs, _, err := client.RangeLookup(ctx, rgcq.db.NonTransactionalSender(), desc.StartKey.AsRawKey(), roachpb.CONSISTENT, 0 /* prefetchNum */, false /* reverse */) if err != nil { return err diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 585eb5ea76f7..1d2352721e45 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -196,7 +196,7 @@ func (tc *testContext) StartWithStoreConfig(t testing.TB, stopper *stop.Stopper, t.Fatal(err) } // Now that we have our actual store, monkey patch the factory used in cfg.DB. - factory.store = tc.store + factory.setStore(tc.store) // We created the store without a real KV client, so it can't perform splits. tc.store.splitQueue.SetDisabled(true) @@ -9060,7 +9060,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) { ba.Header.Txn = txn ba.Add(&btArgs) assignSeqNumsForReqs(txn, &btArgs) - if _, pErr := s.DB().GetFactory().WrappedSender().Send(context.TODO(), ba); pErr != nil { + if _, pErr := s.DB().GetFactory().NonTransactionalSender().Send(context.TODO(), ba); pErr != nil { t.Fatal(pErr.GoError()) } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 8f90b4fb8c8a..bcc2aa040fdd 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -74,15 +74,30 @@ func (s *Store) TestSender() client.Sender { // testSenderFactory is an implementation of the // client.TxnSenderFactory interface. 
type testSenderFactory struct { - store *Store + store *Store + nonTxnSender *testSender } -func (f *testSenderFactory) New(typ client.TxnType, _ *roachpb.Transaction) client.TxnSender { +func (f *testSenderFactory) TransactionalSender( + typ client.TxnType, _ *roachpb.Transaction, +) client.TxnSender { return &testSender{store: f.store} } -func (f *testSenderFactory) WrappedSender() client.Sender { - return f.store +func (f *testSenderFactory) NonTransactionalSender() client.Sender { + if f.nonTxnSender != nil { + return f.nonTxnSender + } + f.nonTxnSender = &testSender{store: f.store} + return f.nonTxnSender +} + +func (f *testSenderFactory) setStore(s *Store) { + f.store = s + if f.nonTxnSender != nil { + // monkey-patch an already created Sender, helping with test bootstrapping. + f.nonTxnSender.store = s + } } // testSender is an implementation of the client.TxnSender interface @@ -165,7 +180,7 @@ func createTestStoreWithoutStart(t testing.TB, stopper *stop.Stopper, cfg *Store factory := &testSenderFactory{} cfg.DB = client.NewDB(cfg.AmbientCtx, factory, cfg.Clock) store := NewStore(*cfg, eng, &roachpb.NodeDescriptor{NodeID: 1}) - factory.store = store + factory.setStore(store) if err := store.Bootstrap( context.TODO(), roachpb.StoreIdent{NodeID: 1, StoreID: 1}, cfg.Settings.Version.BootstrapVersion(), ); err != nil {