Skip to content

Commit

Permalink
Merge pull request #10207 from cockroachdb/spencerkimball/ambiguous-c…
Browse files Browse the repository at this point in the history
…ommit

Re-merge ambiguous commit error PR with locking fixes
  • Loading branch information
spencerkimball authored Oct 26, 2016
2 parents 5e14c66 + a477ff1 commit 5bc7bf1
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 136 deletions.
91 changes: 74 additions & 17 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"time"
"unsafe"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/pkg/errors"
"golang.org/x/net/context"

Expand Down Expand Up @@ -936,10 +939,11 @@ func fillSkippedResponses(
}
}

// sendToReplicas sends one or more RPCs to clients specified by the slice of
// replicas. On success, Send returns the first successful reply. Otherwise,
// Send returns an error if and as soon as the number of failed RPCs exceeds
// the available endpoints less the number of required replies.
// sendToReplicas sends one or more RPCs to clients specified by the
// slice of replicas. On success, it returns the first successful
// reply. If an error occurs which is not specific to a single
// replica, it's returned immediately. Otherwise, when all replicas
// have been tried and failed, it returns a send error. If the batch
// commits a transaction and the outcome of the commit cannot be
// determined, an ambiguous commit error is returned instead.
func (ds *DistSender) sendToReplicas(
opts SendOptions,
rangeID roachpb.RangeID,
Expand All @@ -953,6 +957,14 @@ func (ds *DistSender) sendToReplicas(
len(replicas), 1))
}

var ambiguousCommit bool
var haveCommit bool
// We only check for committed txns, not aborts because aborts may
// be retried without any risk of inconsistencies.
if etArg, ok := args.GetArg(roachpb.EndTransaction); ok &&
etArg.(*roachpb.EndTransactionRequest).Commit {
haveCommit = true
}
done := make(chan BatchCall, len(replicas))

transportFactory := opts.transportFactory
Expand Down Expand Up @@ -1001,7 +1013,18 @@ func (ds *DistSender) sendToReplicas(
log.Infof(opts.ctx, "application error: %s", call.Reply.Error)
}

if !ds.handlePerReplicaError(rangeID, call.Reply.Error) {
if call.Reply.Error == nil {
return call.Reply, nil
} else if !ds.handlePerReplicaError(transport, rangeID, call.Reply.Error) {
// The error received is not specific to this replica, so we
// should return it instead of trying other replicas. However,
// if we're trying to commit a transaction and there are
// still other RPCs outstanding or an ambiguous RPC error
// was already received, we must return an ambiguous commit
// error instead of the error we just received.
if haveCommit && (pending > 0 || ambiguousCommit) {
return nil, roachpb.NewAmbiguousCommitError()
}
return call.Reply, nil
}

Expand All @@ -1013,8 +1036,34 @@ func (ds *DistSender) sendToReplicas(
// we've seen (for example, a NotLeaseHolderError conveys more
// information than a RangeNotFound).
err = call.Reply.Error.GoError()
} else if log.V(1) {
log.Warningf(opts.ctx, "RPC error: %s", err)
} else {
if log.V(1) {
log.Warningf(opts.ctx, "RPC error: %s", err)
}
// All connection errors, except for an unavailable node (this
// is gRPC's fail-fast error), may mean that the request
// succeeded on the remote server, but we were unable to
// receive the reply. Set the ambiguous commit flag.
//
// We retry ambiguous commit batches to avoid returning the
// unrecoverable AmbiguousCommitError. This is safe because
// repeating an already-successfully applied batch is
// guaranteed to return either a TransactionReplayError (in
// case the replay happens at the original leader), or a
// TransactionRetryError (in case the replay happens at a new
// leader). If the original attempt merely timed out or was
// lost, then the batch will succeed and we can be assured the
// commit was applied just once.
//
// The Unavailable code is used by GRPC to indicate that a
// request fails fast and is not sent, so we can be sure there
// is no ambiguity on these errors. Note that these are common
// if a node is down.
// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/call.go#L182
// See https://github.com/grpc/grpc-go/blob/52f6504dc290bd928a8139ba94e3ab32ed9a6273/stream.go#L158
if haveCommit && grpc.Code(err) != codes.Unavailable {
ambiguousCommit = true
}
}

// Send to additional replicas if available.
Expand All @@ -1024,12 +1073,17 @@ func (ds *DistSender) sendToReplicas(
transport.SendNext(done)
}
if pending == 0 {
log.VEventf(opts.ctx, 2,
"sending to all %d replicas failed; last error: %s",
len(replicas), err)
return nil, roachpb.NewSendError(
fmt.Sprintf("sending to all %d replicas failed; last error: %v",
len(replicas), err))
if ambiguousCommit {
err = roachpb.NewAmbiguousCommitError()
} else {
err = roachpb.NewSendError(
fmt.Sprintf("sending to all %d replicas failed; last error: %v", len(replicas), err),
)
}
if log.V(2) {
log.ErrEvent(opts.ctx, err.Error())
}
return nil, err
}
}
}
Expand All @@ -1040,7 +1094,9 @@ func (ds *DistSender) sendToReplicas(
// replicas is likely to produce different results. This method should
// be called only once for each error as it may have side effects such
// as updating caches.
func (ds *DistSender) handlePerReplicaError(rangeID roachpb.RangeID, pErr *roachpb.Error) bool {
func (ds *DistSender) handlePerReplicaError(
transport Transport, rangeID roachpb.RangeID, pErr *roachpb.Error,
) bool {
switch tErr := pErr.GetDetail().(type) {
case *roachpb.RangeNotFoundError:
return true
Expand All @@ -1049,10 +1105,11 @@ func (ds *DistSender) handlePerReplicaError(rangeID roachpb.RangeID, pErr *roach
case *roachpb.NotLeaseHolderError:
if tErr.LeaseHolder != nil {
// If the replica we contacted knows the new lease holder, update the cache.
ds.updateLeaseHolderCache(rangeID, *tErr.LeaseHolder)
leaseHolder := *tErr.LeaseHolder
ds.updateLeaseHolderCache(rangeID, leaseHolder)

// TODO(bdarnell): Move the new lease holder to the head of the queue
// for the next retry.
// Move the new lease holder to the head of the queue for the next retry.
transport.MoveToFront(leaseHolder)
}
return true
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (l *legacyTransportAdapter) SendNext(done chan<- BatchCall) {
}
}

// MoveToFront is a no-op for the legacy test adapter; the supplied
// replica descriptor is ignored.
func (*legacyTransportAdapter) MoveToFront(roachpb.ReplicaDescriptor) {
}

// Close is a no-op for the legacy test adapter.
func (*legacyTransportAdapter) Close() {
}

Expand Down Expand Up @@ -1838,6 +1841,9 @@ func (t *slowLeaseHolderTransport) SendNext(done chan<- BatchCall) {
}
}

// MoveToFront is a no-op for this test transport; the supplied replica
// descriptor is ignored.
func (t *slowLeaseHolderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
}

// Close is a no-op for this test transport.
func (t *slowLeaseHolderTransport) Close() {
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func (c *channelSaveTransport) SendNext(done chan<- BatchCall) {
c.ch <- done
}

// MoveToFront is a no-op for this test transport; the supplied replica
// descriptor is ignored.
func (*channelSaveTransport) MoveToFront(roachpb.ReplicaDescriptor) {
}

// Close is a no-op for this test transport.
func (*channelSaveTransport) Close() {
}

Expand Down Expand Up @@ -417,6 +420,9 @@ func (f *firstNErrorTransport) SendNext(done chan<- BatchCall) {
done <- call
}

// MoveToFront is a no-op for this test transport; the supplied replica
// descriptor is ignored.
func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) {
}

// Close is a no-op for this test transport.
func (*firstNErrorTransport) Close() {
}

Expand Down
63 changes: 57 additions & 6 deletions pkg/kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -55,6 +56,8 @@ type batchClient struct {
client roachpb.InternalClient
args roachpb.BatchRequest
healthy bool
retried bool
pending bool
}

// BatchCall contains a response and an RPC error (note that the
Expand Down Expand Up @@ -93,6 +96,12 @@ type Transport interface {
// as needed.
SendNext(chan<- BatchCall)

// MoveToFront locates the specified replica and moves it to the
// front of the ordering of replicas to try. If the replica has
// already been tried, it will be retried. If the specified replica
// can't be found, this is a noop.
MoveToFront(roachpb.ReplicaDescriptor)

// Close is called when the transport is no longer needed. It may
// cancel any pending RPCs without writing any response to the channel.
Close()
Expand Down Expand Up @@ -135,21 +144,24 @@ func grpcTransportFactoryImpl(
}

type grpcTransport struct {
opts SendOptions
rpcContext *rpc.Context
orderedClients []batchClient
opts SendOptions
rpcContext *rpc.Context
clientIndex int
orderedClients []batchClient
clientPendingMu syncutil.Mutex // protects access to all batchClient pending flags
}

func (gt *grpcTransport) IsExhausted() bool {
return len(gt.orderedClients) == 0
return gt.clientIndex == len(gt.orderedClients)
}

// SendNext invokes the specified RPC on the supplied client when the
// client is ready. On success, the reply is sent on the channel;
// otherwise an error is sent.
func (gt *grpcTransport) SendNext(done chan<- BatchCall) {
client := gt.orderedClients[0]
gt.orderedClients = gt.orderedClients[1:]
client := gt.orderedClients[gt.clientIndex]
gt.clientIndex++
gt.setPending(client.args.Replica, true)

addr := client.remoteAddr
if log.V(2) {
Expand All @@ -170,6 +182,7 @@ func (gt *grpcTransport) SendNext(done chan<- BatchCall) {
}

reply, err := localServer.Batch(gt.opts.ctx, &client.args)
gt.setPending(client.args.Replica, false)
done <- BatchCall{Reply: reply, Err: err}
return
}
Expand All @@ -191,16 +204,51 @@ func (gt *grpcTransport) SendNext(done chan<- BatchCall) {
}
}
}
gt.setPending(client.args.Replica, false)
done <- BatchCall{Reply: reply, Err: err}
}()
}

// MoveToFront arranges for the client addressing the given replica to
// be the next one handed out by SendNext. A client whose RPC is still
// in flight, or which has already been moved back once for a retry, is
// left where it is. If no client matches the replica, nothing happens.
func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
	gt.clientPendingMu.Lock()
	defer gt.clientPendingMu.Unlock()

	// Find the first client that targets the requested replica.
	found := -1
	for idx := range gt.orderedClients {
		if gt.orderedClients[idx].args.Replica == replica {
			found = idx
			break
		}
	}
	if found < 0 {
		return
	}

	target := &gt.orderedClients[found]
	// An active or already-retried client must not be rescheduled.
	if target.pending || target.retried {
		return
	}

	// A client that was already consumed sits before clientIndex; step
	// the index back by one so the swap below makes it the next client
	// to be sent, and remember that it has used up its retry.
	if found < gt.clientIndex {
		target.retried = true
		gt.clientIndex--
	}

	// Exchange the located client with whatever occupies the "next"
	// slot.
	next := gt.clientIndex
	gt.orderedClients[found], gt.orderedClients[next] =
		gt.orderedClients[next], gt.orderedClients[found]
}

// Close is currently a no-op: it does not cancel RPCs that are still
// pending (see the TODO in the body).
func (*grpcTransport) Close() {
// TODO(bdarnell): Save the cancel functions of all pending RPCs and
// call them here. (it's fine to ignore them for now since they'll
// time out anyway)
}

// setPending records, under clientPendingMu, whether an RPC to the
// given replica is currently in flight. Only the first client matching
// the replica is updated; if none matches, the call has no effect.
func (gt *grpcTransport) setPending(replica roachpb.ReplicaDescriptor, pending bool) {
	gt.clientPendingMu.Lock()
	defer gt.clientPendingMu.Unlock()

	for idx := range gt.orderedClients {
		candidate := &gt.orderedClients[idx]
		if candidate.args.Replica != replica {
			continue
		}
		candidate.pending = pending
		break
	}
}

// splitHealthy splits the provided client slice into healthy clients and
// unhealthy clients, based on their connection state. Healthy clients will
// be rearranged first in the slice, and unhealthy clients will be rearranged
Expand Down Expand Up @@ -270,5 +318,8 @@ func (s *senderTransport) SendNext(done chan<- BatchCall) {
done <- BatchCall{Reply: br}
}

// MoveToFront is a no-op for senderTransport; the supplied replica
// descriptor is ignored.
func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) {
}

// Close is a no-op for senderTransport.
func (s *senderTransport) Close() {
}
Loading

0 comments on commit 5bc7bf1

Please sign in to comment.