diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go index 150748ecc2d0..291e2daa5a1e 100644 --- a/pkg/kv/kvserver/liveness/liveness.go +++ b/pkg/kv/kvserver/liveness/liveness.go @@ -802,7 +802,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions }) } -const heartbeatFailureLogFormat = `failed node liveness heartbeat: %+v +const heartbeatFailureLogFormat = `failed node liveness heartbeat: %v An inability to maintain liveness will prevent a node from participating in a cluster. If this problem persists, it may be a sign of resource starvation or diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 320c71fa1ae0..dc7c5dc4068f 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -2496,6 +2496,7 @@ func (rpcCtx *Context) runHeartbeat( } if err != nil { + log.VEventf(ctx, 2, "received error on ping response from n%d, %v", conn.remoteNodeID, err) return err } @@ -2604,8 +2605,8 @@ func (rpcCtx *Context) NewHeartbeatService() *HeartbeatService { } } -// VerifyDialback verifies connectivity from the recipient of a PingRequest -// back to the sender. If there is already a connection in place, it will return +// VerifyDialback verifies connectivity from the recipient of a PingRequest back +// to the sender. If there is already a connection in place, it will return // immediately without error. If there is no connection in place and the // NeedsDialback on the PingRequest is not set to NONE, then it will establish a // connection in either blocking or non-blocking mode. @@ -2614,6 +2615,9 @@ func (rpcCtx *Context) NewHeartbeatService() *HeartbeatService { // established. // NON_BLOCKING mode will attempt to establish a reverse connection and send the // result on the next PingRequest that is sent on this connection. +// This method keeps track of non blocking attempts in the dialbackMu and will +// clear out any pending attempts as soon as a successful connection is +// established. func (rpcCtx *Context) VerifyDialback( ctx context.Context, request *PingRequest, response *PingResponse, locality roachpb.Locality, ) error { @@ -2623,71 +2627,81 @@ func (rpcCtx *Context) VerifyDialback( baseAddr := util.UnresolvedAddr{NetworkField: "tcp", AddressField: request.OriginAddr} target := locality.LookupAddress(request.LocalityAddress, &baseAddr).AddressField + // nodeID may be null for "bootstrapping" requests. In that case we always + // assume blocking mode since we can't track connection attempts. nodeID := request.OriginNodeID - // Initially the nodeID might not be set since it is assigned by the cluster - // not the node. In that case, we can't look up if we have a connection to the - // node and instead need to always try dialback. - if nodeID != 0 { - prevErr, found := rpcCtx.previousAttempt(nodeID) - if found { - return prevErr - } + // Check in our regular connection map to see if we are healthy. We use the + // System class because that is what is important from a liveness perspective. + // If we are unable to maintain a healthy connection on the System class we + // will fail other connections also. + connHealthErr := rpcCtx.ConnHealth(target, nodeID, SystemClass) - // Check in our regular connection map to see if we are healthy. We only - // care about the SystemClass because that is what is important from a Raft - // liveness perspective. - err := rpcCtx.ConnHealth(target, nodeID, SystemClass) - // We have a successful connection, nothing else to do. - if err == nil { - return nil - } - log.VEventf(ctx, 2, "unable to verify health on open conn, trying dialback conn to %s, n%d, %v", target, nodeID, err) + // We have a successful connection so report success. Any ongoing attempts no + // longer need to be tracked. + if connHealthErr == nil { + rpcCtx.clearPreviousAttempt(nodeID) + return nil } + log.VEventf(ctx, 2, "unable to verify health on existing conn, trying dialback conn to %s, n%d mode %v, %v", + target, nodeID, request.NeedsDialback, connHealthErr) + if nodeID == 0 || request.NeedsDialback == PingRequest_BLOCKING { // Since we don't have a successful reverse connection, try and dial back // manually. We don't use the regular dialer pool since we don't want to wait // for heartbeats on this connection. // TODO(baptist): Consider using GRPCUnvalidatedDial and use the // WaitForStateChange to detect when the TCP connection is established. This - // will keep this connection in the pool after establishment. Note the class - // here matter since this connection is not added to a pool and immediately - // closed. + // will keep this connection in the pool after establishment. Wait until + // https://github.com/grpc/grpc-go/issues/5496 is completed. ctx := rpcCtx.makeDialCtx(target, 0, SystemClass) conn, err := rpcCtx.grpcDialRaw(ctx, target, SystemClass, grpc.WithBlock()) if err != nil { - log.Warningf(ctx, "dialback connection failed to %s, n%d, %v", target, nodeID, err) + log.Infof(ctx, "blocking dialback connection failed to %s, n%d, %v", target, nodeID, err) return err } + log.VEventf(ctx, 2, "blocking dialback connection to n%d succeeded", nodeID) + // Clear any previous attempts since we are known to be able to initiate a + // TCP connection. + rpcCtx.clearPreviousAttempt(nodeID) _ = conn.Close() // nolint:grpcconnclose return nil } else { - // We don't have a previous attempt and the current health was not healthy, - // but we can't just trust that because there might not be new connection - // attempts. Instead, establish a new connection using the standard - // GRPCDialNode, this connection is added to the connection pool. Always - // return success on this ping, but check this connection attempt on future - // pings. Use the SystemClass to ensure that Raft traffic is not - // interrupted. It is unusual for some classes to be affected and not others - // but the SystemClass is the one we really care about. + // If the previous attempt ended in an error, we can confidently report we + // are unable to dialback. If the attempt is still ongoing, then we want + // to allow it to finish. Once it has finished, we will leave this error + // here until the connection health becomes healthy either through + // checking the health manually or a blocking ping succeeding. + return rpcCtx.loadOrCreateConnAttempt(nodeID, func() *Connection { + return rpcCtx.GRPCDialNode(target, nodeID, SystemClass) + }) + } +} + +// clearPreviousAttempt will clear out any previous errors on connection +// attempts. This is only done after we have verified we have a healthy +// established connection to the sender of this ping. +func (rpcCtx *Context) clearPreviousAttempt(nodeID roachpb.NodeID) { + if nodeID > 0 { rpcCtx.dialbackMu.Lock() defer rpcCtx.dialbackMu.Unlock() - rpcCtx.dialbackMu.m[nodeID] = rpcCtx.GRPCDialNode(target, nodeID, SystemClass) - return nil + rpcCtx.dialbackMu.m[nodeID] = nil } } -// previousAttempt checks if any prior attempt that started but hadn't complete -// has now completed. Until this attempt completes, the Pings will continue to -// return success. Once this completes, we remove this from our map and return -// whatever error this attempt returned. -func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) { - // Check if there was a previous attempt and if so use that and clear out - // the previous attempt. +// loadOrCreateConnAttempt checks if we have an in-progress connection attempt +// to a store, and if not will create a connection and store it in the map. It +// takes a function to create a connection because the connection is only +// created in the case where it doesn't already exist. If there is already a +// ongoing connection attempt, it will instead check the status of that attempt. +// If it is completed and is in error, then it will return that error, if it is +// still ongoing, then it returns nil to signify that it might be healthy. +func (rpcCtx *Context) loadOrCreateConnAttempt( + nodeID roachpb.NodeID, createConnFunc func() *Connection, +) error { rpcCtx.dialbackMu.Lock() defer rpcCtx.dialbackMu.Unlock() - previousAttempt := rpcCtx.dialbackMu.m[nodeID] // Check if the previous connection is completed (successfully or not). This // happens only on subsequent pings after not detecting a healthy reverse @@ -2697,19 +2711,26 @@ func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) { // happen if our previous connect attempt failed between pings. Without this // protection we would continually try opening new dialback connections, but // never observe the result. - if previousAttempt != nil { + if previousAttempt := rpcCtx.dialbackMu.m[nodeID]; previousAttempt != nil { select { case <-previousAttempt.initialHeartbeatDone: // The connection attempt was completed, return the outcome of it. - err, _ := previousAttempt.err.Load().(error) - rpcCtx.dialbackMu.m[nodeID] = nil - return err, true + err := previousAttempt.err.Load().(error) + if err == nil { + // If it completed without error then don't track the connection + // anymore. If it did have an error we need to track it until it later gets cleared. + rpcCtx.dialbackMu.m[nodeID] = nil + } + return err default: // We still don't know the outcome of the previous attempt. For now - // allow this Ping to continue and check on the following attempt. - return nil, true + // allow this attempt to continue and check in the future. + return nil } } - // There is no previous attempt in place. - return nil, false + + // There is no previous attempt in place. Create a connection and store it for + // the future, for now return success. + rpcCtx.dialbackMu.m[nodeID] = createConnFunc() + return nil } diff --git a/pkg/rpc/heartbeat.go b/pkg/rpc/heartbeat.go index 9653dbe9f9dc..ffe8b8d39b4a 100644 --- a/pkg/rpc/heartbeat.go +++ b/pkg/rpc/heartbeat.go @@ -171,6 +171,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, request *PingRequest) (*Pi if fn := hs.onHandlePing; fn != nil { if err := fn(ctx, request, &response); err != nil { + log.Infof(ctx, "failing ping request from node n%d", request.OriginNodeID) return nil, err } }