Skip to content

Commit

Permalink
Merge pull request cockroachdb#8163 from petermattis/pmattis/evict-fi…
Browse files Browse the repository at this point in the history
…rst-range

all: fix various badness when resuscitating a node
  • Loading branch information
petermattis authored Aug 1, 2016
2 parents b624f98 + a79b1b9 commit 1e94f89
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 24 deletions.
13 changes: 12 additions & 1 deletion internal/client/rpc_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/pkg/errors"
)

Expand All @@ -33,10 +34,20 @@ type sender struct {
// database provided by a Cockroach cluster by connecting via RPC to a
// Cockroach node.
func NewSender(ctx *rpc.Context, target string) (Sender, error) {
// NOTE(review): this line and the grpc.Dial line further down both declare
// conn with := in the same scope, which would not compile as written. This
// looks like the removed and added sides of a diff hunk shown together —
// confirm which one is current.
conn, err := ctx.GRPCDial(target)
// We don't use ctx.GRPCDial because this is an external client connection
// and we don't want to run the heartbeat service which will close the
// connection if the transport fails.
dialOpt, err := ctx.GRPCDialOption()
if err != nil {
return nil, err
}
conn, err := grpc.Dial(target, dialOpt)
if err != nil {
return nil, err
}
// Register a closer with ctx.Stopper that closes the connection.
ctx.Stopper.AddCloser(stop.CloserFn(func() {
_ = conn.Close() // we're closing, ignore the error
}))
// Wrap the connection in an external client and return it as the Sender.
return sender{roachpb.NewExternalClient(conn)}, nil
}

Expand Down
23 changes: 21 additions & 2 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type DistSenderContext struct {
// Cockroach cluster via the supplied gossip instance. Supplying a
// DistSenderContext or the fields within is optional. For omitted values, sane
// defaults will be used.
func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
func NewDistSender(ctx *DistSenderContext, g *gossip.Gossip) *DistSender {
if ctx == nil {
ctx = &DistSenderContext{}
}
Expand All @@ -150,7 +150,7 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
}
ds := &DistSender{
clock: clock,
gossip: gossip,
gossip: g,
}
if ctx.nodeDescriptor != nil {
atomic.StorePointer(&ds.nodeDescriptor, unsafe.Pointer(ctx.nodeDescriptor))
Expand Down Expand Up @@ -196,6 +196,25 @@ func NewDistSender(ctx *DistSenderContext, gossip *gossip.Gossip) *DistSender {
ds.sendNextTimeout = defaultSendNextTimeout
}

if g != nil {
g.RegisterCallback(gossip.KeyFirstRangeDescriptor,
func(_ string, value roachpb.Value) {
ctx := context.Background()
if log.V(1) {
var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
log.Errorf(ctx, "unable to parse gossipped first range descriptor: %s", err)
} else {
log.Infof(ctx,
"gossipped first range descriptor: %+v", desc.Replicas)
}
}
err := ds.rangeCache.EvictCachedRangeDescriptor(roachpb.RKeyMin, nil, false)
if err != nil {
log.Warningf(ctx, "failed to evict first range descriptor: %s", err)
}
})
}
return ds
}

Expand Down
4 changes: 0 additions & 4 deletions kv/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,6 @@ func (rdc *rangeDescriptorCache) EvictCachedRangeDescriptor(descKey roachpb.RKey
}

func (rdc *rangeDescriptorCache) evictCachedRangeDescriptorLocked(descKey roachpb.RKey, seenDesc *roachpb.RangeDescriptor, inclusive bool) error {
if seenDesc == nil {
log.Warningf(context.TODO(), "compare-and-evict for key %s with nil descriptor; clearing unconditionally", descKey)
}

rngKey, cachedDesc, err := rdc.getCachedRangeDescriptorLocked(descKey, inclusive)
if err != nil {
return err
Expand Down
31 changes: 21 additions & 10 deletions rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,9 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
return meta.conn, nil
}

var dialOpt grpc.DialOption
if ctx.Insecure {
dialOpt = grpc.WithInsecure()
} else {
tlsConfig, err := ctx.GetClientTLSConfig()
if err != nil {
return nil, err
}
dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
dialOpt, err := ctx.GRPCDialOption()
if err != nil {
return nil, err
}

dialOpts := make([]grpc.DialOption, 0, 1+len(opts))
Expand Down Expand Up @@ -184,6 +178,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
return conn, err
}

// GRPCDialOption builds the single grpc.DialOption that selects the transport
// security for connections made with this context: an insecure option when
// ctx.Insecure is set, otherwise TLS transport credentials derived from the
// client TLS config. If the TLS config cannot be obtained, a nil option is
// returned together with the error.
func (ctx *Context) GRPCDialOption() (grpc.DialOption, error) {
	// Insecure mode needs no TLS material at all.
	if ctx.Insecure {
		return grpc.WithInsecure(), nil
	}

	clientTLS, err := ctx.GetClientTLSConfig()
	if err != nil {
		return nil, err
	}
	creds := credentials.NewTLS(clientTLS)
	return grpc.WithTransportCredentials(creds), nil
}

// setConnHealthy sets the health status of the connection.
func (ctx *Context) setConnHealthy(remoteAddr string, healthy bool) {
ctx.conns.Lock()
Expand Down Expand Up @@ -268,5 +277,7 @@ func (ctx *Context) runHeartbeat(cc *grpc.ClientConn, remoteAddr string) error {
// heartbeat issues a single Ping on heartbeatClient using a context that
// times out after ctx.HeartbeatTimeout, and returns the Ping result.
func (ctx *Context) heartbeat(heartbeatClient HeartbeatClient, request PingRequest) (*PingResponse, error) {
goCtx, cancel := context.WithTimeout(context.Background(), ctx.HeartbeatTimeout)
// Release the timeout context's resources once the Ping has returned.
defer cancel()
// NOTE(review): two return statements follow back to back, so only the first
// is reachable as written. This looks like the removed and added sides of a
// diff hunk shown together — confirm which one is current.
return heartbeatClient.Ping(goCtx, &request, grpc.FailFast(false))
// NB: We want the request to fail-fast (the default), otherwise we won't be
// notified of transport failures.
return heartbeatClient.Ping(goCtx, &request)
}
18 changes: 11 additions & 7 deletions storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,6 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
}
isSnap := req.Message.Type == raftpb.MsgSnap
toReplica := req.ToReplica
// Get a connection to the node specified by the replica's node
// ID. If no connection can be made, return false to indicate caller
// should drop the Raft message.
conn := s.transport.getNodeConn(toReplica.NodeID)
if conn == nil {
return false
}

s.transport.mu.Lock()
// We use two queues; one will be used for snapshots, the other for all other
Expand All @@ -342,6 +335,17 @@ func (s RaftSender) SendAsync(req *RaftMessageRequest) bool {
s.transport.mu.Unlock()

if !ok {
// Get a connection to the node specified by the replica's node
// ID. If no connection can be made, return false to indicate caller
// should drop the Raft message.
conn := s.transport.getNodeConn(toReplica.NodeID)
if conn == nil {
s.transport.mu.Lock()
delete(queues, toReplica)
s.transport.mu.Unlock()
return false
}

// Starting workers in a task prevents data races during shutdown.
if err := s.transport.rpcContext.Stopper.RunTask(func() {
s.transport.rpcContext.Stopper.RunWorker(func() {
Expand Down

0 comments on commit 1e94f89

Please sign in to comment.