Merge pull request #89618 from nvanbenschoten/backport21.2-89539
release-21.2: rpc: don't leak shared, poisoned RPC connections
nvanbenschoten authored Oct 11, 2022
2 parents 88b9136 + 0a4319c commit ed55c13
Showing 3 changed files with 81 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/rpc/BUILD.bazel
@@ -123,6 +123,7 @@ go_test(
         "@org_golang_google_grpc//peer",
         "@org_golang_google_grpc//stats",
         "@org_golang_google_grpc//status",
+        "@org_golang_x_sync//errgroup",
     ],
 )

37 changes: 24 additions & 13 deletions pkg/rpc/context.go
@@ -1125,36 +1125,47 @@ func (ctx *Context) GRPCDialPod(
 func (ctx *Context) grpcDialNodeInternal(
 	target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
 ) *Connection {
-	thisConnKeys := []connKey{{target, remoteNodeID, class}}
-	value, ok := ctx.conns.Load(thisConnKeys[0])
+	thisKey := connKey{target, remoteNodeID, class}
+	value, ok := ctx.conns.Load(thisKey)
 	if !ok {
-		value, _ = ctx.conns.LoadOrStore(thisConnKeys[0], newConnectionToNodeID(ctx.Stopper, remoteNodeID))
+		value, _ = ctx.conns.LoadOrStore(thisKey, newConnectionToNodeID(ctx.Stopper, remoteNodeID))
+	}
+
+	conn := value.(*Connection)
+	conn.initOnce.Do(func() {
+		connKeys := []connKey{thisKey}
 		if remoteNodeID != 0 {
 			// If the first connection established at a target address is
 			// for a specific node ID, then we want to reuse that connection
 			// also for other dials (eg for gossip) which don't require a
 			// specific node ID. (We do this as an optimization to reduce
 			// the number of TCP connections alive between nodes. This is
 			// not strictly required for correctness.) This LoadOrStore will
-			// ensure we're registering the connection we just created for
-			// future use by these other dials.
+			// ensure we're registering this connection for future use by
+			// these other dials.
 			//
 			// We need to be careful to unregister both connKeys when the
 			// connection breaks. Otherwise, we leak the entry below which
 			// "simulates" a hard network partition for anyone dialing without
 			// the nodeID (gossip).
 			//
-			// See:
-			// https://github.com/cockroachdb/cockroach/issues/37200
+			// See: https://github.com/cockroachdb/cockroach/issues/37200
+			//
+			// We also need to be careful that the goroutine which stores the
+			// connection under the second connKey (and accounts for this in
+			// connKeys) is the same one that will remove the connection from the
+			// conns map on disconnect. If a separate goroutine were to remove the
+			// conn, it may only do so for one of the two connKeys, leaking the
+			// other. We ensure this by running the logic to share connections
+			// inside the sync.Once function. In doing so, we ensure that the
+			// goroutine removing the conn from the map is aware of all keys that
+			// it is stored under.
 			otherKey := connKey{target, 0, class}
 			if _, loaded := ctx.conns.LoadOrStore(otherKey, value); !loaded {
-				thisConnKeys = append(thisConnKeys, otherKey)
+				connKeys = append(connKeys, otherKey)
 			}
 		}
-	}
 
-	conn := value.(*Connection)
-	conn.initOnce.Do(func() {
 		// Either we kick off the heartbeat loop (and clean up when it's done),
 		// or we clean up the connKey entries immediately.
 		var redialChan <-chan struct{}
@@ -1167,13 +1178,13 @@ func (ctx *Context) grpcDialNodeInternal(
 					!grpcutil.IsConnectionRejected(err) {
 					log.Health.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
 				}
-				ctx.removeConn(conn, thisConnKeys...)
+				ctx.removeConn(conn, connKeys...)
 			}); err != nil {
 				conn.dialErr = err
 			}
 		}
 		if conn.dialErr != nil {
-			ctx.removeConn(conn, thisConnKeys...)
+			ctx.removeConn(conn, connKeys...)
 		}
 	})

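For readers outside the CockroachDB codebase, here is a minimal, self-contained sketch of the pattern the new comment in grpcDialNodeInternal describes: every map key a shared connection is registered under is recorded inside the same sync.Once body whose owner will later remove the connection, so the cleanup path always sees the complete key set. All names below (registry, connKey, dial, remove) are illustrative stand-ins, not the real rpc package API.

// leak_fix_sketch.go — a minimal sketch using only the standard library. It
// mirrors the fixed ordering: key registration happens inside the sync.Once
// body, so the single goroutine that later tears the connection down knows
// every key the connection is stored under.
package main

import (
	"fmt"
	"sync"
)

type connKey struct {
	target string
	nodeID int // 0 means "any node at this address"
}

type conn struct {
	initOnce sync.Once
	keys     []connKey // written once inside initOnce, then read by cleanup
}

var registry sync.Map // connKey -> *conn

func dial(target string, nodeID int) *conn {
	thisKey := connKey{target, nodeID}
	v, _ := registry.LoadOrStore(thisKey, &conn{})
	c := v.(*conn)
	c.initOnce.Do(func() {
		c.keys = []connKey{thisKey}
		if nodeID != 0 {
			// Share the connection with node-agnostic dials, and record the
			// extra key in the same Once body that cleanup will consult.
			otherKey := connKey{target, 0}
			if _, loaded := registry.LoadOrStore(otherKey, c); !loaded {
				c.keys = append(c.keys, otherKey)
			}
		}
	})
	return c
}

// remove is the stand-in for removeConn: because keys was built inside the
// Once, it deletes every registry entry, leaving no poisoned entry behind.
func remove(c *conn) {
	for _, k := range c.keys {
		registry.Delete(k)
	}
}

func main() {
	c := dial("10.0.0.1:26257", 7)
	remove(c) // removes both the node-7 key and the shared node-0 key
	leaked := 0
	registry.Range(func(k, v interface{}) bool {
		leaked++
		return true
	})
	fmt.Println("leaked entries:", leaked) // prints 0
}

The design point the sketch isolates: sync.Once makes registration and cleanup causally linked, since whichever goroutine runs the Once body is also the one whose closure later removes the connection, and it is the only one that ever appends to the key list.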
57 changes: 56 additions & 1 deletion pkg/rpc/context_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
@@ -526,7 +527,7 @@ func TestConnectionRemoveNodeIDZero(t *testing.T) {
 	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
 	clientCtx := newTestContext(uuid.MakeV4(), clock, stopper)
 	// Provoke an error.
-	_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1, DefaultClass).Connect(context.Background())
+	_, err := clientCtx.GRPCDialNode("127.0.0.1:notaport", 1, DefaultClass).Connect(ctx)
 	if err == nil {
 		t.Fatal("expected some kind of error, got nil")
 	}
@@ -547,6 +548,60 @@ func TestConnectionRemoveNodeIDZero(t *testing.T) {
 	})
 }
 
+// TestConnectionSharingDoesNotLeak attempts to create race conditions between
+// multiple dial attempts which are allowed to share the same underlying RPC
+// connection. It verifies that if the shared connection fails, all references
+// to the connection are cleaned up and none are leaked.
+func TestConnectionSharingDoesNotLeak(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+	clientCtx := newTestContext(uuid.MakeV4(), clock, stopper)
+
+	// Launch three goroutines, two of which use the same node ID and one of which
+	// uses node ID 0. All three point at the same address, so they are eligible to
+	// share a gRPC connection (if the timing works out).
+	addr := "127.0.0.1:notaport"
+	nodeIDs := []roachpb.NodeID{7, 7, 0}
+	var g errgroup.Group
+	for _, nodeID := range nodeIDs {
+		nodeID := nodeID // copy for goroutine
+		g.Go(func() error {
+			var conn *Connection
+			if nodeID == 0 {
+				conn = clientCtx.GRPCUnvalidatedDial(addr)
+			} else {
+				conn = clientCtx.GRPCDialNode(addr, nodeID, SystemClass)
+			}
+			_, err := conn.Connect(ctx)
+			if err == nil {
+				return errors.Errorf("expected some kind of error, got nil")
+			}
+			return nil
+		})
+	}
+	require.NoError(t, g.Wait())
+
+	// NB: this takes a moment because GRPCDialRaw only gives up on the initial
+	// connection after 1s (more precisely, the redialChan gets closed only after
+	// 1s), which seems difficult to configure ad-hoc.
+	testutils.SucceedsSoon(t, func() error {
+		var keys []connKey
+		clientCtx.conns.Range(func(k, v interface{}) bool {
+			keys = append(keys, k.(connKey))
+			return true
+		})
+		if len(keys) > 0 {
+			return errors.Errorf("still have connections %v", keys)
+		}
+		return nil
+	})
+}
+
 type interceptingListener struct {
 	net.Listener
 	connCB func(net.Conn)
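To see why the test above could fail before this patch, here is a deterministic miniature of the old ordering (illustrative names only, not the real rpc package, and under the assumption that cleanup deletes only the keys the cleaning caller accumulated, as the old per-call thisConnKeys slice did): the shared zero-NodeID key is recorded in a per-call slice before any sync.Once runs, so a second caller that merely loads the existing entry cleans up knowing only its own key.

// leak_repro_sketch.go — a sketch of the pre-fix bug shape, not CockroachDB code.
package main

import (
	"fmt"
	"sync"
)

type connKey struct {
	target string
	nodeID int
}

var registry sync.Map // connKey -> *struct{}

// dialBuggy mirrors the old ordering: the shared key is appended to a
// per-call slice outside any sync.Once, so only the caller that created
// the entry ever learns about it.
func dialBuggy(target string, nodeID int) (keys []connKey) {
	keys = []connKey{{target, nodeID}}
	if _, ok := registry.Load(keys[0]); !ok {
		v, _ := registry.LoadOrStore(keys[0], &struct{}{})
		if nodeID != 0 {
			other := connKey{target, 0}
			if _, loaded := registry.LoadOrStore(other, v); !loaded {
				keys = append(keys, other) // recorded by this caller only
			}
		}
	}
	return keys
}

func cleanup(keys []connKey) {
	for _, k := range keys {
		registry.Delete(k)
	}
}

func main() {
	keys1 := dialBuggy("10.0.0.1:26257", 7) // creates entries {7} and {0}
	keys2 := dialBuggy("10.0.0.1:26257", 7) // loads existing; knows only {7}
	_ = keys1
	cleanup(keys2) // the caller that never registered {0} runs cleanup
	registry.Range(func(k, v interface{}) bool {
		fmt.Println("leaked, poisoned entry:", k) // the {addr, 0} key remains
		return true
	})
}

In the real code the leaked {target, 0, class} entry held a connection whose dial had already failed, so every later node-agnostic dial (e.g. gossip) kept loading the poisoned entry, which is the "simulated hard network partition" the fix removes.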
