Skip to content

Commit

Permalink
Merge pull request cockroachdb#1750 from tamird/return-replies-5
Browse files Browse the repository at this point in the history
Return (response, error) #5
  • Loading branch information
tamird committed Jul 21, 2015
2 parents 3a70eae + 32e5447 commit 45a618a
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,18 @@ func (ds *DistSender) internalRangeLookup(key proto.Key, options lookupOptions,
MaxRanges: ds.rangeLookupMaxRanges,
IgnoreIntents: options.ignoreIntents,
}
reply := &proto.InternalRangeLookupResponse{}
replicas := newReplicaSlice(ds.gossip, desc)
// TODO(tschottdorf) consider a Trace here, potentially that of the request
// that had the cache miss and waits for the result.
if err := ds.sendRPC(nil /* Trace */, desc.RaftID, replicas, rpc.OrderRandom, args, reply); err != nil {
reply, err := ds.sendRPC(nil /* Trace */, desc.RaftID, replicas, rpc.OrderRandom, args)
if err != nil {
return nil, err
}
if reply.Error != nil {
return nil, reply.GoError()
rlReply := reply.(*proto.InternalRangeLookupResponse)
if rlReply.Error != nil {
return nil, rlReply.GoError()
}
return reply.Ranges, nil
return rlReply.Ranges, nil
}

// getFirstRangeDescriptor returns the RangeDescriptor for the first range on
Expand Down Expand Up @@ -408,9 +409,9 @@ func (ds *DistSender) getNodeDescriptor() *proto.NodeDescriptor {
// Note that the reply may contain a higher level error and must be checked in
// addition to the RPC error.
func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas replicaSlice, order rpc.OrderingPolicy,
args proto.Request, reply proto.Response) error {
args proto.Request) (proto.Response, error) {
if len(replicas) == 0 {
return util.Errorf("%s: replicas set is empty", args.Method())
return nil, util.Errorf("%s: replicas set is empty", args.Method())
}

// Build a slice of replica addresses (if gossiped).
Expand All @@ -423,7 +424,7 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas
replicaMap[addr.String()] = &replicas[i].Replica
}
if len(addrs) == 0 {
return noNodeAddrsAvailError{}
return nil, noNodeAddrsAvailError{}
}

// TODO(pmattis): This needs to be tested. If it isn't set we'll
Expand Down Expand Up @@ -467,13 +468,10 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas

replies, err := ds.rpcSend(rpcOpts, "Node."+args.Method().String(),
addrs, getArgs, getReply, ds.gossip.RPCContext)
if err == nil {
// Set content of replies[0] back to reply
dst := reflect.ValueOf(reply).Elem()
dst.Set(reflect.ValueOf(replies[0]).Elem())
if err != nil {
return nil, err
}

return err
return replies[0].(proto.Response), nil
}

// getDescriptors takes a call and looks up the corresponding range
Expand Down Expand Up @@ -525,7 +523,7 @@ func (ds *DistSender) getDescriptors(call proto.Call) (*proto.RangeDescriptor, *
// replicas before making a single attempt at sending the request. It returns
// the result of sending the RPC; a potential error contained in the reply has
// to be handled separately by the caller.
func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply proto.Response, desc *proto.RangeDescriptor) error {
func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, desc *proto.RangeDescriptor) (proto.Response, error) {
defer trace.Epoch("sending RPC")()
// Truncate the request to our current range, making sure not to
// touch it unless we have to (it is illegal to send EndKey on
Expand Down Expand Up @@ -554,7 +552,7 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply
}
}

return ds.sendRPC(trace, desc.RaftID, replicas, order, args, reply)
return ds.sendRPC(trace, desc.RaftID, replicas, order, args)
}

// Send implements the client.Sender interface. It verifies
Expand All @@ -569,7 +567,6 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply
// must not be used concurrently until Send has returned.
func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
args := call.Args
finalReply := call.Reply

// Verify permissions.
if err := ds.verifyPermissions(call.Args); err != nil {
Expand Down Expand Up @@ -603,19 +600,21 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
args.Header().Key = key
}(args.Header().Key)

first := true

// Retry logic for lookup of range by key and RPCs to range replicas.
curReply := finalReply
for {
call.Reply = curReply
call.Reply.Header().Reset()

var curReply proto.Response
var desc, descNext *proto.RangeDescriptor
var err error
for r := retry.Start(ds.rpcRetryOptions); r.Next(); {
// Get range descriptor (or, when spanning range, descriptors). Our
// error handling below may clear them on certain errors, so we
// refresh (likely from the cache) on every retry.
descDone := trace.Epoch("meta descriptor lookup")
// It is safe to pass call here (with its embedded reply) because
// the reply is only used to check that it implements
// proto.Combinable if the request spans multiple ranges.
desc, descNext, err = ds.getDescriptors(call)
descDone()
// getDescriptors may fail retryably if the first range isn't
Expand All @@ -629,7 +628,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
}
break
}
err = ds.sendAttempt(trace, args, curReply, desc)
// At this point reply.Header().Error may be non-nil!
curReply, err = ds.sendAttempt(trace, args, desc)
if err != nil {
trace.Event(fmt.Sprintf("send error: %T", err))
// For an RPC error to occur, we must've been unable to contact any
Expand Down Expand Up @@ -708,11 +708,15 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
return
}

if finalReply != curReply {
if first {
// Equivalent of `*call.Reply = curReply`. Generics!
dst := reflect.ValueOf(call.Reply).Elem()
dst.Set(reflect.ValueOf(curReply).Elem())
} else {
// This was the second or later call in a multi-range request.
// Combine the new response with the existing one.
if cFinalReply, ok := finalReply.(proto.Combinable); ok {
cFinalReply.Combine(curReply)
if cReply, ok := call.Reply.(proto.Combinable); ok {
cReply.Combine(curReply)
} else {
// This should never apply in practice, as we'll only end up here
// for range-spanning requests.
Expand All @@ -721,6 +725,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
}
}

first = false

// If this request has a bound, such as MaxResults in
// ScanRequest, check whether enough rows have been retrieved.
if argsBounded {
Expand Down Expand Up @@ -751,13 +757,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) {
// StartKey would move us to the beginning of the current range,
// resulting in a duplicate scan.
args.Header().Key = desc.EndKey

// This is a multi-range request, make a new reply object for
// subsequent iterations of the loop.
curReply = args.CreateReply()
trace.Event("querying next range")
}
call.Reply = finalReply
}

// updateLeaderCache updates the cached leader for the given Raft group,
Expand Down

0 comments on commit 45a618a

Please sign in to comment.