Skip to content

Commit

Permalink
moved pubsub monitor to libp2p
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov committed Jul 28, 2020
1 parent 1199e54 commit 35a2113
Showing 1 changed file with 7 additions and 30 deletions.
37 changes: 7 additions & 30 deletions beacon_chain/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
# TODO: this fails with nil pointer, why?
if not(isNil(peer.pubsubFut)) and
not(peer.pubsubFut.finished):
peer.pubsubFut.cancel()
await peer.network.switch.disconnect(peer.info)
peer.connectionState = Disconnected
peer.network.peerPool.release(peer)
Expand Down Expand Up @@ -1150,31 +1146,6 @@ func gossipId(data: openArray[byte]): string =
func msgIdProvider(m: messages.Message): string =
gossipId(m.data)

proc pubsubMonitor(switch: Switch, peer: PeerInfo) {.async.} =
## while peer connected maintain a
## pubsub connection as well
##

var tries = 0
var backoffFactor = 5 # up to ~10 mins
var backoff = 1.seconds
while switch.isConnected(peer) and
tries < MaxPubsubReconnectAttempts:
try:
debug "subscribing to pubsub peer", peer = $peer
await switch.subscribePeer(peer)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in pubsub monitor", peer = $peer, exc = exc.msg
finally:
debug "awaiting backoff period before reconnecting", peer = $peer, backoff, tries
await sleepAsync(backoff) # allow the peer to cooldown
backoff = backoff * backoffFactor
tries.inc()

trace "exiting pubsub monitor", peer = $peer

proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Node] {.async, gcsafe.} =
var
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
Expand Down Expand Up @@ -1203,6 +1174,12 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId
extIp, extTcpPort, extUdpPort,
keys.seckey.asEthKey, rng = rng)

# TODO: this is to show how a hook can be used
# the actual subscribePeer call can be moved
# elsewhere, for example a good place to do this
# would be right after the node has synced or is
# about to, to prevent useless gossip traffic since
# most attestation are going to be rejected
switch.addHook(
proc(peer: PeerInfo, cycle: Lifecycle) {.async.} =
## trigger every time a new connection
Expand All @@ -1214,7 +1191,7 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, conf: BeaconNodeConf, enrForkId

let ethPeer = node.getPeer(peer)
if not(isNil(ethPeer)) and isNil(ethPeer.pubsubFut):
ethPeer.pubsubFut = switch.pubsubMonitor(peer)
ethPeer.pubsubFut = switch.subscribePeer(peer)

, Lifecycle.Upgraded
)
Expand Down

0 comments on commit 35a2113

Please sign in to comment.