kv: clarify the Sender contract and pre-allocate commit batches #30485

Merged (3 commits) on Sep 27, 2018
39 changes: 28 additions & 11 deletions pkg/internal/client/sender.go
@@ -61,17 +61,34 @@ type Sender interface {
// Send sends a batch for evaluation.
// The contract about whether both a response and an error can be returned
// varies between layers.
// The ownership of the pointers inside the batch (notably the Txn and the
// requests) is unusual, and the interface is leaky; the idea is that we don't
// clone batches before passing them to a transport, and the server on the
// other side of the transport might be local (and so the local server gets a
// shallow copy of the batch, like all the other Senders). Server-side modules
// are allowed to hold on to parts of the request and read them async (e.g. we
// might put a request in the timestamp cache). This all means that, once the
// batch reaches the transport boundary, all its deep fields are immutable -
// neither the server side nor the client side can change anything anymore
// (they must clone whatever they want to change). This is enforced in race
// tests by the "race transport" (transport_race.go).
//
// The caller retains ownership of all the memory referenced by the
// BatchRequest; the callee is not allowed to hold on to any parts of it after
// it returns from the call (this is so that the client module can
// allocate requests from a pool and reuse them). For example, the DistSender
// makes sure that, if there are concurrent requests, it waits for all of them
// before returning, even in error cases.
//
// Once the request reaches the `transport` module, another restriction
// applies (particularly relevant for the case when the node that the
// transport is talking to is local, and so there's no gRPC
// marshaling/unmarshaling):
// - the callee has to treat everything inside the BatchRequest as
// read-only. This is so that the client module retains the right to pass
// pointers into its internals, like for example the Transaction. This
// wouldn't work if the server were allowed to change the Transaction
// willy-nilly.
// TODO(andrei): The client does not currently use this last guarantee; it
// clones the txn for every request. Given that a client.Txn can be used
// concurrently, in order for the client to take advantage of this, it would
// need to switch to a copy-on-write scheme so that its updates to the txn do
// not race with the server reading it. We should do this to avoid the cloning
// allocations. And to be frank, it'd be a good idea for the
// BatchRequest/Response to generally stop carrying transactions; the requests
// usually only need a txn id and some timestamp. The responses would ideally
// contain a list of targeted instructions about what the client should
// update, as opposed to a full txn that the client is expected to diff with
// its copy and apply all the updates.
Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}
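To make the ownership rules above concrete, here is a minimal, hypothetical sketch (not part of this change) of the kind of reuse the contract is written to allow. Because the callee may not retain any part of the BatchRequest after Send returns, a caller could recycle requests through a sync.Pool as soon as the call comes back; the pool and helper below are illustrative names only.

package client

import (
	"context"
	"sync"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// baPool recycles BatchRequests between calls; placement and name are
// purely illustrative.
var baPool = sync.Pool{
	New: func() interface{} { return &roachpb.BatchRequest{} },
}

// sendPooled shows why the "don't hold on to the request" rule matters:
// the request's memory is reused immediately after Send returns, which
// would race with any callee that stashed a reference to it.
func sendPooled(
	ctx context.Context, s Sender, reqs ...roachpb.RequestUnion,
) (*roachpb.BatchResponse, *roachpb.Error) {
	ba := baPool.Get().(*roachpb.BatchRequest)
	ba.Requests = append(ba.Requests[:0], reqs...)
	br, pErr := s.Send(ctx, *ba)
	// Per the contract, nothing downstream references ba's memory anymore,
	// so it can be reset and returned to the pool right away.
	ba.Reset()
	baPool.Put(ba)
	return br, pErr
}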

36 changes: 34 additions & 2 deletions pkg/internal/client/txn.go
@@ -38,6 +38,18 @@ type Txn struct {
// typ indicates the type of transaction.
typ TxnType

// alloc holds pre-allocated fields that can be used to avoid heap allocations
// for batches with lifetimes tied to a Txn.
// TODO(andrei): A lot more things should be pre-allocated or otherwise pooled
// - in particular the roachpb.Transaction proto and the TxnCoordSender which
// are quite large. Pooling them would also force us to clean up their
// lifetimes, which is a good idea on its own.
alloc struct {
requests [1]roachpb.RequestUnion
etUnion roachpb.RequestUnion_EndTransaction
et roachpb.EndTransactionRequest
}

// gatewayNodeID, if != 0, is the ID of the node on whose behalf this
// transaction is running. Normally this is the current node, but in the case
// of Txns created on remote nodes by DistSQL this will be the gateway.
@@ -509,8 +521,7 @@ func (txn *Txn) Run(ctx context.Context, b *Batch) error {
}

func (txn *Txn) commit(ctx context.Context) error {
var ba roachpb.BatchRequest
ba.Add(endTxnReq(true /* commit */, txn.deadline(), txn.systemConfigTrigger))
ba := txn.getCommitReq(txn.deadline(), txn.systemConfigTrigger)
_, pErr := txn.Send(ctx, ba)
if pErr == nil {
for _, t := range txn.commitTriggers {
@@ -534,6 +545,27 @@ func (txn *Txn) CleanupOnError(ctx context.Context, err error) {
}
}

func (txn *Txn) getCommitReq(deadline *hlc.Timestamp, hasTrigger bool) roachpb.BatchRequest {
ba := roachpb.BatchRequest{
Requests: txn.alloc.requests[:1],
}
etReq := &txn.alloc.et
etUnion := &txn.alloc.etUnion
ba.Requests[0].Value = etUnion
etUnion.EndTransaction = etReq
etReq.Reset()
etReq.Commit = true
etReq.Deadline = deadline
if hasTrigger {
etReq.InternalCommitTrigger = &roachpb.InternalCommitTrigger{
ModifiedSpanTrigger: &roachpb.ModifiedSpanTrigger{
SystemConfigSpan: true,
},
}
}
return ba
}
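As a rough, standalone illustration of the pattern getCommitReq relies on (stand-in types below, not the real protos): because the backing array and the EndTransaction request are embedded in the long-lived struct, building the commit batch reuses memory owned by the Txn instead of allocating fresh requests on every commit.

package main

import "fmt"

// endTxn is a stand-in for roachpb.EndTransactionRequest.
type endTxn struct {
	commit bool
}

// txnLike mirrors the shape of client.Txn's alloc field: the slice backing
// array and the request live inside the struct, so building a commit batch
// does not require a fresh heap allocation per call.
type txnLike struct {
	alloc struct {
		requests [1]*endTxn
		et       endTxn
	}
}

func (t *txnLike) getCommitReq() []*endTxn {
	reqs := t.alloc.requests[:1]
	t.alloc.et = endTxn{commit: true} // reset the pre-allocated request in place
	reqs[0] = &t.alloc.et
	return reqs
}

func main() {
	t := &txnLike{}
	fmt.Println(t.getCommitReq()[0].commit) // prints: true
}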

// Commit is the same as CommitOrCleanup but will not attempt to clean
// up on failure. This can be used when the caller is prepared to do proper
// cleanup.
11 changes: 8 additions & 3 deletions pkg/kv/dist_sender.go
@@ -786,6 +786,10 @@ func (ds *DistSender) divideAndSendBatchToRanges(
panic(r)
}
var hadSuccessWriting bool
// Combine all the responses.
// It's important that we wait for all of them even if an error is caught
// because the client.Sender() contract mandates that we don't "hold on" to
// any part of a request after DistSender.Send() returns.
for _, responseCh := range responseChs {
resp := <-responseCh
if resp.pErr != nil {
@@ -806,9 +810,10 @@

// Combine the new response with the existing one (including updating
// the headers).
if err := br.Combine(resp.reply, resp.positions); err != nil {
pErr = roachpb.NewError(err)
return
if pErr == nil {
if err := br.Combine(resp.reply, resp.positions); err != nil {
pErr = roachpb.NewError(err)
}
}
}
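A simplified, standalone sketch (hypothetical names) of the drain-everything pattern the comment above describes: even after an error is observed, every response channel is still received from before returning, so no worker goroutine is left referencing the request once DistSender.Send has returned.

// gatherAll waits for every in-flight partial result, remembering only the
// first error but never returning early, mirroring how
// divideAndSendBatchToRanges drains its responseChs.
func gatherAll(responseChs []chan error) error {
	var firstErr error
	for _, ch := range responseChs {
		// Always receive, even once an error has been seen; returning early
		// would leave a sender goroutine holding on to the request.
		if err := <-ch; err != nil && firstErr == nil {
			firstErr = err
		}
		// While firstErr is nil, a real implementation would combine the
		// partial response here; errored runs just drain and discard.
	}
	return firstErr
}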

26 changes: 24 additions & 2 deletions pkg/kv/transport_race.go
@@ -22,6 +22,7 @@ import (
"encoding/json"
"io/ioutil"
"math/rand"
"reflect"
"sync/atomic"
"time"

@@ -59,6 +60,18 @@ type raceTransport struct {
func (tr raceTransport) SendNext(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
// Make a copy of the requests slice, and shallow copies of the requests.
// The caller is allowed to mutate the request after the call returns, but
// this transport has no way of checking who's doing the mutations (the
// client, which is allowed, or the server, which is not). So, for now, we
// exclude the slice and the requests from any checks, since those are the
// parts that the client currently mutates.
requestsCopy := make([]roachpb.RequestUnion, len(ba.Requests))
for i, ru := range ba.Requests {
// ru is a RequestUnion interface, so we need some hoops to dereference it.
requestsCopy[i] = reflect.Indirect(reflect.ValueOf(ru)).Interface().(roachpb.RequestUnion)
}
ba.Requests = requestsCopy
select {
// We have a shallow copy here and so the top level scalar fields can't
// really race, but making more copies doesn't make anything more
@@ -74,8 +87,17 @@
// intercepts all BatchRequests, reading them asynchronously in a tight loop.
// This allows the race detector to catch any mutations of a batch passed to the
// transport. The dealio is that batches passed to the transport are immutable -
// neither the client nor the server are allowed to mutate anything and this
// transport makes sure they don't. See client.Sender() for more.
// the server is not allowed to mutate anything and this transport makes sure
// it doesn't. See client.Sender() for more.
//
// NOTE(andrei): We don't like this transport very much. It's slow, preventing
// us from running clusters with race binaries, and, the way it's written, it
// prevents both the client and the server from mutating the BatchRequest even
// though only the server is prohibited (according to the client.Sender
// interface). In fact, we'd like to have the client reuse these requests and
// mutate them. Instead of this transport, we should find other mechanisms
// ensuring that:
// a) the server doesn't hold on to any memory, and
// b) the server doesn't mutate the request.
func GRPCTransportFactory(
opts SendOptions, nodeDialer *nodedialer.Dialer, replicas ReplicaSlice,
) (Transport, error) {
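For context, here is a rough, hypothetical sketch of the mechanism this file implements (the real transport_race.go differs in its details): batches handed to the transport are funneled to a background goroutine that keeps re-reading them, here by JSON-encoding them to ioutil.Discard using the encoding/json and io/ioutil imports shown above, so any later mutation shows up as a data race under the race detector.

// seenBatches and raceReadLoop are illustrative names, not the real API.
var seenBatches = make(chan interface{}, 128)

func raceReadLoop() {
	var retained []interface{}
	enc := json.NewEncoder(ioutil.Discard)
	for {
		select {
		case ba := <-seenBatches:
			retained = append(retained, ba)
		default:
		}
		// Re-encoding every retained batch is a pure read: if anything
		// mutates a batch after handing it to the transport, the race
		// detector flags the conflicting access. A real implementation
		// would also bound the retained set and throttle this loop.
		for _, ba := range retained {
			_ = enc.Encode(ba)
		}
	}
}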
2 changes: 1 addition & 1 deletion pkg/roachpb/api.go
@@ -241,7 +241,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error {
rh.Txn = nil
}
if rh.ResumeSpan != nil {
panic(fmt.Sprintf("combining %+v with %+v", rh.ResumeSpan, otherRH.ResumeSpan))
return errors.Errorf("combining %+v with %+v", rh.ResumeSpan, otherRH.ResumeSpan)
}
rh.ResumeSpan = otherRH.ResumeSpan
rh.ResumeReason = otherRH.ResumeReason
8 changes: 5 additions & 3 deletions pkg/roachpb/batch.go
@@ -330,11 +330,13 @@ func actualSpan(req Request, resp Response) (Span, bool) {
return h.Span(), true
}

// Combine implements the Combinable interface. It combines each slot of the
// given request into the corresponding slot of the base response. The number
// of slots must be equal and the respective slots must be combinable.
// Combine combines each slot of the given request into the corresponding slot
// of the base response. The number of slots must be equal and the respective
// slots must be combinable.
// On error, the receiver BatchResponse is in an invalid state. In either case,
// the supplied BatchResponse must not be used any more.
// It is an error to call Combine on responses with errors in them. The
// DistSender strips the errors from any responses that it combines.
func (br *BatchResponse) Combine(otherBatch *BatchResponse, positions []int) error {
if err := br.BatchResponse_Header.combine(otherBatch.BatchResponse_Header); err != nil {
return err
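A small, hypothetical sketch of the caller-side discipline the comment above asks for: never feed Combine a response that still carries an error, and stop using the receiver once Combine has failed.

// combineInto folds a partial response into dst following the documented
// rules; the helper and its name are illustrative, not part of this change.
func combineInto(dst, part *roachpb.BatchResponse, positions []int) error {
	if part.Error != nil {
		// Callers such as DistSender handle or strip the error instead of
		// combining an errored response.
		return part.Error.GoError()
	}
	if err := dst.Combine(part, positions); err != nil {
		// dst is now in an invalid state; the caller must discard it.
		return err
	}
	return nil
}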