Skip to content

Commit

Permalink
sendAttempt
Browse files Browse the repository at this point in the history
  • Loading branch information
tamird committed Jul 21, 2015
1 parent 12ae84a commit 32e5447
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ func (ds *DistSender) getDescriptors(call proto.Call) (*proto.RangeDescriptor, *
// replicas before making a single attempt at sending the request. It returns
// the result of sending the RPC; a potential error contained in the reply has
// to be handled separately by the caller.
func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply proto.Response, desc *proto.RangeDescriptor) error {
func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, desc *proto.RangeDescriptor) (proto.Response, error) {
defer trace.Epoch("sending RPC")()
// Truncate the request to our current range, making sure not to
// touch it unless we have to (it is illegal to send EndKey on
Expand Down Expand Up @@ -552,13 +552,7 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply
}
}

rpcReply, err := ds.sendRPC(trace, desc.RaftID, replicas, order, args)
if err == nil {
// Modify at the pointer's target
dst := reflect.ValueOf(reply).Elem()
dst.Set(reflect.ValueOf(rpcReply).Elem())
}
return err
return ds.sendRPC(trace, desc.RaftID, replicas, order, args)
}

// Send implements the client.Sender interface. It verifies
Expand All @@ -573,7 +567,6 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply
// must not be used concurrently until Send has returned.
func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
args := call.Args
finalReply := call.Reply

// Verify permissions.
if err := ds.verifyPermissions(call.Args); err != nil {
Expand Down Expand Up @@ -607,19 +600,21 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
args.Header().Key = key
}(args.Header().Key)

first := true

// Retry logic for lookup of range by key and RPCs to range replicas.
curReply := finalReply
for {
call.Reply = curReply
call.Reply.Header().Reset()

var curReply proto.Response
var desc, descNext *proto.RangeDescriptor
var err error
for r := retry.Start(ds.rpcRetryOptions); r.Next(); {
// Get range descriptor (or, when spanning range, descriptors). Our
// error handling below may clear them on certain errors, so we
// refresh (likely from the cache) on every retry.
descDone := trace.Epoch("meta descriptor lookup")
// It is safe to pass call here (with its embedded reply) because
// the reply is only used to check that it implements
// proto.Combinable if the request spans multiple ranges.
desc, descNext, err = ds.getDescriptors(call)
descDone()
// getDescriptors may fail retryably if the first range isn't
Expand All @@ -633,7 +628,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
}
break
}
err = ds.sendAttempt(trace, args, curReply, desc)
// At this point reply.Header().Error may be non-nil!
curReply, err = ds.sendAttempt(trace, args, desc)
if err != nil {
trace.Event(fmt.Sprintf("send error: %T", err))
// For an RPC error to occur, we must've been unable to contact any
Expand Down Expand Up @@ -712,11 +708,15 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
return
}

if finalReply != curReply {
if first {
// Equivalent of `*call.Reply = curReply`. Generics!
dst := reflect.ValueOf(call.Reply).Elem()
dst.Set(reflect.ValueOf(curReply).Elem())
} else {
// This was the second or later call in a multi-range request.
// Combine the new response with the existing one.
if cFinalReply, ok := finalReply.(proto.Combinable); ok {
cFinalReply.Combine(curReply)
if cReply, ok := call.Reply.(proto.Combinable); ok {
cReply.Combine(curReply)
} else {
// This should never apply in practice, as we'll only end up here
// for range-spanning requests.
Expand All @@ -725,6 +725,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
}
}

first = false

// If this request has a bound, such as MaxResults in
// ScanRequest, check whether enough rows have been retrieved.
if argsBounded {
Expand Down Expand Up @@ -755,13 +757,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
// StartKey would move us to the beginning of the current range,
// resulting in a duplicate scan.
args.Header().Key = desc.EndKey

// This is a multi-range request, make a new reply object for
// subsequent iterations of the loop.
curReply = args.CreateReply()
trace.Event("querying next range")
}
call.Reply = finalReply
}

// updateLeaderCache updates the cached leader for the given Raft group,
Expand Down

0 comments on commit 32e5447

Please sign in to comment.