Skip to content

Commit

Permalink
rpc: Fix blackhole recv
Browse files Browse the repository at this point in the history
Fixes: #99104
Informs #84289

As part of the previous fix for partition handling, we tracked the state
of a previous attempt and use that result on the next attempt. However
if there were multiple connections, we may only block system traffic
connections and not default class connections.  This change addresses
that by ensuring a failed dialback attempt is remembered until we are
able to successfully connect back to the pinging node.

Epic: none
Release note: None
  • Loading branch information
andrewbaptist committed Mar 30, 2023
1 parent 56bb002 commit 53c94ad
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
})
}

const heartbeatFailureLogFormat = `failed node liveness heartbeat: %+v
const heartbeatFailureLogFormat = `failed node liveness heartbeat: %v
An inability to maintain liveness will prevent a node from participating in a
cluster. If this problem persists, it may be a sign of resource starvation or
Expand Down
119 changes: 70 additions & 49 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,7 @@ func (rpcCtx *Context) runHeartbeat(
}

if err != nil {
log.VEventf(ctx, 2, "received error on ping response from n%d, %v", conn.remoteNodeID, err)
return err
}

Expand Down Expand Up @@ -2604,8 +2605,8 @@ func (rpcCtx *Context) NewHeartbeatService() *HeartbeatService {
}
}

// VerifyDialback verifies connectivity from the recipient of a PingRequest
// back to the sender. If there is already a connection in place, it will return
// VerifyDialback verifies connectivity from the recipient of a PingRequest back
// to the sender. If there is already a connection in place, it will return
// immediately without error. If there is no connection in place and the
// NeedsDialback on the PingRequest is not set to NONE, then it will establish a
// connection in either blocking or non-blocking mode.
Expand All @@ -2614,6 +2615,9 @@ func (rpcCtx *Context) NewHeartbeatService() *HeartbeatService {
// established.
// NON_BLOCKING mode will attempt to establish a reverse connection and send the
// result on the next PingRequest that is sent on this connection.
// This method keeps track of non blocking attempts in the dialbackMu and will
// clear out any pending attempts as soon as a successful connection is
// established.
func (rpcCtx *Context) VerifyDialback(
ctx context.Context, request *PingRequest, response *PingResponse, locality roachpb.Locality,
) error {
Expand All @@ -2623,71 +2627,81 @@ func (rpcCtx *Context) VerifyDialback(

baseAddr := util.UnresolvedAddr{NetworkField: "tcp", AddressField: request.OriginAddr}
target := locality.LookupAddress(request.LocalityAddress, &baseAddr).AddressField
// nodeID may be null for "bootstrapping" requests. In that case we always
// assume blocking mode since we can't track connection attempts.
nodeID := request.OriginNodeID

// Initially the nodeID might not be set since it is assigned by the cluster
// not the node. In that case, we can't look up if we have a connection to the
// node and instead need to always try dialback.
if nodeID != 0 {
prevErr, found := rpcCtx.previousAttempt(nodeID)
if found {
return prevErr
}
// Check in our regular connection map to see if we are healthy. We use the
// System class because that is what is important from a liveness perspective.
// If we are unable to maintain a healthy connection on the System class we
// will fail other connections also.
connHealthErr := rpcCtx.ConnHealth(target, nodeID, SystemClass)

// Check in our regular connection map to see if we are healthy. We only
// care about the SystemClass because that is what is important from a Raft
// liveness perspective.
err := rpcCtx.ConnHealth(target, nodeID, SystemClass)
// We have a successful connection, nothing else to do.
if err == nil {
return nil
}
log.VEventf(ctx, 2, "unable to verify health on open conn, trying dialback conn to %s, n%d, %v", target, nodeID, err)
// We have a successful connection so report success. Any ongoing attempts no
// longer need to be tracked.
if connHealthErr == nil {
rpcCtx.clearPreviousAttempt(nodeID)
return nil
}

log.VEventf(ctx, 2, "unable to verify health on existing conn, trying dialback conn to %s, n%d mode %v, %v",
target, nodeID, request.NeedsDialback, connHealthErr)

if nodeID == 0 || request.NeedsDialback == PingRequest_BLOCKING {
// Since we don't have a successful reverse connection, try and dial back
// manually. We don't use the regular dialer pool since we don't want to wait
// for heartbeats on this connection.
// TODO(baptist): Consider using GRPCUnvalidatedDial and use the
// WaitForStateChange to detect when the TCP connection is established. This
// will keep this connection in the pool after establishment. Note the class
// here matter since this connection is not added to a pool and immediately
// closed.
// will keep this connection in the pool after establishment. Wait until
// https://github.com/grpc/grpc-go/issues/5496 is completed.
ctx := rpcCtx.makeDialCtx(target, 0, SystemClass)
conn, err := rpcCtx.grpcDialRaw(ctx, target, SystemClass, grpc.WithBlock())
if err != nil {
log.Warningf(ctx, "dialback connection failed to %s, n%d, %v", target, nodeID, err)
log.Infof(ctx, "blocking dialback connection failed to %s, n%d, %v", target, nodeID, err)
return err
}
log.VEventf(ctx, 2, "blocking dialback connection to n%d succeeded", nodeID)
// Clear any previous attempts since we are known to be able to initiate a
// TCP connection.
rpcCtx.clearPreviousAttempt(nodeID)
_ = conn.Close() // nolint:grpcconnclose
return nil
} else {
// We don't have a previous attempt and the current health was not healthy,
// but we can't just trust that because there might not be new connection
// attempts. Instead, establish a new connection using the standard
// GRPCDialNode, this connection is added to the connection pool. Always
// return success on this ping, but check this connection attempt on future
// pings. Use the SystemClass to ensure that Raft traffic is not
// interrupted. It is unusual for some classes to be affected and not others
// but the SystemClass is the one we really care about.
// If the previous attempt ended in an error, we can confidently report we
// are unable to dialback. If the attempt is still ongoing, then we want
// to allow it to finish. Once it has finished, we will leave this error
// here until the connection health becomes healthy either through
// checking the health manually or a blocking ping succeeding.
return rpcCtx.loadOrCreateConnAttempt(nodeID, func() *Connection {
return rpcCtx.GRPCDialNode(target, nodeID, SystemClass)
})
}
}

// clearPreviousAttempt will clear out any previous errors on connection
// attempts. This is only done after we have verified we have a healthy
// established connection to the sender of this ping.
func (rpcCtx *Context) clearPreviousAttempt(nodeID roachpb.NodeID) {
if nodeID > 0 {
rpcCtx.dialbackMu.Lock()
defer rpcCtx.dialbackMu.Unlock()
rpcCtx.dialbackMu.m[nodeID] = rpcCtx.GRPCDialNode(target, nodeID, SystemClass)
return nil
rpcCtx.dialbackMu.m[nodeID] = nil
}
}

// previousAttempt checks if any prior attempt that started but hadn't complete
// has now completed. Until this attempt completes, the Pings will continue to
// return success. Once this completes, we remove this from our map and return
// whatever error this attempt returned.
func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) {
// Check if there was a previous attempt and if so use that and clear out
// the previous attempt.
// loadOrCreateConnAttempt checks if we have an in-progress connection attempt
// to a store, and if not will create a connection and store it in the map. It
// takes a function to create a connection because the connection is only
// created in the case where it doesn't already exist. If there is already a
// ongoing connection attempt, it will instead check the status of that attempt.
// If it is completed and is in error, then it will return that error, if it is
// still ongoing, then it returns nil to signify that it might be healthy.
func (rpcCtx *Context) loadOrCreateConnAttempt(
nodeID roachpb.NodeID, createConnFunc func() *Connection,
) error {
rpcCtx.dialbackMu.Lock()
defer rpcCtx.dialbackMu.Unlock()
previousAttempt := rpcCtx.dialbackMu.m[nodeID]

// Check if the previous connection is completed (successfully or not). This
// happens only on subsequent pings after not detecting a healthy reverse
Expand All @@ -2697,19 +2711,26 @@ func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) {
// happen if our previous connect attempt failed between pings. Without this
// protection we would continually try opening new dialback connections, but
// never observe the result.
if previousAttempt != nil {
if previousAttempt := rpcCtx.dialbackMu.m[nodeID]; previousAttempt != nil {
select {
case <-previousAttempt.initialHeartbeatDone:
// The connection attempt was completed, return the outcome of it.
err, _ := previousAttempt.err.Load().(error)
rpcCtx.dialbackMu.m[nodeID] = nil
return err, true
err := previousAttempt.err.Load().(error)
if err == nil {
// If it completed without error then don't track the connection
// anymore. If it did have an error we need to track it until it later gets cleared.
rpcCtx.dialbackMu.m[nodeID] = nil
}
return err
default:
// We still don't know the outcome of the previous attempt. For now
// allow this Ping to continue and check on the following attempt.
return nil, true
// allow this attempt to continue and check in the future.
return nil
}
}
// There is no previous attempt in place.
return nil, false

// There is no previous attempt in place. Create a connection and store it for
// the future, for now return success.
rpcCtx.dialbackMu.m[nodeID] = createConnFunc()
return nil
}
1 change: 1 addition & 0 deletions pkg/rpc/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, request *PingRequest) (*Pi

if fn := hs.onHandlePing; fn != nil {
if err := fn(ctx, request, &response); err != nil {
log.Infof(ctx, "failing ping request from node n%d", request.OriginNodeID)
return nil, err
}
}
Expand Down

0 comments on commit 53c94ad

Please sign in to comment.