Peer dialing/kicking system overhaul (#3346)
* Force dial + excess peer trimmer
* Ensure we always have outgoing peers
* Add configurable hard-max-peers
Menduist authored Mar 11, 2022
1 parent 9601735 commit f589bf2
Showing 3 changed files with 118 additions and 53 deletions.
6 changes: 5 additions & 1 deletion beacon_chain/conf.nim
@@ -230,10 +230,14 @@ type
name: "udp-port" }: Port

maxPeers* {.
desc: "The maximum number of peers to connect to"
desc: "The target number of peers to connect to"
defaultValue: 160 # 5 (fanout) * 64 (subnets) / 2 (subs) for a healthy mesh
name: "max-peers" }: int

hardMaxPeers* {.
desc: "The maximum number of peers to connect to. Defaults to maxPeers * 1.5"
name: "hard-max-peers" }: Option[int]

nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>"
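
With this change, `--max-peers` becomes a soft target rather than a cap, and the new `--hard-max-peers` flag sets the actual ceiling; when the flag is omitted, the node falls back to 1.5x max-peers, as the `Eth2Node.new` hunk further down shows. A minimal standalone sketch of that fallback (the helper name is illustrative, not part of the codebase):

import std/options

proc resolveHardMaxPeers(maxPeers: int, hardMaxPeers: Option[int]): int =
  # Same fallback as in the constructor below: the hard cap defaults to 1.5x the target.
  result = hardMaxPeers.get(maxPeers * 3 div 2)

when isMainModule:
  doAssert resolveHardMaxPeers(160, none(int)) == 240  # default: 160 * 3 div 2
  doAssert resolveHardMaxPeers(160, some(500)) == 500  # an explicit flag wins
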
157 changes: 109 additions & 48 deletions beacon_chain/networking/eth2_network.nim
@@ -23,7 +23,6 @@ import
libp2p/protocols/pubsub/[
pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
libp2p/stream/connection,
libp2p/utils/semaphore,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock],
@@ -65,6 +64,7 @@ type
discovery*: Eth2DiscoveryProtocol
discoveryEnabled*: bool
wantedPeers*: int
hardMaxPeers*: int
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
metadata*: altair.MetaData
@@ -81,6 +81,7 @@ type
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
peerPingerHeartbeatFut: Future[void]
peerTrimmerHeartbeatFut: Future[void]
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn

@@ -887,7 +888,11 @@ proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =

debug "Connecting to discovered peer"
var deadline = sleepAsync(node.connectTimeout)
var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
var workfut = node.switch.connect(
peerAddr.peerId,
peerAddr.addrs,
forceDial = true
)

try:
# `or` operation will only raise exception of `workfut`, because `deadline`
@@ -916,7 +921,8 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
# Previous worker dial might have hit the maximum peers.
# TODO: could also clear the whole connTable and connQueue here; ideally
# this would be event-based, driven by the peer pool or libp2p.
if node.switch.connManager.outSema.count > 0:

if node.peerPool.len < node.hardMaxPeers:
await node.dialPeer(remotePeerAddr, index)
# The peer was added to `connTable` before it was added to `connQueue`,
# so we only exclude it here, after processing.
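
Two things change in the dial path: `dialPeer` passes `forceDial = true`, so the dial is not rejected by libp2p's own connection limits, and the worker now gates dials on the peer pool against the hard cap instead of checking the outgoing-slot semaphore. A tiny sketch of that gate (the helper name is illustrative):

proc shouldDial(poolLen, hardMaxPeers: int): bool =
  # Dial only while the peer pool is below the hard cap.
  result = poolLen < hardMaxPeers

when isMainModule:
  doAssert shouldDial(239, 240)      # room left: dial
  doAssert not shouldDial(240, 240)  # at the hard cap: skip
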
@@ -947,7 +953,8 @@ proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits): Future[seq[Node]] {.async.} =
wantedSyncnets: SyncnetBits,
minScore: int): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.

@@ -999,25 +1006,31 @@
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent

if score > 0:
if score >= minScore:
filtered.add((score, n))

d.rng[].shuffle(filtered)
return filtered.sortedByIt(-it[0]).mapIt(it[1])

proc trimConnections(node: Eth2Node, count: int) {.async.} =
proc trimConnections(node: Eth2Node, count: int) =
# Kill `count` peers, scoring them to remove the least useful ones

var scores = initOrderedTable[PeerID, int]()

# Take into account the stabilitySubnets
# During sync, only this will be used to score peers
# since gossipsub is not running yet
#
# A peer subscribed to all stabilitySubnets will
# have 640 points
var peersInGracePeriod = 0
for peer in node.peers.values:
if peer.connectionState != Connected: continue
if peer.metadata.isNone: continue

# The metadata pinger is used as a grace period
if peer.metadata.isNone:
peersInGracePeriod.inc()
continue

let
stabilitySubnets = peer.metadata.get().attnets
@@ -1026,25 +1039,53 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

scores[peer.peerId] = thisPeersScore


# Safeguard: if we have too many peers in the grace
# period, don't kick anyone. Otherwise, peers still in
# their grace period would be preferred over long-standing peers
if peersInGracePeriod > scores.len div 2:
return

# Split 1,000 points among each topic's peers
# + 10 000 points for each subbed topic
# + 5 000 points for each subbed topic
# This gives priority to peers in topics with few peers
# For instance, a topic with `dHigh` peers will give 80 points to each peer
# Whereas a topic with `dLow` peers will give 250 points to each peer
#
# Then, use each peer's average across all its topics, to avoid giving
# too many points to peers present in many topics

var gossipScores = initTable[PeerID, tuple[sum: int, count: int]]()
for topic, _ in node.pubsub.gossipsub:
let
peersInMesh = node.pubsub.mesh.peers(topic)
peersSubbed = node.pubsub.gossipsub.peers(topic)
scorePerMeshPeer = 10_000 div max(peersInMesh, 1)
scorePerMeshPeer = 5_000 div max(peersInMesh, 1)
scorePerSubbedPeer = 1_000 div max(peersSubbed, 1)

for peer in node.pubsub.mesh.getOrDefault(topic):
for peer in node.pubsub.gossipsub.getOrDefault(topic):
if peer.peerId notin scores: continue
scores[peer.peerId] = scores[peer.peerId] + scorePerSubbedPeer
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerSubbedPeer,
currentVal.count + 1
)

for peer in node.pubsub.gossipsub.getOrDefault(topic):
# Avoid global topics (>75% of peers), which would greatly reduce
# the average score for small peers
if peersSubbed > scores.len div 4 * 3: continue

for peer in node.pubsub.mesh.getOrDefault(topic):
if peer.peerId notin scores: continue
scores[peer.peerId] = scores[peer.peerId] + scorePerMeshPeer
let currentVal = gossipScores.getOrDefault(peer.peerId)
gossipScores[peer.peerId] = (
currentVal.sum + scorePerMeshPeer,
currentVal.count + 1
)

for peerId, gScore in gossipScores.pairs:
scores[peerId] =
scores.getOrDefault(peerId) + (gScore.sum div gScore.count)

proc sortPerScore(a, b: (PeerID, int)): int =
system.cmp(a[1], b[1])
@@ -1055,7 +1096,7 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =

for peerId in scores.keys:
debug "kicking peer", peerId, score=scores[peerId]
await node.switch.disconnect(peerId)
asyncSpawn node.getPeer(peerId).disconnect(PeerScoreLow)
dec toKick
inc(nbc_cycling_kicked_peers)
if toKick <= 0: return
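
The gossip part of the trim score hands out a fixed budget per topic (1,000 points split among a topic's subscribed peers, 5,000 among its mesh peers) and then averages each peer's take across its topics, so peers carrying thin topics score high while a peer sitting in many crowded topics does not pile up points simply by being everywhere. A standalone sketch of the split-and-average, covering only the mesh part and using illustrative names:

import std/tables

proc meshScores(meshPeers: Table[string, seq[string]]): Table[string, int] =
  # Per topic: split 5,000 points among its mesh peers
  # (a dLow topic with 4 peers gives 1,250 each; a dHigh topic with 12 gives 416).
  # Per peer: average across topics, as in the hunk above.
  var acc = initTable[string, tuple[sum, count: int]]()
  for topic, peers in meshPeers:
    let perPeer = 5_000 div max(peers.len, 1)
    for p in peers:
      let cur = acc.getOrDefault(p)
      acc[p] = (cur.sum + perPeer, cur.count + 1)
  result = initTable[string, int]()
  for p, v in acc:
    result[p] = v.sum div v.count

when isMainModule:
  let mesh = {"thin_topic": @["a", "b"], "busy_topic": @["a", "c", "d", "e"]}.toTable
  doAssert meshScores(mesh)["b"] == 2_500  # only in the thin topic
  doAssert meshScores(mesh)["c"] == 1_250  # only in the busy topic
  doAssert meshScores(mesh)["a"] == 1_875  # average of 2_500 and 1_250
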
@@ -1137,10 +1178,20 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
(wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch)
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing})
targetOutgoingPeers = max(node.wantedPeers div 10, 3)

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or
outgoingPeers < targetOutgoingPeers:

if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets)
let
minScore =
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
1
else:
0
discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore)
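
Discovery now also runs when the node is short on outgoing connections (fewer than `max(wantedPeers div 10, 3)`, i.e. 16 with the default target of 160), and in that case the query passes `minScore = 0` so any responsive peer qualifies; when subnets are actually missing it still demands a score of at least 1. A sketch of that choice (the helper name is illustrative):

proc discoveryMinScore(missingSubnets: int): int =
  # Require at least one wanted subnet when any are missing; otherwise accept anyone.
  result = if missingSubnets > 0: 1 else: 0

when isMainModule:
  doAssert max(160 div 10, 3) == 16   # outgoing-peer target at the default max-peers
  doAssert discoveryMinScore(3) == 1  # missing attnets/syncnets: demand a match
  doAssert discoveryMinScore(0) == 0  # only short on outgoing peers: anyone will do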

let newPeers = block:
var np = newSeq[PeerAddr]()
@@ -1157,39 +1208,27 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
np.add(peerAddr)
np

# We have to be careful to kick enough peers to make room for new ones
# (if we are here, we have an unhealthy mesh, so if we're full, we have bad peers),
# but not to kick too many, because with a low max-peers that can cause disruption.
# Also keep in mind that many dials fail, and that we may have incoming peers waiting.
let
roomRequired = 1 + newPeers.len()
roomCurrent = node.peerPool.lenSpace({PeerType.Outgoing})
roomDelta = roomRequired - roomCurrent
roomCurrent = node.hardMaxPeers - len(node.peerPool)
peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5)

maxPeersToKick = len(node.peerPool) div 5
peersToKick = min(roomDelta, maxPeersToKick)

if peersToKick > 0 and newPeers.len() > 0:
await node.trimConnections(peersToKick)
if peersToKick > 0 and newPeers.len > 0:
node.trimConnections(peersToKick)
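
Before dialing the freshly discovered peers, the loop works out how much room is left under the hard cap and trims at most 20% of the cap in a single pass. A worked example of that arithmetic (illustrative helper; the numbers assume hard-max-peers = 240):

proc peersToKickFor(hardMaxPeers, currentPeers, newPeers: int): int =
  let roomCurrent = hardMaxPeers - currentPeers
  # Kick just enough to fit the new peers, never more than 20% of the cap.
  result = min(newPeers - roomCurrent, hardMaxPeers div 5)

when isMainModule:
  doAssert peersToKickFor(240, 235, 12) == 7    # 5 free slots, 7 more needed
  doAssert peersToKickFor(240, 100, 12) <= 0    # plenty of room: kick nobody
  doAssert peersToKickFor(240, 240, 100) == 48  # capped at hardMaxPeers div 5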

for peerAddr in newPeers:
# We add the peer to the pending connections table here, but it is
# only removed in `connectWorker`.
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)

debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
debug "Discovery tick",
wanted_peers = node.wantedPeers,
current_peers = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
kicked_peers = max(0, peersToKick),
new_peers = len(newPeers)

if len(newPeers) == 0:
let currentPeers = node.peerPool.lenCurrent()
let currentPeers = len(node.peerPool)
if currentPeers <= node.wantedPeers shr 2: # 25%
warn "Peer count low, no new peers discovered",
discovered_nodes = len(discoveredNodes), new_peers = newPeers,
@@ -1369,8 +1408,9 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
switch: switch,
pubsub: pubsub,
wantedPeers: config.maxPeers,
hardMaxPeers: config.hardMaxPeers.get(config.maxPeers * 3 div 2), #*1.5
cfg: runtimeCfg,
peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
peerPool: newPeerPool[Peer, PeerID](),
# It's important to create the AsyncQueue with a limited size here,
# otherwise it could produce high CPU usage.
connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections),
@@ -1452,16 +1492,12 @@ proc startListening*(node: Eth2Node) {.async.} =
await node.pubsub.start()

proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}

proc start*(node: Eth2Node) {.async.} =

proc onPeerCountChanged() =
trace "Number of peers has been changed",
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool)
trace "Number of peers has been changed", length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))

node.peerPool.setPeerCounter(onPeerCountChanged)
@@ -1482,15 +1518,22 @@ proc start*(node: Eth2Node) {.async.} =
if pa.isOk():
await node.connQueue.addLast(pa.get())
node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat()

proc stop*(node: Eth2Node) {.async.} =
# Ignore errors in futures, since we're shutting down (but log them on the
# TRACE level, if a timeout is reached).
var waitedFutures =
@[
node.switch.stop(),
node.peerPingerHeartbeat.cancelAndWait(),
node.peerTrimmerHeartbeatFut.cancelAndWait(),
]

if node.discoveryEnabled:
waitedFutures &= node.discovery.closeWait()

let
waitedFutures = if node.discoveryEnabled:
@[node.discovery.closeWait(), node.switch.stop()]
else:
@[node.switch.stop()]
timeout = 5.seconds
completed = await withTimeout(allFutures(waitedFutures), timeout)
if not completed:
@@ -1691,6 +1734,24 @@ proc peerPingerHeartbeat(node: Eth2Node) {.async.} =

await sleepAsync(5.seconds)

proc peerTrimmerHeartbeat(node: Eth2Node) {.async.} =
while true:
# Peer trimmer

# Only count Connected peers
# (to avoid counting Disconnecting ones)
var connectedPeers = 0
for peer in node.peers.values:
if peer.connectionState == Connected:
inc connectedPeers

let excessPeers = connectedPeers - node.wantedPeers
if excessPeers > 0:
# Let chronos take back control after every kick
node.trimConnections(1)

await sleepAsync(1.seconds div max(1, excessPeers))
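
The trimmer heartbeat kicks one peer per iteration and sleeps for one second divided by the excess, so the kick rate scales with how far the node sits above `wantedPeers` while chronos regains control between kicks. A sketch of that cadence (illustrative helper returning milliseconds):

proc trimmerSleepMs(excessPeers: int): int =
  # 1000 ms divided by the excess; max(1, ...) avoids division by zero.
  result = 1000 div max(1, excessPeers)

when isMainModule:
  doAssert trimmerSleepMs(0) == 1000   # at or under target: check again in a second
  doAssert trimmerSleepMs(10) == 100   # ten peers over target: one kick every 100 ms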

func asLibp2pKey*(key: keys.PublicKey): PublicKey =
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))

8 changes: 4 additions & 4 deletions tests/test_discovery.nim
@@ -58,7 +58,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(34)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
@@ -96,7 +96,7 @@ procSuite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
@@ -124,7 +124,7 @@ procSuite "Eth2 specific discovery tests":

block:
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 0

block:
@@ -139,7 +139,7 @@
discard node1.addNode(nodes[][0])

let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
enrForkId, attnetsSelected, noSyncnetsPreference, 1)
check discovered.len == 1

await node1.closeWait()
