diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go
index 31ea4492d5ef..83bdecc2856a 100644
--- a/pkg/internal/client/sender.go
+++ b/pkg/internal/client/sender.go
@@ -61,17 +61,34 @@ type Sender interface {
 	// Send sends a batch for evaluation.
 	// The contract about whether both a response and an error can be returned
 	// varies between layers.
-	// The ownership of the pointers inside the batch (notably the Txn and the
-	// requests) is unusual, and the interface is leaky; the idea is that we don't
-	// clone batches before passing them to a transport, and the server on the
-	// other side of the transport might be local (and so the local server gets a
-	// shallow copy of the batch, like all the order Senders). Server-side modules
-	// are allowed to hold on to parts of the request and read them async (e.g. we
-	// might put a request in the timestamp cache). This all means that, once the
-	// batch reaches the transport boundary, all its deep fields are immutable -
-	// neither the server side nor the client side can change anything anymore
-	// (they must clone whatever they want to change). This is enforced in race
-	// tests by the "race transport" (transport_race.go).
+	//
+	// The caller retains ownership of all the memory referenced by the
+	// BatchRequest; the callee is not allowed to hold on to any parts of it
+	// after the call returns (this is so that the client module can
+	// allocate requests from a pool and reuse them). For example, the DistSender
+	// makes sure that, if there are concurrent requests, it waits for all of them
+	// before returning, even in error cases.
+	//
+	// Once the request reaches the `transport` module, another restriction
+	// applies (particularly relevant for the case when the node that the
+	// transport is talking to is local, and so there's no gRPC
+	// marshaling/unmarshaling):
+	// - the callee has to treat everything inside the BatchRequest as
+	//   read-only. This is so that the client module retains the right to pass
+	//   pointers into its internals, like for example the Transaction. This
+	//   wouldn't work if the server were allowed to change the Transaction
+	//   willy-nilly.
+	// TODO(andrei): The client does not currently use this last guarantee; it
+	// clones the txn for every request. Given that a client.Txn can be used
+	// concurrently, in order for the client to take advantage of this, it would
+	// need to switch to a copy-on-write scheme so that its updates to the txn do
+	// not race with the server reading it. We should do this to avoid the cloning
+	// allocations. And to be frank, it'd be a good idea for the
+	// BatchRequest/Response to generally stop carrying transactions; the requests
+	// usually only need a txn id and some timestamp. The responses would ideally
+	// contain a list of targeted instructions about what the client should
+	// update, as opposed to a full txn that the client is expected to diff with
+	// its copy and apply all the updates.
 	Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
 }
 
diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go
index e1a3f1860023..06d0f9900dd4 100644
--- a/pkg/internal/client/txn.go
+++ b/pkg/internal/client/txn.go
@@ -38,6 +38,18 @@ type Txn struct {
 	// typ indicates the type of transaction.
 	typ TxnType
 
+	// alloc holds pre-allocated fields that can be used to avoid heap allocations
+	// for batches with lifetimes tied to a Txn.
+	// TODO(andrei): A lot more things should be pre-allocated or otherwise pooled
+	// - in particular the roachpb.Transaction proto and the TxnCoordSender, which
+	// are quite large. Pooling them would also force us to clean up their
+	// lifetimes, which is a good idea on its own.
+	alloc struct {
+		requests [1]roachpb.RequestUnion
+		etUnion  roachpb.RequestUnion_EndTransaction
+		et       roachpb.EndTransactionRequest
+	}
+
 	// gatewayNodeID, if != 0, is the ID of the node on whose behalf this
 	// transaction is running. Normally this is the current node, but in the case
 	// of Txns created on remote nodes by DistSQL this will be the gateway.
@@ -509,8 +521,7 @@ func (txn *Txn) Run(ctx context.Context, b *Batch) error {
 }
 
 func (txn *Txn) commit(ctx context.Context) error {
-	var ba roachpb.BatchRequest
-	ba.Add(endTxnReq(true /* commit */, txn.deadline(), txn.systemConfigTrigger))
+	ba := txn.getCommitReq(txn.deadline(), txn.systemConfigTrigger)
 	_, pErr := txn.Send(ctx, ba)
 	if pErr == nil {
 		for _, t := range txn.commitTriggers {
@@ -534,6 +545,27 @@ func (txn *Txn) CleanupOnError(ctx context.Context, err error) {
 	}
 }
 
+func (txn *Txn) getCommitReq(deadline *hlc.Timestamp, hasTrigger bool) roachpb.BatchRequest {
+	ba := roachpb.BatchRequest{
+		Requests: txn.alloc.requests[:1],
+	}
+	etReq := &txn.alloc.et
+	etUnion := &txn.alloc.etUnion
+	ba.Requests[0].Value = etUnion
+	etUnion.EndTransaction = etReq
+	etReq.Reset()
+	etReq.Commit = true
+	etReq.Deadline = deadline
+	if hasTrigger {
+		etReq.InternalCommitTrigger = &roachpb.InternalCommitTrigger{
+			ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{
+				SystemConfigSpan: true,
+			},
+		}
+	}
+	return ba
+}
+
 // Commit is the same as CommitOrCleanup but will not attempt to clean
 // up on failure. This can be used when the caller is prepared to do proper
 // cleanup.
diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go
index 6d356a836137..0ccdf54f301f 100644
--- a/pkg/kv/dist_sender.go
+++ b/pkg/kv/dist_sender.go
@@ -786,6 +786,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 			panic(r)
 		}
 		var hadSuccessWriting bool
+		// Combine all the responses.
+		// It's important that we wait for all of them even if an error is caught
+		// because the client.Sender() contract mandates that we don't "hold on" to
+		// any part of a request after DistSender.Send() returns.
 		for _, responseCh := range responseChs {
 			resp := <-responseCh
 			if resp.pErr != nil {
@@ -806,9 +810,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
 
 			// Combine the new response with the existing one (including updating
 			// the headers).
-			if err := br.Combine(resp.reply, resp.positions); err != nil {
-				pErr = roachpb.NewError(err)
-				return
+			if pErr == nil {
+				if err := br.Combine(resp.reply, resp.positions); err != nil {
+					pErr = roachpb.NewError(err)
+				}
 			}
 		}
 
diff --git a/pkg/kv/transport_race.go b/pkg/kv/transport_race.go
index b80a010f450e..8c2d00c523f2 100644
--- a/pkg/kv/transport_race.go
+++ b/pkg/kv/transport_race.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"io/ioutil"
 	"math/rand"
+	"reflect"
 	"sync/atomic"
 	"time"
 
@@ -59,6 +60,18 @@ type raceTransport struct {
 func (tr raceTransport) SendNext(
 	ctx context.Context, ba roachpb.BatchRequest,
 ) (*roachpb.BatchResponse, error) {
+	// Make a copy of the requests slice, and shallow copies of the requests.
+	// The caller is allowed to mutate the request after the call returns, but
+	// this transport has no way of telling who is doing a mutation (the client,
+	// which is allowed, or the server, which is not). So, for now, we exclude
+	// the slice and the requests from any checks, since those are the parts that
+	// the client currently mutates.
+	requestsCopy := make([]roachpb.RequestUnion, len(ba.Requests))
+	for i, ru := range ba.Requests {
+		// ru is a RequestUnion interface, so we need some hoops to dereference it.
+		requestsCopy[i] = reflect.Indirect(reflect.ValueOf(ru)).Interface().(roachpb.RequestUnion)
+	}
+	ba.Requests = requestsCopy
 	select {
 	// We have a shallow copy here and so the top level scalar fields can't
 	// really race, but making more copies doesn't make anything more
@@ -74,8 +87,17 @@ func (tr raceTransport) SendNext(
 // intercepts all BatchRequests, reading them asynchronously in a tight loop.
 // This allows the race detector to catch any mutations of a batch passed to the
 // transport. The dealio is that batches passed to the transport are immutable -
-// neither the client nor the server are allowed to mutate anything and this
-// transport makes sure they don't. See client.Sender() for more.
+// the server is not allowed to mutate anything and this transport makes sure
+// it doesn't. See client.Sender() for more.
+//
+// NOTE(andrei): We don't like this transport very much. It's slow, preventing
+// us from running clusters with race binaries, and, the way it's written, it
+// prevents both the client and the server from mutating the BatchRequest even
+// though only the server is prohibited (per the client.Sender interface). In
+// fact, we'd like to have the client reuse these requests and mutate them.
+// Instead of this transport, we should find other mechanisms ensuring that:
+// a) the server doesn't hold on to any memory, and
+// b) the server doesn't mutate the request.
 func GRPCTransportFactory(
 	opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice,
 ) (Transport, error) {
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index e7853a01e6c4..33254e3d25ee 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -241,7 +241,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error {
 		rh.Txn = nil
 	}
 	if rh.ResumeSpan != nil {
-		panic(fmt.Sprintf("combining %+v with %+v", rh.ResumeSpan, otherRH.ResumeSpan))
+		return errors.Errorf("combining %+v with %+v", rh.ResumeSpan, otherRH.ResumeSpan)
 	}
 	rh.ResumeSpan = otherRH.ResumeSpan
 	rh.ResumeReason = otherRH.ResumeReason
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 0edd7e0485bb..7b45b43884ab 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -330,11 +330,13 @@ func actualSpan(req Request, resp Response) (Span, bool) {
 	return h.Span(), true
 }
 
-// Combine implements the Combinable interface. It combines each slot of the
-// given request into the corresponding slot of the base response. The number
-// of slots must be equal and the respective slots must be combinable.
+// Combine combines each slot of the given request into the corresponding slot
+// of the base response. The number of slots must be equal and the respective
+// slots must be combinable.
 // On error, the receiver BatchResponse is in an invalid state. In either case,
 // the supplied BatchResponse must not be used any more.
+// It is an error to call Combine on responses with errors in them. The
+// DistSender strips the errors from any responses that it combines.
 func (br *BatchResponse) Combine(otherBatch *BatchResponse, positions []int) error {
 	if err := br.BatchResponse_Header.combine(otherBatch.BatchResponse_Header); err != nil {
 		return err
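
For reference, below is a minimal, self-contained Go sketch of the pre-allocation pattern that the new Txn.alloc field and getCommitReq rely on. The request, batch, and txnLike types are invented stand-ins (not the real roachpb protos or client.Txn); the point is only to show how re-slicing a fixed-size array embedded in a long-lived object avoids allocating a fresh requests slice for every commit batch, which is safe only because the Sender contract above says the callee may not hold on to the request after Send returns.

package main

import "fmt"

// request is a stand-in for roachpb.RequestUnion, invented for illustration.
type request struct {
	commit   bool
	deadline int64
}

// batch is a stand-in for roachpb.BatchRequest.
type batch struct {
	requests []request
}

// txnLike mirrors the shape of Txn.alloc in the patch: the backing array lives
// inside the long-lived transaction object, so re-slicing it for each commit
// batch does not allocate.
type txnLike struct {
	alloc struct {
		requests [1]request
	}
}

// getCommitBatch re-slices the pre-allocated array and overwrites the slot in
// place, analogous to Txn.getCommitReq. The returned batch is only valid until
// the next call.
func (t *txnLike) getCommitBatch(deadline int64) batch {
	b := batch{requests: t.alloc.requests[:1]}
	b.requests[0] = request{commit: true, deadline: deadline}
	return b
}

func main() {
	var txn txnLike
	b1 := txn.getCommitBatch(10)
	fmt.Println(b1.requests[0].deadline) // 10

	// The same backing slot is reused; no new request slice is allocated.
	b2 := txn.getCommitBatch(20)
	fmt.Println(b2.requests[0].deadline) // 20
}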