Skip to content

Commit

Permalink
rpc: Fix blackhole recv
Browse files Browse the repository at this point in the history
Fixes: #99104
Informs #84289

As part of the previous fix for partition handling, we tracked the state
of a previous attempt and used that result on the next attempt. However,
if there were multiple connections, we might block only system-traffic
connections and not default-class connections. This change addresses
that by ensuring a failed dialback attempt is remembered until we are
able to successfully connect back to the pinging node.

Epic: none
Release note: None
  • Loading branch information
andrewbaptist committed Mar 29, 2023
1 parent f91b4dd commit 5af4759
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 59 deletions.
14 changes: 3 additions & 11 deletions pkg/cmd/roachtest/tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,21 +734,13 @@ func (f *blackholeFailer) Fail(ctx context.Context, nodeID int) {
f.t.Status("skipping fail on local cluster")
return
}
// When dropping both input and output, we use multiport to block traffic both
// to port 26257 and from port 26257 on either side of the connection, to
// avoid any spurious packets from making it through.
//
// We don't do this when only blocking in one direction, because e.g. in the
// input case we don't want inbound connections to work (INPUT to 26257), but
// we do want responses for outbound connections to work (INPUT from 26257).
if f.input && f.output {
f.c.Run(ctx, f.c.Node(nodeID),
`sudo iptables -A INPUT -m multiport -p tcp --ports 26257 -j DROP`)
f.c.Run(ctx, f.c.Node(nodeID),
`sudo iptables -A OUTPUT -m multiport -p tcp --ports 26257 -j DROP`)
} else if f.input {
if f.input {
f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A INPUT -p tcp --dport 26257 -j DROP`)
} else if f.output {
}
if f.output {
f.c.Run(ctx, f.c.Node(nodeID), `sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP`)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
})
}

const heartbeatFailureLogFormat = `failed node liveness heartbeat: %+v
const heartbeatFailureLogFormat = `failed node liveness heartbeat: %v
An inability to maintain liveness will prevent a node from participating in a
cluster. If this problem persists, it may be a sign of resource starvation or
Expand Down
117 changes: 70 additions & 47 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,7 @@ func (rpcCtx *Context) runHeartbeat(
}

if err != nil {
log.VEventf(ctx, 2, "received error on ping response from n%d, %v", conn.remoteNodeID, err)
return err
}

Expand Down Expand Up @@ -2623,71 +2624,86 @@ func (rpcCtx *Context) VerifyDialback(

baseAddr := util.UnresolvedAddr{NetworkField: "tcp", AddressField: request.OriginAddr}
target := locality.LookupAddress(request.LocalityAddress, &baseAddr).AddressField
// nodeID may be null for "bootstrapping" requests. In that case we always
// assume blocking mode since we can't track connection attempts.
nodeID := request.OriginNodeID

// Initially the nodeID might not be set since it is assigned by the cluster
// not the node. In that case, we can't look up if we have a connection to the
// node and instead need to always try dialback.
if nodeID != 0 {
prevErr, found := rpcCtx.previousAttempt(nodeID)
if found {
return prevErr
}
// Check in our regular connection map to see if we are healthy. We only
// care about the SystemClass because that is what is important from a Raft
// liveness perspective.
connHealthErr := rpcCtx.ConnHealth(target, nodeID, SystemClass)

// Check in our regular connection map to see if we are healthy. We only
// care about the SystemClass because that is what is important from a Raft
// liveness perspective.
err := rpcCtx.ConnHealth(target, nodeID, SystemClass)
// We have a successful connection, nothing else to do.
if err == nil {
return nil
}
log.VEventf(ctx, 2, "unable to verify health on open conn, trying dialback conn to %s, n%d, %v", target, nodeID, err)
// We have a successful connection so report success. Any ongoing attempts no
// longer need to be tracked.
if connHealthErr == nil {
rpcCtx.clearPreviousAttempt(nodeID)
return nil
}

log.VEventf(ctx, 2, "unable to verify health on existing conn, trying dialback conn to %s, n%d mode %v, %v",
target, nodeID, request.NeedsDialback, connHealthErr)

if nodeID == 0 || request.NeedsDialback == PingRequest_BLOCKING {
// Since we don't have a successful reverse connection, try and dial back
// manually. We don't use the regular dialer pool since we don't want to wait
// for heartbeats on this connection.
// TODO(baptist): Consider using GRPCUnvalidatedDial and use the
// WaitForStateChange to detect when the TCP connection is established. This
// will keep this connection in the pool after establishment. Note the class
// here matter since this connection is not added to a pool and immediately
// closed.
// will keep this connection in the pool after establishment. Wait until
// https://github.com/grpc/grpc-go/issues/5496 is completed.
ctx := rpcCtx.makeDialCtx(target, 0, SystemClass)
conn, err := rpcCtx.grpcDialRaw(ctx, target, SystemClass, grpc.WithBlock())
if err != nil {
log.Warningf(ctx, "dialback connection failed to %s, n%d, %v", target, nodeID, err)
log.Infof(ctx, "blocking dialback connection failed to %s, n%d, %v", target, nodeID, err)
return err
}
log.VEventf(ctx, 2, "blocking dialback connection to n%d succeeded", nodeID)
// Clear any previous attempts since we are known to be able to initiate a
// TCP connection.
rpcCtx.clearPreviousAttempt(nodeID)
_ = conn.Close() // nolint:grpcconnclose
return nil
} else {
// We don't have a previous attempt and the current health was not healthy,
// but we can't just trust that because there might not be new connection
// attempts. Instead, establish a new connection using the standard
// GRPCDialNode, this connection is added to the connection pool. Always
// return success on this ping, but check this connection attempt on future
// pings. Use the SystemClass to ensure that Raft traffic is not
// interrupted. It is unusual for some classes to be affected and not others
// but the SystemClass is the one we really care about.
// If the previous attempt ended in an error, we can confidently report we
// are unable to dialback. If the attempt is still ongoing, then we want
// to allow it to finish. Once it has finished, we will leave this error
// here until the connection health becomes healthy either through
// checking the health manually or a blocking ping succeeding.
return rpcCtx.loadOrCreateConnAttempt(nodeID, func() *Connection {
return rpcCtx.GRPCDialNode(target, nodeID, SystemClass)
})
}
}

// clearPreviousAttempt will clear out any previous errors on connection
// attempts. This is only done after we have verified we have a healthy
// established connection to the sender of this ping.
func (rpcCtx *Context) clearPreviousAttempt(nodeID roachpb.NodeID) {
if nodeID > 0 {
rpcCtx.dialbackMu.Lock()
defer rpcCtx.dialbackMu.Unlock()
rpcCtx.dialbackMu.m[nodeID] = rpcCtx.GRPCDialNode(target, nodeID, SystemClass)
return nil
rpcCtx.dialbackMu.m[nodeID] = nil
}
}

// previousAttempt checks if any prior attempt that started but hadn't complete
// has now completed. Until this attempt completes, the Pings will continue to
// return success. Once this completes, we remove this from our map and return
// whatever error this attempt returned.
func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) {
// Check if there was a previous attempt and if so use that and clear out
// the previous attempt.
// createPreviousAttempt handles the case where we don't have a previous attempt
// and the current health is not healthy, but we can't just trust that because
// there might not be new initiated connection attempts. Instead, establish a
// new connection using the standard GRPCDialNode, this connection is added to
// the connection pool. Always return success on this ping, but check this
// connection attempt on future pings. Use the SystemClass to ensure that Raft
// traffic is not interrupted. It is unusual for some classes to be affected and
// not others but the SystemClass is the one we really care about.

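// loadOrCreateConnAttempt checks if any prior attempt that started but hadn't
// completed has now completed. Until this attempt completes, the Pings will
// continue to return success. Once it completes, we return whatever error the
// attempt returned; a successful attempt is removed from our map, while a
// failed one stays tracked until it is cleared. If there is no prior attempt,
// a new connection is created with createConnFunc and stored for future
// pings, and success is returned for now.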
func (rpcCtx *Context) loadOrCreateConnAttempt(
nodeID roachpb.NodeID, createConnFunc func() *Connection,
) error {
rpcCtx.dialbackMu.Lock()
defer rpcCtx.dialbackMu.Unlock()
previousAttempt := rpcCtx.dialbackMu.m[nodeID]

// Check if the previous connection is completed (successfully or not). This
// happens only on subsequent pings after not detecting a healthy reverse
Expand All @@ -2697,19 +2713,26 @@ func (rpcCtx *Context) previousAttempt(nodeID roachpb.NodeID) (error, bool) {
// happen if our previous connect attempt failed between pings. Without this
// protection we would continually try opening new dialback connections, but
// never observe the result.
if previousAttempt != nil {
if previousAttempt := rpcCtx.dialbackMu.m[nodeID]; previousAttempt != nil {
select {
case <-previousAttempt.initialHeartbeatDone:
// The connection attempt was completed, return the outcome of it.
err, _ := previousAttempt.err.Load().(error)
rpcCtx.dialbackMu.m[nodeID] = nil
return err, true
err := previousAttempt.err.Load().(error)
if err == nil {
// If it completed without error then don't track the connection
// anymore. If it did have an error we need to track it until it later gets cleared.
rpcCtx.dialbackMu.m[nodeID] = nil
}
return err
default:
// We still don't know the outcome of the previous attempt. For now
// allow this Ping to continue and check on the following attempt.
return nil, true
// allow this attempt to continue and check in the future.
return nil
}
}
// There is no previous attempt in place.
return nil, false

// There is no previous attempt in place. Create a connection and store it for
// the future, for now return success.
rpcCtx.dialbackMu.m[nodeID] = createConnFunc()
return nil
}
1 change: 1 addition & 0 deletions pkg/rpc/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (hs *HeartbeatService) Ping(ctx context.Context, request *PingRequest) (*Pi

if fn := hs.onHandlePing; fn != nil {
if err := fn(ctx, request, &response); err != nil {
log.Infof(ctx, "failing ping request from node n%d", request.OriginNodeID)
return nil, err
}
}
Expand Down

0 comments on commit 5af4759

Please sign in to comment.