From 824c4b6e6057a4edede437d8e7b98077f3ffc33c Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Sun, 31 Jul 2016 17:09:28 -0400
Subject: [PATCH 1/3] storage: only get a grpc connection if we're creating a queue

Previously we retrieved the cached grpc connection on every call to
RaftSender.SendAsync even though it is only needed when a queue is
being created. This was wasted work: in the majority of cases the
cached grpc connection was retrieved and never used.

Note that when the grpc connection is closed (e.g. because the remote
terminates), the associated streams will be closed, which will in turn
cause the queues to be removed.
---
 storage/raft_transport.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/storage/raft_transport.go b/storage/raft_transport.go
index c5dda3c3b909..91e904f3ae7a 100644
--- a/storage/raft_transport.go
+++ b/storage/raft_transport.go
@@ -318,13 +318,6 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
 	}
 	isSnap := req.Message.Type == raftpb.MsgSnap
 	toReplica := req.ToReplica
-	// Get a connection to the node specified by the replica's node
-	// ID. If no connection can be made, return false to indicate caller
-	// should drop the Raft message.
-	conn := s.transport.getNodeConn(toReplica.NodeID)
-	if conn == nil {
-		return false
-	}

 	s.transport.mu.Lock()
 	// We use two queues; one will be used for snapshots, the other for all other
@@ -342,6 +335,17 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
 	s.transport.mu.Unlock()

 	if !ok {
+		// Get a connection to the node specified by the replica's node
+		// ID. If no connection can be made, return false to indicate caller
+		// should drop the Raft message.
+		conn := s.transport.getNodeConn(toReplica.NodeID)
+		if conn == nil {
+			s.transport.mu.Lock()
+			delete(queues, toReplica)
+			s.transport.mu.Unlock()
+			return false
+		}
+
 		// Starting workers in a task prevents data races during shutdown.
 		if err := s.transport.rpcContext.Stopper.RunTask(func() {
 			s.transport.rpcContext.Stopper.RunWorker(func() {
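Note: to make the queue-creation flow above concrete, here is a minimal,
self-contained Go sketch of the same "create the queue under the lock, dial
only on a miss, delete the queue again if the dial fails" pattern. All names
below (transport, queue, sendAsync, dial) are simplified stand-ins invented
for illustration; they are not the actual storage package types.

    package main

    import (
        "fmt"
        "sync"
    )

    // queue stands in for the buffered per-replica channel drained by a
    // worker goroutine in the real transport.
    type queue chan string

    type conn struct{ node string }

    type transport struct {
        mu     sync.Mutex
        queues map[string]queue
        // dial stands in for getNodeConn; a nil result means the node is
        // unreachable.
        dial func(node string) *conn
    }

    func (t *transport) sendAsync(node, msg string) bool {
        t.mu.Lock()
        q, ok := t.queues[node]
        if !ok {
            q = make(queue, 1)
            t.queues[node] = q
        }
        t.mu.Unlock()

        if !ok {
            // Only the call that created the queue pays for the connection
            // lookup, mirroring the patch above.
            if t.dial(node) == nil {
                t.mu.Lock()
                delete(t.queues, node)
                t.mu.Unlock()
                return false
            }
            // A real implementation would start a worker here to drain q
            // over the freshly dialed connection.
        }

        select {
        case q <- msg:
            return true
        default:
            return false // queue full; drop the message
        }
    }

    func main() {
        t := &transport{
            queues: map[string]queue{},
            dial:   func(node string) *conn { return &conn{node: node} },
        }
        fmt.Println(t.sendAsync("n1", "hello")) // true
    }

Deleting the queue on dial failure matters: if it were left behind, later
sends would find it, enqueue onto a channel no worker drains, and the
messages would silently pile up.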
From 5c39e1aea2e3ba16c44f3dc26ae8e0fa46c60b68 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Sat, 30 Jul 2016 15:44:22 -0400
Subject: [PATCH 2/3] kv: evict the first range descriptor when it changes

See #8130
---
 kv/dist_sender.go | 23 +++++++++++++++++++++--
 kv/range_cache.go |  4 ----
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/kv/dist_sender.go b/kv/dist_sender.go
index 02e38092bf67..0f695ddfc589 100644
--- a/kv/dist_sender.go
+++ b/kv/dist_sender.go
@@ -140,7 +140,7 @@ type DistSenderContext struct {
 // Cockroach cluster via the supplied gossip instance. Supplying a
 // DistSenderContext or the fields within is optional. For omitted values, sane
 // defaults will be used.
-func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
+func NewDistSender(ctx *DistSenderContext, g *gossip.Gossip) *DistSender {
 	if ctx == nil {
 		ctx = &DistSenderContext{}
 	}
@@ -150,7 +150,7 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
 	}
 	ds := &DistSender{
 		clock:  clock,
-		gossip: gossip,
+		gossip: g,
 	}
 	if ctx.nodeDescriptor != nil {
 		atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(ctx.nodeDescriptor))
@@ -196,6 +196,25 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
 		ds.sendNextTimeout = defaultSendNextTimeout
 	}

+	if g != nil {
+		g.RegisterCallback(gossip.KeyFirstRangeDescriptor,
+			func(_ string, value roachpb.Value) {
+				ctx := context.Background()
+				if log.V(1) {
+					var desc roachpb.RangeDescriptor
+					if err := value.GetProto(&desc); err != nil {
+						log.Errorf(ctx, "unable to parse gossiped first range descriptor: %s", err)
+					} else {
+						log.Infof(ctx,
+							"gossiped first range descriptor: %+v", desc.Replicas)
+					}
+				}
+				err := ds.rangeCache.EvictCachedRangeDescriptor(roachpb.RKeyMin, nil, false)
+				if err != nil {
+					log.Warningf(ctx, "failed to evict first range descriptor: %s", err)
+				}
+			})
+	}
 	return ds
 }

diff --git a/kv/range_cache.go b/kv/range_cache.go
index ff83d39ce6fd..f8a354b720de 100644
--- a/kv/range_cache.go
+++ b/kv/range_cache.go
@@ -475,10 +475,6 @@ func (rdc *rangeDescriptorCache) EvictCachedRangeDescriptor(descKey roachpb.RKey
 }

 func (rdc *rangeDescriptorCache) evictCachedRangeDescriptorLocked(descKey roachpb.RKey, seenDesc *roachpb.RangeDescriptor, inclusive bool) error {
-	if seenDesc == nil {
-		log.Warningf(context.TODO(), "compare-and-evict for key %s with nil descriptor; clearing unconditionally", descKey)
-	}
-
 	rngKey, cachedDesc, err := rdc.getCachedRangeDescriptorLocked(descKey, inclusive)
 	if err != nil {
 		return err
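Note: the eviction hook above is easiest to see in isolation. The following
self-contained Go sketch shows the same callback-driven cache-eviction shape;
gossiper and rangeCache are toy stand-ins invented here and do not reflect
the real gossip.Gossip or rangeDescriptorCache APIs.

    package main

    import (
        "fmt"
        "sync"
    )

    // gossiper invokes registered callbacks whenever a key is updated, the
    // way gossip.Gossip does for KeyFirstRangeDescriptor.
    type gossiper struct {
        mu        sync.Mutex
        callbacks map[string][]func(key, value string)
    }

    func (g *gossiper) RegisterCallback(key string, cb func(key, value string)) {
        g.mu.Lock()
        defer g.mu.Unlock()
        g.callbacks[key] = append(g.callbacks[key], cb)
    }

    func (g *gossiper) Update(key, value string) {
        g.mu.Lock()
        cbs := make([]func(key, value string), len(g.callbacks[key]))
        copy(cbs, g.callbacks[key])
        g.mu.Unlock()
        for _, cb := range cbs { // invoke outside the lock
            cb(key, value)
        }
    }

    // rangeCache is a toy descriptor cache keyed by range start key.
    type rangeCache struct {
        mu    sync.Mutex
        cache map[string]string
    }

    func (c *rangeCache) evict(key string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        delete(c.cache, key)
    }

    func main() {
        g := &gossiper{callbacks: map[string][]func(key, value string){}}
        cache := &rangeCache{cache: map[string]string{"min": "stale first-range descriptor"}}

        // As in the patch: on every gossip of the first range descriptor,
        // unconditionally evict the cached copy so the next lookup
        // refetches the fresh one.
        g.RegisterCallback("first-range", func(_, value string) {
            cache.evict("min")
            fmt.Printf("evicted cached first range descriptor; gossiped: %s\n", value)
        })

        g.Update("first-range", "fresh descriptor")
    }

Evicting rather than directly inserting the gossiped value keeps the cache's
usual fill path as the single place descriptors enter the cache.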
From a79b1b9406e83e7a1293cffff24c9918b46084fc Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Sun, 31 Jul 2016 16:35:56 -0400
Subject: [PATCH 3/3] rpc: mark heartbeat RPCs as fail-fast

We want heartbeat RPCs to be fail-fast so that we are notified of
transport failures and can close the connection. Failing to do so left
gRPC client connections permanently open and trying to reconnect to
down nodes, fouling up the circuit breaker expectations in the Raft
transport.

Changed client.NewSender to create a separate gRPC connection which
the rpc.Context will not heartbeat. We don't want the heartbeat
service on these external connections, and we don't want them closed
when a heartbeat fails.

Fixes #8130
---
 internal/client/rpc_sender.go | 14 +++++++++++++-
 rpc/context.go                | 31 +++++++++++++++++++++----------
 2 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/internal/client/rpc_sender.go b/internal/client/rpc_sender.go
index dedd1cd3103b..22345931e74d 100644
--- a/internal/client/rpc_sender.go
+++ b/internal/client/rpc_sender.go
@@ -22,6 +22,8 @@ import (
 	"github.com/cockroachdb/cockroach/roachpb"
 	"github.com/cockroachdb/cockroach/rpc"
+	"github.com/cockroachdb/cockroach/util/stop"
 	"github.com/pkg/errors"
+	"google.golang.org/grpc"
 )

@@ -33,10 +35,20 @@ type sender struct {
 // database provided by a Cockroach cluster by connecting via RPC to a
 // Cockroach node.
 func NewSender(ctx *rpc.Context, target string) (Sender, error) {
-	conn, err := ctx.GRPCDial(target)
+	// We don't use ctx.GRPCDial because this is an external client connection
+	// and we don't want to run the heartbeat service, which would close the
+	// connection if the transport fails.
+	dialOpt, err := ctx.GRPCDialOption()
 	if err != nil {
 		return nil, err
 	}
+	conn, err := grpc.Dial(target, dialOpt)
+	if err != nil {
+		return nil, err
+	}
+	ctx.Stopper.AddCloser(stop.CloserFn(func() {
+		_ = conn.Close() // we're closing, ignore the error
+	}))
 	return sender{roachpb.NewExternalClient(conn)}, nil
 }

diff --git a/rpc/context.go b/rpc/context.go
index 0c6d0a56700d..dabf2149bdda 100644
--- a/rpc/context.go
+++ b/rpc/context.go
@@ -146,15 +146,9 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 		return meta.conn, nil
 	}

-	var dialOpt grpc.DialOption
-	if ctx.Insecure {
-		dialOpt = grpc.WithInsecure()
-	} else {
-		tlsConfig, err := ctx.GetClientTLSConfig()
-		if err != nil {
-			return nil, err
-		}
-		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
+	dialOpt, err := ctx.GRPCDialOption()
+	if err != nil {
+		return nil, err
 	}

 	dialOpts := make([]grpc.DialOption, 0, 1+len(opts))
@@ -184,6 +178,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
 	return conn, err
 }

+// GRPCDialOption returns the GRPC dialing option appropriate for the context.
+func (ctx *Context) GRPCDialOption() (grpc.DialOption, error) {
+	var dialOpt grpc.DialOption
+	if ctx.Insecure {
+		dialOpt = grpc.WithInsecure()
+	} else {
+		tlsConfig, err := ctx.GetClientTLSConfig()
+		if err != nil {
+			return dialOpt, err
+		}
+		dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
+	}
+	return dialOpt, nil
+}
+
 // setConnHealthy sets the health status of the connection.
 func (ctx *Context) setConnHealthy(remoteAddr string, healthy bool) {
 	ctx.conns.Lock()
@@ -268,5 +277,7 @@ func (ctx *Context) runHeartbeat(cc *grpc.ClientConn, remoteAddr string) error {
 func (ctx *Context) heartbeat(heartbeatClient HeartbeatClient, request PingRequest) (*PingResponse, error) {
 	goCtx, cancel := context.WithTimeout(context.Background(), ctx.HeartbeatTimeout)
 	defer cancel()
-	return heartbeatClient.Ping(goCtx, &request, grpc.FailFast(false))
+	// NB: We want the request to fail-fast (the default), otherwise we won't be
+	// notified of transport failures.
+	return heartbeatClient.Ping(goCtx, &request)
 }
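Note: the new dial path in client.NewSender is summarized by the following
Go sketch. It is illustrative only: grpc.WithInsecure() stands in for
ctx.GRPCDialOption() (which would return TLS transport credentials in a
secure setup), the closers type stands in for the Stopper's AddCloser, and
the target address is a placeholder; only grpc.Dial and conn.Close are real
grpc-go calls.

    package main

    import (
        "log"

        "google.golang.org/grpc"
    )

    // closers collects shutdown hooks, playing the role of the Stopper.
    type closers []func()

    func (cs *closers) add(f func()) { *cs = append(*cs, f) }

    func (cs closers) run() {
        for _, f := range cs {
            f()
        }
    }

    func main() {
        var onShutdown closers

        // Stand-in for ctx.GRPCDialOption(): pick the transport security
        // once, independent of who dials.
        dialOpt := grpc.WithInsecure()

        // Dial directly rather than through ctx.GRPCDial, so the connection
        // is never registered for heartbeats and nothing closes it when the
        // remote node is temporarily unreachable.
        conn, err := grpc.Dial("localhost:26257", dialOpt)
        if err != nil {
            log.Fatal(err)
        }
        onShutdown.add(func() {
            _ = conn.Close() // shutting down; ignore the error
        })

        // ... hand conn to a generated client, as the patch does with
        // roachpb.NewExternalClient(conn) ...

        onShutdown.run()
    }

The split keeps one source of truth for transport security (GRPCDialOption)
while letting callers opt out of the heartbeat machinery that GRPCDial
attaches to the connections it caches.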