Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv, client: don't send non-txn requests through the TxnCoordSender anymore #26741

Merged
merged 1 commit into from
Jun 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func backup(
StartTime: span.start,
MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter),
}
rawRes, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req)
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return pErr.GoError()
}
Expand Down Expand Up @@ -1198,7 +1198,7 @@ func getAllRevisions(
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}
resp, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req)
resp, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return nil, pErr.GoError()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(chunkSpan),
}
if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -844,7 +844,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(newSpan),
}
if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func restore(
defer tracing.FinishSpan(importSpan)
defer func() { <-importsSem }()

importRes, pErr := client.SendWrapped(ctx, db.GetSender(), importRequest)
importRes, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
if pErr != nil {
return pErr.GoError()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func runChangefeedFlow(
func exportRequestPoll(
execCfg *sql.ExecutorConfig, details jobspb.ChangefeedDetails, progress jobspb.ChangefeedProgress,
) func(context.Context) (changedKVs, error) {
sender := execCfg.DB.GetSender()
sender := execCfg.DB.NonTransactionalSender()
var spans []roachpb.Span
for _, tableDesc := range details.TableDescs {
spans = append(spans, tableDesc.PrimaryIndexSpan())
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (sp *sstWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(sst.span),
}
if _, pErr := client.SendWrapped(ctx, sp.db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, sp.db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the IMPORT when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func BenchmarkImport(b *testing.B) {
Files: files,
Rekeys: rekeys,
}
res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if pErr != nil {
b.Fatalf("%+v", pErr.GoError())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestExportCmd(t *testing.T) {
MVCCFilter: mvccFilter,
ReturnSST: true,
}
res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if pErr != nil {
t.Fatalf("%+v", pErr)
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestExportGCThreshold(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey},
StartTime: hlc.Timestamp{WallTime: -1},
}
_, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
_, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if !testutils.IsPError(pErr, "must be after replica GC threshold") {
t.Fatalf(`expected "must be after replica GC threshold" error got: %+v`, pErr)
}
Expand Down
18 changes: 8 additions & 10 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,16 +798,14 @@ func TestReadConsistencyTypes(t *testing.T) {
t.Run(rc.String(), func(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
)
})
factory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
Expand Down
85 changes: 68 additions & 17 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,58 @@ func DefaultDBContext() DBContext {
}
}

// CrossRangeTxnWrapperSender is a Sender whose purpose is to wrap
// non-transactional requests that span ranges into a transaction so they can
// execute atomically.
//
// TODO(andrei, bdarnell): This is a wart. Our semantics are that batches are
// atomic, but there's only historical reason for that. We should disallow
// non-transactional batches and scans, forcing people to use transactions
// instead. And then this Sender can go away.
type CrossRangeTxnWrapperSender struct {
db *DB
wrapped Sender
}

var _ Sender = &CrossRangeTxnWrapperSender{}

// Send implements the Sender interface.
func (s *CrossRangeTxnWrapperSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.Txn != nil {
log.Fatalf(ctx, "CrossRangeTxnWrapperSender can't handle transactional requests")
}

br, pErr := s.wrapped.Send(ctx, ba)
if _, ok := pErr.GetDetail().(*roachpb.OpRequiresTxnError); !ok {
return br, pErr
}

err := s.db.Txn(ctx, func(ctx context.Context, txn *Txn) error {
txn.SetDebugName("auto-wrap")
b := txn.NewBatch()
b.Header = ba.Header
for _, arg := range ba.Requests {
req := arg.GetInner().ShallowCopy()
b.AddRawRequest(req)
}
err := txn.CommitInBatch(ctx, b)
br = b.RawResponse()
return err
})
if err != nil {
return nil, roachpb.NewError(err)
}
br.Txn = nil // hide the evidence
return br, nil
}

// Wrapped returns the wrapped sender.
func (s *CrossRangeTxnWrapperSender) Wrapped() Sender {
return s.wrapped
}

// DB is a database handle to a single cockroach cluster. A DB is safe for
// concurrent use by multiple goroutines.
type DB struct {
Expand All @@ -195,24 +247,18 @@ type DB struct {
factory TxnSenderFactory
clock *hlc.Clock
ctx DBContext
// crs is the sender used for non-transactional requests.
crs CrossRangeTxnWrapperSender
}

// GetSender returns a Sender that can be used to send requests through.
// Note that a new Sender created; it is not shared.
// NonTransactionalSender returns a Sender that can be used for sending
// non-transactional requests. The Sender is capable of transparently wrapping
// non-transactional requests that span ranges in transactions.
//
// The Sender returned should not be used for sending transactional requests.
// Use db.Txn() or db.NewTxn() for that.
func (db *DB) GetSender() Sender {
// We pass nil for the txn here because we don't have a txn on hand.
// That's why this method says to not use the Sender for transactional
// requests, plus the fact that if a Sender is used directly, the caller needs
// to be mindful of the need to start a heartbeat loop when writing.
//
// Note that even non-transactional requests need to go through a
// TxnCoordSender because batches that get split need to be wrapped in
// transactions (and the TxnCoordSender handles that). So we can't simply
// return the wrapped handler here.
return db.factory.New(RootTxn, nil /* txn */)
// The Sender returned should not be used for sending transactional requests -
// it bypasses the TxnCoordSender. Use db.Txn() or db.NewTxn() for transactions.
func (db *DB) NonTransactionalSender() Sender {
return &db.crs
}

// GetFactory returns the DB's TxnSenderFactory.
Expand All @@ -232,12 +278,17 @@ func NewDBWithContext(
if actx.Tracer == nil {
panic("no tracer set in AmbientCtx")
}
return &DB{
db := &DB{
AmbientContext: actx,
factory: factory,
clock: clock,
ctx: ctx,
crs: CrossRangeTxnWrapperSender{
wrapped: factory.NonTransactionalSender(),
},
}
db.crs.db = db
return db
}

// Get retrieves the value for a key, returning the retrieved key/value or an
Expand Down Expand Up @@ -545,7 +596,7 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
func (db *DB) send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return db.sendUsingSender(ctx, ba, db.GetSender())
return db.sendUsingSender(ctx, ba, db.NonTransactionalSender())
}

// sendUsingSender uses the specified sender to send the batch request.
Expand Down
42 changes: 31 additions & 11 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ type TxnSender interface {
// TxnSenderFactory is the interface used to create new instances
// of TxnSender.
type TxnSenderFactory interface {
// New returns a new instance of TxnSender.
// TransactionalSender returns a sender to be used for transactional requests.
// typ specifies whether the sender is the root or one of potentially many
// child "leaf" nodes in a tree of transaction objects, as is created during a
// DistSQL flow.
// txn is the transaction whose requests this sender will carry. It can be nil
// if the sender will not be used for transactional requests.
New(typ TxnType, txn *roachpb.Transaction) TxnSender
// WrappedSender returns the TxnSenderFactory's wrapped Sender.
WrappedSender() Sender
// txn is the transaction whose requests this sender will carry.
TransactionalSender(typ TxnType, txn *roachpb.Transaction) TxnSender
// NonTransactionalSender returns a sender to be used for non-transactional
// requests. Generally this is a sender that TransactionalSender() wraps.
NonTransactionalSender() Sender
}

// SenderFunc is an adapter to allow the use of ordinary functions
Expand Down Expand Up @@ -132,14 +132,34 @@ func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender

// New calls f().
func (f TxnSenderFactoryFunc) New(typ TxnType, _ *roachpb.Transaction) TxnSender {
var _ TxnSenderFactory = TxnSenderFactoryFunc(nil)

// TransactionalSender is part of TxnSenderFactory.
func (f TxnSenderFactoryFunc) TransactionalSender(typ TxnType, _ *roachpb.Transaction) TxnSender {
return f(typ)
}

// WrappedSender is not implemented for TxnSenderFactoryFunc.
func (f TxnSenderFactoryFunc) WrappedSender() Sender {
panic("unimplemented")
// NonTransactionalSender is part of TxnSenderFactory.
func (f TxnSenderFactoryFunc) NonTransactionalSender() Sender {
return nil
}

// NonTransactionalFactoryFunc is a TxnSenderFactory that cannot, in fact,
// create any transactional senders, only non-transactional ones.
type NonTransactionalFactoryFunc SenderFunc

var _ TxnSenderFactory = NonTransactionalFactoryFunc(nil)

// TransactionalSender is part of the TxnSenderFactory.
func (f NonTransactionalFactoryFunc) TransactionalSender(
typ TxnType, _ *roachpb.Transaction,
) TxnSender {
panic("not supported ")
}

// NonTransactionalSender is part of the TxnSenderFactory.
func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender {
return SenderFunc(f)
}

// SendWrappedWith is a convenience function which wraps the request in a batch
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewTxnWithProto(
proto.AssertInitialized(context.TODO())
txn := &Txn{db: db, typ: typ, gatewayNodeID: gatewayNodeID}
txn.mu.Proto = proto
txn.mu.sender = db.factory.New(typ, &proto)
txn.mu.sender = db.factory.TransactionalSender(typ, &proto)
return txn
}

Expand Down Expand Up @@ -1258,7 +1258,7 @@ func (txn *Txn) updateStateOnRetryableErrLocked(
txn.mu.state = txnReadOnly

// Create a new txn sender.
txn.mu.sender = txn.db.factory.New(txn.typ, newTxn)
txn.mu.sender = txn.db.factory.TransactionalSender(txn.typ, newTxn)
} else {
// Update the transaction proto with the one to be used for the next
// attempt. The txn inside pErr was correctly prepared for this by
Expand Down
Loading