Skip to content

Commit

Permalink
Merge #26741 #26856
Browse files Browse the repository at this point in the history
26741: kv, client: don't send non-txn requests through the TxnCoordSender anymore r=andreimatei a=andreimatei

We were sending them through the TCS because the TCS was in charge of
wrapping them in a Txn and retrying if the batch spanned ranges (because
batches need to be atomic and you can only get that cross-range in
txns).
But that's nasty. The TCS is littered with checks about whether a
request is transactional or not, and the code to do the wrapped retry
did not belong there anyway.
This patch moves the wrapping/retry into a new Sender under the client.DB.
Now non-txn requests go through that and then straight to the
DistSender.

Release note: None

26856: distsql: change default disk monitor increment to 1MiB r=asubiotto a=asubiotto

The previous increment was 64MiB. This was unnecessarily large and
provided too coarse a granularity for stat reporting.

Closes #26793

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Alfonso Subiotto Marqués <[email protected]>
  • Loading branch information
3 people committed Jun 20, 2018
3 parents dc59fc8 + 12eec9a + ec3ce78 commit d72e996
Show file tree
Hide file tree
Showing 24 changed files with 254 additions and 272 deletions.
2 changes: 1 addition & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TempStorageConfigFromEnv(
mon.DiskResource,
nil, /* curCount */
nil, /* maxHist */
64*1024*1024, /* increment */
1024*1024, /* increment */
maxSizeBytes/10, /* noteworthy */
st,
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func backup(
StartTime: span.start,
MVCCFilter: roachpb.MVCCFilter(backupDesc.MVCCFilter),
}
rawRes, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req)
rawRes, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return pErr.GoError()
}
Expand Down Expand Up @@ -1198,7 +1198,7 @@ func getAllRevisions(
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}
resp, pErr := client.SendWrappedWith(ctx, db.GetSender(), header, req)
resp, pErr := client.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
if pErr != nil {
return nil, pErr.GoError()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(chunkSpan),
}
if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -844,7 +844,7 @@ func splitAndScatter(
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(newSpan),
}
if _, pErr := client.SendWrapped(ctx, db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the RESTORE when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func restore(
defer tracing.FinishSpan(importSpan)
defer func() { <-importsSem }()

importRes, pErr := client.SendWrapped(ctx, db.GetSender(), importRequest)
importRes, pErr := client.SendWrapped(ctx, db.NonTransactionalSender(), importRequest)
if pErr != nil {
return pErr.GoError()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func runChangefeedFlow(
func exportRequestPoll(
execCfg *sql.ExecutorConfig, details jobspb.ChangefeedDetails, progress jobspb.ChangefeedProgress,
) func(context.Context) (changedKVs, error) {
sender := execCfg.DB.GetSender()
sender := execCfg.DB.NonTransactionalSender()
var spans []roachpb.Span
for _, tableDesc := range details.TableDescs {
spans = append(spans, tableDesc.PrimaryIndexSpan())
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (sp *sstWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
scatterReq := &roachpb.AdminScatterRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(sst.span),
}
if _, pErr := client.SendWrapped(ctx, sp.db.GetSender(), scatterReq); pErr != nil {
if _, pErr := client.SendWrapped(ctx, sp.db.NonTransactionalSender(), scatterReq); pErr != nil {
// TODO(dan): Unfortunately, Scatter is still too unreliable to
// fail the IMPORT when Scatter fails. I'm uncomfortable that
// this could break entirely and not start failing the tests,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func BenchmarkImport(b *testing.B) {
Files: files,
Rekeys: rekeys,
}
res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if pErr != nil {
b.Fatalf("%+v", pErr.GoError())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestExportCmd(t *testing.T) {
MVCCFilter: mvccFilter,
ReturnSST: true,
}
res, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
res, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if pErr != nil {
t.Fatalf("%+v", pErr)
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestExportGCThreshold(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: keys.UserTableDataMin, EndKey: keys.MaxKey},
StartTime: hlc.Timestamp{WallTime: -1},
}
_, pErr := client.SendWrapped(ctx, kvDB.GetSender(), req)
_, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), req)
if !testutils.IsPError(pErr, "must be after replica GC threshold") {
t.Fatalf(`expected "must be after replica GC threshold" error got: %+v`, pErr)
}
Expand Down
18 changes: 8 additions & 10 deletions pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,16 +798,14 @@ func TestReadConsistencyTypes(t *testing.T) {
t.Run(rc.String(), func(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := client.TxnSenderFactoryFunc(func(client.TxnType) client.TxnSender {
return client.TxnSenderFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
},
)
})
factory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
}
return ba.CreateReply(), nil
})

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
Expand Down
85 changes: 68 additions & 17 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,58 @@ func DefaultDBContext() DBContext {
}
}

// CrossRangeTxnWrapperSender is a Sender whose purpose is to wrap
// non-transactional requests that span ranges into a transaction so they can
// execute atomically.
//
// A batch is first sent unchanged to the wrapped Sender. Only if that attempt
// fails with an OpRequiresTxnError is the batch re-issued inside a transaction
// run through db.
//
// TODO(andrei, bdarnell): This is a wart. Our semantics are that batches are
// atomic, but there's only historical reason for that. We should disallow
// non-transactional batches and scans, forcing people to use transactions
// instead. And then this Sender can go away.
type CrossRangeTxnWrapperSender struct {
// db is the handle used to run the wrapping transaction (via db.Txn) when a
// batch has to be retried transactionally.
db *DB
// wrapped is the Sender on which every batch is attempted first.
wrapped Sender
}

// Compile-time check that *CrossRangeTxnWrapperSender implements Sender.
var _ Sender = &CrossRangeTxnWrapperSender{}

// Send implements the Sender interface.
//
// The batch must be non-transactional; a batch carrying a Txn is a fatal
// error. The batch is first handed to the wrapped Sender as-is. Unless that
// attempt reports an OpRequiresTxnError, its response and error are returned
// untouched. Otherwise the same requests are replayed inside a transaction
// named "auto-wrap" and committed in a single batch; the Txn field is cleared
// on the response so callers cannot tell a transaction was used.
func (s *CrossRangeTxnWrapperSender) Send(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
	if ba.Txn != nil {
		log.Fatalf(ctx, "CrossRangeTxnWrapperSender can't handle transactional requests")
	}

	// Fast path: try the batch without a transaction.
	resp, pErr := s.wrapped.Send(ctx, ba)
	if _, needsTxn := pErr.GetDetail().(*roachpb.OpRequiresTxnError); !needsTxn {
		return resp, pErr
	}

	// Slow path: the batch needs a transaction. Rebuild it from shallow copies
	// of the original requests on every invocation of the closure (presumably
	// db.Txn may run it more than once on retries).
	runWrapped := func(ctx context.Context, txn *Txn) error {
		txn.SetDebugName("auto-wrap")
		batch := txn.NewBatch()
		batch.Header = ba.Header
		for _, ru := range ba.Requests {
			batch.AddRawRequest(ru.GetInner().ShallowCopy())
		}
		commitErr := txn.CommitInBatch(ctx, batch)
		// Capture the response of the latest attempt, even if the commit failed.
		resp = batch.RawResponse()
		return commitErr
	}
	if err := s.db.Txn(ctx, runWrapped); err != nil {
		return nil, roachpb.NewError(err)
	}

	resp.Txn = nil // hide the evidence
	return resp, nil
}

// Wrapped returns the wrapped sender, i.e. the Sender that Send tries first
// before falling back to running the batch in a transaction.
func (s *CrossRangeTxnWrapperSender) Wrapped() Sender {
return s.wrapped
}

// DB is a database handle to a single cockroach cluster. A DB is safe for
// concurrent use by multiple goroutines.
type DB struct {
Expand All @@ -195,24 +247,18 @@ type DB struct {
factory TxnSenderFactory
clock *hlc.Clock
ctx DBContext
// crs is the sender used for non-transactional requests.
crs CrossRangeTxnWrapperSender
}

// GetSender returns a Sender that can be used to send requests through.
// Note that a new Sender created; it is not shared.
// NonTransactionalSender returns a Sender that can be used for sending
// non-transactional requests. The Sender is capable of transparently wrapping
// non-transactional requests that span ranges in transactions.
//
// The Sender returned should not be used for sending transactional requests.
// Use db.Txn() or db.NewTxn() for that.
func (db *DB) GetSender() Sender {
// We pass nil for the txn here because we don't have a txn on hand.
// That's why this method says to not use the Sender for transactional
// requests, plus the fact that if a Sender is used directly, the caller needs
// to be mindful of the need to start a heartbeat loop when writing.
//
// Note that even non-transactional requests need to go through a
// TxnCoordSender because batches that get split need to be wrapped in
// transactions (and the TxnCoordSender handles that). So we can't simply
// return the wrapped handler here.
return db.factory.New(RootTxn, nil /* txn */)
// The Sender returned should not be used for sending transactional requests -
// it bypasses the TxnCoordSender. Use db.Txn() or db.NewTxn() for transactions.
func (db *DB) NonTransactionalSender() Sender {
return &db.crs
}

// GetFactory returns the DB's TxnSenderFactory.
Expand All @@ -232,12 +278,17 @@ func NewDBWithContext(
if actx.Tracer == nil {
panic("no tracer set in AmbientCtx")
}
return &DB{
db := &DB{
AmbientContext: actx,
factory: factory,
clock: clock,
ctx: ctx,
crs: CrossRangeTxnWrapperSender{
wrapped: factory.NonTransactionalSender(),
},
}
db.crs.db = db
return db
}

// Get retrieves the value for a key, returning the retrieved key/value or an
Expand Down Expand Up @@ -545,7 +596,7 @@ func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) err
func (db *DB) send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return db.sendUsingSender(ctx, ba, db.GetSender())
return db.sendUsingSender(ctx, ba, db.NonTransactionalSender())
}

// sendUsingSender uses the specified sender to send the batch request.
Expand Down
42 changes: 31 additions & 11 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ type TxnSender interface {
// TxnSenderFactory is the interface used to create new instances
// of TxnSender.
type TxnSenderFactory interface {
// New returns a new instance of TxnSender.
// TransactionalSender returns a sender to be used for transactional requests.
// typ specifies whether the sender is the root or one of potentially many
// child "leaf" nodes in a tree of transaction objects, as is created during a
// DistSQL flow.
// txn is the transaction whose requests this sender will carry. It can be nil
// if the sender will not be used for transactional requests.
New(typ TxnType, txn *roachpb.Transaction) TxnSender
// WrappedSender returns the TxnSenderFactory's wrapped Sender.
WrappedSender() Sender
// txn is the transaction whose requests this sender will carry.
TransactionalSender(typ TxnType, txn *roachpb.Transaction) TxnSender
// NonTransactionalSender returns a sender to be used for non-transactional
// requests. Generally this is a sender that TransactionalSender() wraps.
NonTransactionalSender() Sender
}

// SenderFunc is an adapter to allow the use of ordinary functions
Expand Down Expand Up @@ -132,14 +132,34 @@ func (f TxnSenderFunc) OnFinish(_ func(error)) { panic("unimplemented") }
// as TxnSenderFactories. This is a helper mechanism to facilitate testing.
type TxnSenderFactoryFunc func(TxnType) TxnSender

// New calls f().
func (f TxnSenderFactoryFunc) New(typ TxnType, _ *roachpb.Transaction) TxnSender {
var _ TxnSenderFactory = TxnSenderFactoryFunc(nil)

// TransactionalSender is part of TxnSenderFactory. It calls f with typ and
// returns the result; the transaction proto argument is ignored.
func (f TxnSenderFactoryFunc) TransactionalSender(typ TxnType, _ *roachpb.Transaction) TxnSender {
return f(typ)
}

// WrappedSender is not implemented for TxnSenderFactoryFunc.
func (f TxnSenderFactoryFunc) WrappedSender() Sender {
panic("unimplemented")
// NonTransactionalSender is part of TxnSenderFactory.
//
// It always returns nil: a TxnSenderFactoryFunc only knows how to build
// transactional senders.
// NOTE(review): a DB built from this factory stores the nil Sender as the
// wrapped sender of its CrossRangeTxnWrapperSender, so sending a
// non-transactional request through such a DB would call Send on a nil
// interface - confirm callers only use it for transactional traffic.
func (f TxnSenderFactoryFunc) NonTransactionalSender() Sender {
return nil
}

// NonTransactionalFactoryFunc is a TxnSenderFactory that cannot, in fact,
// create any transactional senders, only non-transactional ones. It adapts a
// plain SenderFunc: NonTransactionalSender hands back the function itself as
// a SenderFunc, while TransactionalSender panics.
type NonTransactionalFactoryFunc SenderFunc

// Compile-time check that NonTransactionalFactoryFunc implements
// TxnSenderFactory.
var _ TxnSenderFactory = NonTransactionalFactoryFunc(nil)

// TransactionalSender is part of the TxnSenderFactory.
//
// It is not supported by NonTransactionalFactoryFunc and always panics; both
// arguments are ignored.
func (f NonTransactionalFactoryFunc) TransactionalSender(
typ TxnType, _ *roachpb.Transaction,
) TxnSender {
panic("not supported ")
}

// NonTransactionalSender is part of the TxnSenderFactory. It returns the
// underlying function converted to a SenderFunc, so the same function serves
// every non-transactional request.
func (f NonTransactionalFactoryFunc) NonTransactionalSender() Sender {
return SenderFunc(f)
}

// SendWrappedWith is a convenience function which wraps the request in a batch
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewTxnWithProto(
proto.AssertInitialized(context.TODO())
txn := &Txn{db: db, typ: typ, gatewayNodeID: gatewayNodeID}
txn.mu.Proto = proto
txn.mu.sender = db.factory.New(typ, &proto)
txn.mu.sender = db.factory.TransactionalSender(typ, &proto)
return txn
}

Expand Down Expand Up @@ -1258,7 +1258,7 @@ func (txn *Txn) updateStateOnRetryableErrLocked(
txn.mu.state = txnReadOnly

// Create a new txn sender.
txn.mu.sender = txn.db.factory.New(txn.typ, newTxn)
txn.mu.sender = txn.db.factory.TransactionalSender(txn.typ, newTxn)
} else {
// Update the transaction proto with the one to be used for the next
// attempt. The txn inside pErr was correctly prepared for this by
Expand Down
Loading

0 comments on commit d72e996

Please sign in to comment.