Skip to content

Commit

Permalink
kv: convert uni-directional network partitions to bi-directional
Browse files Browse the repository at this point in the history
Previously one-way partitions where a node could initiate a successful
TCP connection in one direction, but the reverse connection fails which
causes problems. The node that initiates outgoing connections can
acquire leases and cause failures for reads and writes to those ranges.
This is particularly a problem if it acquires the liveness range leases,
but is a problem even for other ranges.

This commit adds an additional check during server-to-server
communication where the recipient of a new PingRequest first validates
that it is able to open a reverse connection to the initiator before
responding. Additionally, it will monitor whether it has a successful
reverse connection over time and asynchronously validate reverse
connections to the sender. The ongoing validation is asynchronous to
avoid adding delays to PingResponses as they are used for measuring
clock offsets.

Release note (bug fix): RPC connections between nodes now require RPC
connections to be established in both directions, otherwise the
connection will be closed. This is done to prevent asymmetric network
partitions where nodes are able to send outbound messages but not
receive inbound messages, which could result in persistent
unavailability. This behavior can be disabled by the cluster setting
rpc.dialback.enabled.

Also the onlyOnceDialer prevents retrying after a dial error, however
this can get into a state where it continually retries for certain
network connections. This is not easy to reproduce in a unit tests as
it requires killing the connection using iptables (normal closes don't
cauuse this).

After this change the onlyOnceDialer will no longer repeatedly retry to
reconnect after a broken connection during setup.

Epic: none
Release note: None
  • Loading branch information
andrewbaptist committed Mar 7, 2023
1 parent e4b62c9 commit 84fff6b
Show file tree
Hide file tree
Showing 13 changed files with 485 additions and 101 deletions.
4 changes: 2 additions & 2 deletions pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (n *Node) Alive() bool {
}

// StatusClient returns a StatusClient set up to talk to this node.
func (n *Node) StatusClient() serverpb.StatusClient {
func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
n.Lock()
existingClient := n.statusClient
n.Unlock()
Expand All @@ -491,7 +491,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr()).Connect(ctx)
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ type Config struct {
// The flag exists mostly for the benefit of tests, and for
// `cockroach start-single-node`.
AutoInitializeCluster bool

// LocalityAddresses contains private IP addresses that can only be accessed
// in the corresponding locality.
LocalityAddresses []roachpb.LocalityAddress
}

// HistogramWindowInterval is used to determine the approximate length of time
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/allocsim/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *allocSim) rangeInfo() allocStats {
for i := 0; i < len(a.Nodes); i++ {
go func(i int) {
defer wg.Done()
status := a.Nodes[i].StatusClient()
status := a.Nodes[i].StatusClient(context.Background())
if status == nil {
// Cluster is shutting down.
return
Expand Down
31 changes: 19 additions & 12 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,21 +552,11 @@ func (sc StoreCapacity) Load() load.Load {
// AddressForLocality returns the network address that nodes in the specified
// locality should use when connecting to the node described by the descriptor.
func (n *NodeDescriptor) AddressForLocality(loc Locality) *util.UnresolvedAddr {
// If the provided locality has any tiers that are an exact exact match (key
// If the provided locality has any tiers that are an exact match (key
// and value) with a tier in the node descriptor's custom LocalityAddress
// list, return the corresponding address. Otherwise, return the default
// address.
//
// O(n^2), but we expect very few locality tiers in practice.
for i := range n.LocalityAddress {
nLoc := &n.LocalityAddress[i]
for _, loc := range loc.Tiers {
if loc == nLoc.LocalityTier {
return &nLoc.Address
}
}
}
return &n.Address
return loc.LookupAddress(n.LocalityAddress, &n.Address)
}

// CheckedSQLAddress returns the value of SQLAddress if set. If not, either
Expand Down Expand Up @@ -644,6 +634,23 @@ func (l Locality) Equals(r Locality) bool {
return true
}

// LookupAddress is given a set of LocalityAddresses and finds the one that
// exactly matches my Locality. O(n^2), but we expect very few locality tiers in
// practice.
func (l Locality) LookupAddress(
address []LocalityAddress, base *util.UnresolvedAddr,
) *util.UnresolvedAddr {
for i := range address {
nLoc := &address[i]
for _, loc := range l.Tiers {
if loc == nLoc.LocalityTier {
return &nLoc.Address
}
}
}
return base
}

// MaxDiversityScore is the largest possible diversity score, indicating that
// two localities are as different from each other as possible.
const MaxDiversityScore = 1.0
Expand Down
1 change: 1 addition & 0 deletions pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/contextutil",
"//pkg/util/envutil",
Expand Down
Loading

0 comments on commit 84fff6b

Please sign in to comment.