diff --git a/kv/dist_sender.go b/kv/dist_sender.go index 9a75457c09ac..cc5c25d55500 100644 --- a/kv/dist_sender.go +++ b/kv/dist_sender.go @@ -290,17 +290,18 @@ func (ds *DistSender) internalRangeLookup(key proto.Key, options lookupOptions, MaxRanges: ds.rangeLookupMaxRanges, IgnoreIntents: options.ignoreIntents, } - reply := &proto.InternalRangeLookupResponse{} replicas := newReplicaSlice(ds.gossip, desc) // TODO(tschottdorf) consider a Trace here, potentially that of the request // that had the cache miss and waits for the result. - if err := ds.sendRPC(nil /* Trace */, desc.RaftID, replicas, rpc.OrderRandom, args, reply); err != nil { + reply, err := ds.sendRPC(nil /* Trace */, desc.RaftID, replicas, rpc.OrderRandom, args) + if err != nil { return nil, err } - if reply.Error != nil { - return nil, reply.GoError() + rlReply := reply.(*proto.InternalRangeLookupResponse) + if rlReply.Error != nil { + return nil, rlReply.GoError() } - return reply.Ranges, nil + return rlReply.Ranges, nil } // getFirstRangeDescriptor returns the RangeDescriptor for the first range on @@ -408,9 +409,9 @@ func (ds *DistSender) getNodeDescriptor() *proto.NodeDescriptor { // Note that the reply may contain a higher level error and must be checked in // addition to the RPC error. func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas replicaSlice, order rpc.OrderingPolicy, - args proto.Request, reply proto.Response) error { + args proto.Request) (proto.Response, error) { if len(replicas) == 0 { - return util.Errorf("%s: replicas set is empty", args.Method()) + return nil, util.Errorf("%s: replicas set is empty", args.Method()) } // Build a slice of replica addresses (if gossiped). @@ -423,7 +424,7 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas replicaMap[addr.String()] = &replicas[i].Replica } if len(addrs) == 0 { - return noNodeAddrsAvailError{} + return nil, noNodeAddrsAvailError{} } // TODO(pmattis): This needs to be tested. If it isn't set we'll @@ -467,13 +468,10 @@ func (ds *DistSender) sendRPC(trace *tracer.Trace, raftID proto.RaftID, replicas replies, err := ds.rpcSend(rpcOpts, "Node."+args.Method().String(), addrs, getArgs, getReply, ds.gossip.RPCContext) - if err == nil { - // Set content of replies[0] back to reply - dst := reflect.ValueOf(reply).Elem() - dst.Set(reflect.ValueOf(replies[0]).Elem()) + if err != nil { + return nil, err } - - return err + return replies[0].(proto.Response), nil } // getDescriptors takes a call and looks up the corresponding range @@ -525,7 +523,7 @@ func (ds *DistSender) getDescriptors(call proto.Call) (*proto.RangeDescriptor, * // replicas before making a single attempt at sending the request. It returns // the result of sending the RPC; a potential error contained in the reply has // to be handled separately by the caller. -func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply proto.Response, desc *proto.RangeDescriptor) error { +func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, desc *proto.RangeDescriptor) (proto.Response, error) { defer trace.Epoch("sending RPC")() // Truncate the request to our current range, making sure not to // touch it unless we have to (it is illegal to send EndKey on @@ -554,7 +552,7 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply } } - return ds.sendRPC(trace, desc.RaftID, replicas, order, args, reply) + return ds.sendRPC(trace, desc.RaftID, replicas, order, args) } // Send implements the client.Sender interface. It verifies @@ -569,7 +567,6 @@ func (ds *DistSender) sendAttempt(trace *tracer.Trace, args proto.Request, reply // must not be used concurrently until Send has returned. func (ds *DistSender) Send(ctx context.Context, call proto.Call) { args := call.Args - finalReply := call.Reply // Verify permissions. if err := ds.verifyPermissions(call.Args); err != nil { @@ -603,12 +600,11 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { args.Header().Key = key }(args.Header().Key) + first := true + // Retry logic for lookup of range by key and RPCs to range replicas. - curReply := finalReply for { - call.Reply = curReply - call.Reply.Header().Reset() - + var curReply proto.Response var desc, descNext *proto.RangeDescriptor var err error for r := retry.Start(ds.rpcRetryOptions); r.Next(); { @@ -616,6 +612,9 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { // error handling below may clear them on certain errors, so we // refresh (likely from the cache) on every retry. descDone := trace.Epoch("meta descriptor lookup") + // It is safe to pass call here (with its embedded reply) because + // the reply is only used to check that it implements + // proto.Combinable if the request spans multiple ranges. desc, descNext, err = ds.getDescriptors(call) descDone() // getDescriptors may fail retryably if the first range isn't @@ -629,7 +628,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { } break } - err = ds.sendAttempt(trace, args, curReply, desc) + // At this point reply.Header().Error may be non-nil! + curReply, err = ds.sendAttempt(trace, args, desc) if err != nil { trace.Event(fmt.Sprintf("send error: %T", err)) // For an RPC error to occur, we must've been unable to contact any @@ -708,11 +708,15 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { return } - if finalReply != curReply { + if first { + // Equivalent of `*call.Reply = curReply`. Generics! + dst := reflect.ValueOf(call.Reply).Elem() + dst.Set(reflect.ValueOf(curReply).Elem()) + } else { // This was the second or later call in a multi-range request. // Combine the new response with the existing one. - if cFinalReply, ok := finalReply.(proto.Combinable); ok { - cFinalReply.Combine(curReply) + if cReply, ok := call.Reply.(proto.Combinable); ok { + cReply.Combine(curReply) } else { // This should never apply in practice, as we'll only end up here // for range-spanning requests. @@ -721,6 +725,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { } } + first = false + // If this request has a bound, such as MaxResults in // ScanRequest, check whether enough rows have been retrieved. if argsBounded { @@ -751,13 +757,8 @@ func (ds *DistSender) Send(ctx context.Context, call proto.Call) { // StartKey would move us to the beginning of the current range, // resulting in a duplicate scan. args.Header().Key = desc.EndKey - - // This is a multi-range request, make a new reply object for - // subsequent iterations of the loop. - curReply = args.CreateReply() trace.Event("querying next range") } - call.Reply = finalReply } // updateLeaderCache updates the cached leader for the given Raft group,