diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 0284ec28a7..c076b3a43c 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -20,7 +20,7 @@ import stew/shims/[macros], faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, json_serialization, json_serialization/std/[net, sets, options], - chronos, chronicles, metrics, + chronos, chronos/ratelimit, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, crypto/secp, builders], libp2p/protocols/pubsub/[ @@ -35,9 +35,9 @@ import "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] export - tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection, - libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery, - peer_pool, peer_scores + tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, + connection, libp2p_json_serialization, eth2_ssz_serialization, results, + eth2_discovery, peer_pool, peer_scores logScope: topics = "networking" @@ -86,6 +86,8 @@ type cfg: RuntimeConfig getBeaconTime: GetBeaconTimeFn + quota: TokenBucket ## Global quota mainly for high-bandwidth stuff + EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers AverageThroughput* = object @@ -100,7 +102,7 @@ type protocolStates*: seq[RootRef] netThroughput: AverageThroughput score*: int - requestQuota*: float + quota*: TokenBucket lastReqTime*: Moment connections*: int enr*: Option[enr.Record] @@ -408,27 +410,37 @@ func `<`(a, b: Peer): bool = false const - maxRequestQuota = 1000000.0 + maxRequestQuota = 1000000 + maxGlobalQuota = 2 * maxRequestQuota + ## Roughly, this means we allow 2 peers to sync from us at a time fullReplenishTime = 5.seconds replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float) -proc updateRequestQuota*(peer: Peer, reqCost: float) = +template awaitQuota*(peerParam: Peer, costParam: float) = let - currentTime = now(chronos.Moment) - nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime) - replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate + peer = peerParam + cost = int(costParam) + + if not peer.quota.tryConsume(cost.int): + debug "Awaiting peer quota", peer, cost + await peer.quota.consume(cost.int) - peer.lastReqTime = currentTime - peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost +template awaitQuota*(networkParam: Eth2Node, costParam: float) = + let + network = networkParam + cost = int(costParam) -template awaitNonNegativeRequestQuota*(peer: Peer) = - let quota = peer.requestQuota - if quota < 0: - await sleepAsync(nanoseconds(int((-quota) / replenishRate))) + if not network.quota.tryConsume(cost.int): + debug "Awaiting network quota", peer, cost + await network.quota.consume(cost.int) func allowedOpsPerSecondCost*(n: int): float = (replenishRate * 1000000000'f / n.float) +const + libp2pRequestCost = allowedOpsPerSecondCost(8) + ## Maximum number of libp2p requests per peer per second + proc isSeen(network: Eth2Node, peerId: PeerId): bool = ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## yet expired. @@ -1016,6 +1028,19 @@ proc handleIncomingStream(network: Eth2Node, template returnResourceUnavailable(msg: string) = returnResourceUnavailable(ErrorMsg msg.toBytes) + # The request quota is shared between all requests - it represents the + # cost to perform a service on behalf of a client and is incurred + # regardless if the request succeeds or fails - we don't count waiting + # for this quota against timeouts so as not to prematurely disconnect + # clients that are on the edge - nonetheless, the client will count it. + # + # When a client exceeds their quota, they will be slowed down without + # notification - as long as they don't make parallel requests (which is + # limited by libp2p), this will naturally adapt them to the available + # quota. + + awaitQuota(peer, libp2pRequestCost) + # TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end let deadline = sleepAsync RESP_TIMEOUT @@ -1701,7 +1726,8 @@ proc new(T: type Eth2Node, discoveryEnabled: discovery, rng: rng, connectTimeout: connectTimeout, - seenThreshold: seenThreshold + seenThreshold: seenThreshold, + quota: TokenBucket.new(maxGlobalQuota, int(replenishRate / 1000)) ) newSeq node.protocolStates, allProtocols.len @@ -1820,7 +1846,8 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = connectionState: ConnectionState.None, lastReqTime: now(chronos.Moment), lastMetadataTime: now(chronos.Moment), - protocolStates: newSeq[RootRef](len(allProtocols)) + protocolStates: newSeq[RootRef](len(allProtocols)), + quota: TokenBucket.new(maxRequestQuota.int, int(replenishRate / 1000)) ) for i in 0 ..< len(allProtocols): let proto = allProtocols[i] diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index e215199fa4..24056f0796 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -26,17 +26,15 @@ logScope: const MAX_REQUEST_BLOCKS = 1024 - blockByRootLookupCost = allowedOpsPerSecondCost(50) - blockResponseCost = allowedOpsPerSecondCost(100) - blockByRangeLookupCost = allowedOpsPerSecondCost(20) + + blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs) # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#configuration MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128 - lightClientEmptyResponseCost = allowedOpsPerSecondCost(50) - lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5) - lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100) - lightClientUpdateResponseCost = allowedOpsPerSecondCost(100) - lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20) + lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1) + ## Only one bootstrap per peer should ever be needed - no need to allow more + lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000) + ## Updates are tiny - we can allow lots of them lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100) lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100) @@ -314,9 +312,6 @@ p2pProtocol BeaconSync(version = 1, startIndex = dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) - peer.updateRequestQuota(blockByRangeLookupCost) - peer.awaitNonNegativeRequestQuota() - var found = 0 bytes: seq[byte] @@ -336,8 +331,8 @@ p2pProtocol BeaconSync(version = 1, bytes = bytes.len(), blck = shortLog(blocks[i]) continue - peer.updateRequestQuota(blockResponseCost) - peer.awaitNonNegativeRequestQuota() + peer.awaitQuota(blockResponseCost) + peer.network.awaitQuota(blockResponseCost) await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes @@ -377,9 +372,6 @@ p2pProtocol BeaconSync(version = 1, found = 0 bytes: seq[byte] - peer.updateRequestQuota(count.float * blockByRootLookupCost) - peer.awaitNonNegativeRequestQuota() - for i in 0..