diff --git a/internal/client/rpc_sender.go b/internal/client/rpc_sender.go
index dedd1cd3103b..22345931e74d 100644
--- a/internal/client/rpc_sender.go
+++ b/internal/client/rpc_sender.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/rpc"
+	"github.com/cockroachdb/cockroach/util/stop"
 	"github.com/pkg/errors"
 )
 
@@ -33,10 +34,20 @@ type sender struct {
 // database provided by a Cockroach cluster by connecting via RPC to a
 // Cockroach node.
 func NewSender(ctx *rpc.Context, target string) (Sender, error) {
-	conn, err := ctx.GRPCDial(target)
+	// We don't use ctx.GRPCDial because this is an external client connection
+	// and we don't want to run the heartbeat service which will close the
+	// connection if the transport fails.
+	dialOpt, err := ctx.GRPCDialOption()
 	if err != nil {
 		return nil, err
 	}
+	conn, err := grpc.Dial(target, dialOpt)
+	if err != nil {
+		return nil, err
+	}
+	ctx.Stopper.AddCloser(stop.CloserFn(func() {
+		_ = conn.Close() // we're closing, ignore the error
+	}))
 	return sender{roachpb.NewExternalClient(conn)}, nil
 }
 
diff --git a/kv/dist_sender.go b/kv/dist_sender.go
index 02e38092bf67..0f695ddfc589 100644
--- a/kv/dist_sender.go
+++ b/kv/dist_sender.go
@@ -140,7 +140,7 @@ type DistSenderContext struct {
 // Cockroach cluster via the supplied gossip instance. Supplying a
 // DistSenderContext or the fields within is optional. For omitted values, sane
 // defaults will be used.
-func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
+func NewDistSender(ctx *DistSenderContext, g *gossip.Gossip) *DistSender {
 	if ctx == nil {
 		ctx = &DistSenderContext{}
 	}
@@ -150,7 +150,7 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
 	}
 	ds := &DistSender{
 		clock:  clock,
-		gossip: gossip,
+		gossip: g,
 	}
 	if ctx.nodeDescriptor != nil {
 		atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(ctx.nodeDescriptor))
 	}
@@ -196,6 +196,25 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
 		ds.sendNextTimeout = defaultSendNextTimeout
 	}
 
+	if g != nil {
+		g.RegisterCallback(gossip.KeyFirstRangeDescriptor,
+			func(_ string, value roachpb.Value) {
+				ctx := context.Background()
+				if log.V(1) {
+					var desc roachpb.RangeDescriptor
+					if err := value.GetProto(&desc); err != nil {
+						log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err)
+					} else {
+						log.Infof(ctx,
+							"gossiped first range descriptor: %+v", desc.Replicas)
+					}
+				}
+				err := ds.rangeCache.EvictCachedRangeDescriptor(roachpb.RKeyMin, nil, false)
+				if err != nil {
+					log.Warningf(ctx, "failed to evict first range descriptor: %s", err)
+				}
+			})
+	}
 	return ds
 }
diff --git a/kv/range_cache.go b/kv/range_cache.go
index ff83d39ce6fd..f8a354b720de 100644
--- a/kv/range_cache.go
+++ b/kv/range_cache.go
@@ -475,10 +475,6 @@ func (rdc *rangeDescriptorCache) EvictCachedRangeDescriptor(descKey roachpb.RKey
 }
 
 func (rdc *rangeDescriptorCache) evictCachedRangeDescriptorLocked(descKey roachpb.RKey, seenDesc *roachpb.RangeDescriptor, inclusive bool) error {
-	if seenDesc == nil {
-		log.Warningf(context.TODO(), "compare-and-evict for key %s with nil descriptor; clearing unconditionally", descKey)
-	}
-
 	rngKey, cachedDesc, err := rdc.getCachedRangeDescriptorLocked(descKey, inclusive)
 	if err != nil {
 		return err
diff --git a/rpc/context.go b/rpc/context.go
index 0c6d0a56700d..dabf2149bdda 100644
--- a/rpc/context.go
+++ b/rpc/context.go
@@ -146,15 +146,9 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 		return meta.conn, nil
 	}
 
-	var dialOpt grpc.DialOption
-	if ctx.Insecure {
-		dialOpt = grpc.WithInsecure()
-	} else {
-		tlsConfig, err := ctx.GetClientTLSConfig()
-		if err != nil {
-			return nil, err
-		}
-		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
+	dialOpt, err := ctx.GRPCDialOption()
+	if err != nil {
+		return nil, err
 	}
 
 	dialOpts := make([]grpc.DialOption, 0, 1+len(opts))
@@ -184,6 +178,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 	return conn, err
 }
 
+// GRPCDialOption returns the GRPC dialing option appropriate for the context.
+func (ctx *Context) GRPCDialOption() (grpc.DialOption, error) {
+	var dialOpt grpc.DialOption
+	if ctx.Insecure {
+		dialOpt = grpc.WithInsecure()
+	} else {
+		tlsConfig, err := ctx.GetClientTLSConfig()
+		if err != nil {
+			return dialOpt, err
+		}
+		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
+	}
+	return dialOpt, nil
+}
+
 // setConnHealthy sets the health status of the connection.
 func (ctx *Context) setConnHealthy(remoteAddr string, healthy bool) {
 	ctx.conns.Lock()
@@ -268,5 +277,7 @@ func (ctx *Context) runHeartbeat(cc *grpc.ClientConn, remoteAddr string) error {
 func (ctx *Context) heartbeat(heartbeatClient HeartbeatClient, request PingRequest) (*PingResponse, error) {
 	goCtx, cancel := context.WithTimeout(context.Background(), ctx.HeartbeatTimeout)
 	defer cancel()
-	return heartbeatClient.Ping(goCtx, &request, grpc.FailFast(false))
+	// NB: We want the request to fail-fast (the default), otherwise we won't be
+	// notified of transport failures.
+	return heartbeatClient.Ping(goCtx, &request)
 }
diff --git a/storage/raft_transport.go b/storage/raft_transport.go
index c5dda3c3b909..91e904f3ae7a 100644
--- a/storage/raft_transport.go
+++ b/storage/raft_transport.go
@@ -318,13 +318,6 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
 	}
 	isSnap := req.Message.Type == raftpb.MsgSnap
 	toReplica := req.ToReplica
-	// Get a connection to the node specified by the replica's node
-	// ID. If no connection can be made, return false to indicate caller
-	// should drop the Raft message.
-	conn := s.transport.getNodeConn(toReplica.NodeID)
-	if conn == nil {
-		return false
-	}
 
 	s.transport.mu.Lock()
 	// We use two queues; one will be used for snapshots, the other for all other
@@ -342,6 +335,17 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
 	s.transport.mu.Unlock()
 
 	if !ok {
+		// Get a connection to the node specified by the replica's node
+		// ID. If no connection can be made, return false to indicate caller
+		// should drop the Raft message.
+		conn := s.transport.getNodeConn(toReplica.NodeID)
+		if conn == nil {
+			s.transport.mu.Lock()
+			delete(queues, toReplica)
+			s.transport.mu.Unlock()
+			return false
+		}
+
 		// Starting workers in a task prevents data races during shutdown.
 		if err := s.transport.rpcContext.Stopper.RunTask(func() {
 			s.transport.rpcContext.Stopper.RunWorker(func() {
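
Taken together, the internal/client/rpc_sender.go and rpc/context.go hunks split connection setup into two paths: internal connections keep using GRPCDial, whose heartbeat loop (now issuing fail-fast Pings so transport failures are actually noticed) closes the connection when the transport fails, while external clients build a *grpc.ClientConn from just the security option so the connection is not torn down behind their back. A minimal sketch of that external-client path follows, assuming only the GRPCDialOption, Stopper/stop.CloserFn, and roachpb.NewExternalClient APIs visible in the hunks above; the package name, the newExternalConn helper, and its rpcCtx and target parameters are illustrative and not part of the patch.

	// Sketch: dial an external (non-heartbeated) connection the way the new
	// client.NewSender does, going through rpc.Context.GRPCDialOption instead
	// of GRPCDial.
	package example

	import (
		"google.golang.org/grpc"

		"github.com/cockroachdb/cockroach/roachpb"
		"github.com/cockroachdb/cockroach/rpc"
		"github.com/cockroachdb/cockroach/util/stop"
	)

	// newExternalConn is a hypothetical helper, not part of the patch.
	func newExternalConn(rpcCtx *rpc.Context, target string) (roachpb.ExternalClient, error) {
		// GRPCDialOption only chooses between insecure and TLS credentials; it
		// does not register the heartbeat service, so a transport failure shows
		// up as RPC errors rather than the connection being closed underneath us.
		dialOpt, err := rpcCtx.GRPCDialOption()
		if err != nil {
			return nil, err
		}
		conn, err := grpc.Dial(target, dialOpt)
		if err != nil {
			return nil, err
		}
		// Tie the connection's lifetime to the stopper so it is closed on shutdown.
		rpcCtx.Stopper.AddCloser(stop.CloserFn(func() {
			_ = conn.Close() // shutting down; ignore the error
		}))
		return roachpb.NewExternalClient(conn), nil
	}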