Skip to content

Commit

Permalink
Tighten libp2p request quotas
Browse files Browse the repository at this point in the history
To further tighten Nimbus against spam, this PR introduces a global
quota for block requests (shared between peers) as well as a general
per-peer request limit that applies to all libp2p requests.

* apply request quota before decoding message
* for high-bandwidth requests (blocks), apply a shared global quota
which helps manage bandwidth for high-peer setups
  • Loading branch information
arnetheduck committed Oct 24, 2022
1 parent 2536db1 commit f9d06f0
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 69 deletions.
63 changes: 45 additions & 18 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import
stew/shims/[macros],
faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams,
json_serialization, json_serialization/std/[net, sets, options],
chronos, chronicles, metrics,
chronos, chronos/ratelimit, chronicles, metrics,
libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto,
crypto/secp, builders],
libp2p/protocols/pubsub/[
Expand All @@ -35,9 +35,9 @@ import
"."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores]

export
tables, chronos, version, multiaddress, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery,
peer_pool, peer_scores
tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol,
connection, libp2p_json_serialization, eth2_ssz_serialization, results,
eth2_discovery, peer_pool, peer_scores

logScope:
topics = "networking"
Expand Down Expand Up @@ -86,6 +86,8 @@ type
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn

quota: TokenBucket ## Global quota mainly for high-bandwidth stuff

EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers

AverageThroughput* = object
Expand All @@ -100,7 +102,7 @@ type
protocolStates*: seq[RootRef]
netThroughput: AverageThroughput
score*: int
requestQuota*: float
quota*: TokenBucket
lastReqTime*: Moment
connections*: int
enr*: Option[enr.Record]
Expand Down Expand Up @@ -408,27 +410,37 @@ func `<`(a, b: Peer): bool =
false

const
maxRequestQuota = 1000000.0
maxRequestQuota = 1000000
maxGlobalQuota = 2 * maxRequestQuota
## Roughly, this means we allow 2 peers to sync from us at a time
fullReplenishTime = 5.seconds
replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float)

proc updateRequestQuota*(peer: Peer, reqCost: float) =
template awaitQuota*(peerParam: Peer, costParam: float) =
let
currentTime = now(chronos.Moment)
nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime)
replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate
peer = peerParam
cost = int(costParam)

if not peer.quota.tryConsume(cost.int):
debug "Awaiting peer quota", peer, cost
await peer.quota.consume(cost.int)

peer.lastReqTime = currentTime
peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost
template awaitQuota*(networkParam: Eth2Node, costParam: float) =
let
network = networkParam
cost = int(costParam)

template awaitNonNegativeRequestQuota*(peer: Peer) =
let quota = peer.requestQuota
if quota < 0:
await sleepAsync(nanoseconds(int((-quota) / replenishRate)))
if not network.quota.tryConsume(cost.int):
debug "Awaiting network quota", peer, cost
await network.quota.consume(cost.int)

func allowedOpsPerSecondCost*(n: int): float =
  ## Quota cost to charge for a single operation so that the amount of quota
  ## replenished in one second (`replenishRate` is expressed per nanosecond)
  ## covers exactly `n` such operations.
  const nanosPerSecond = 1000000000'f
  let replenishedPerSecond = replenishRate * nanosPerSecond
  replenishedPerSecond / float(n)

const
  libp2pRequestCost = allowedOpsPerSecondCost(8)
    ## Quota cost charged for every incoming libp2p request, sized so that
    ## the replenished per-peer quota sustains at most 8 requests per second

proc isSeen(network: Eth2Node, peerId: PeerId): bool =
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
## yet expired.
Expand Down Expand Up @@ -1016,6 +1028,19 @@ proc handleIncomingStream(network: Eth2Node,
template returnResourceUnavailable(msg: string) =
returnResourceUnavailable(ErrorMsg msg.toBytes)

# The request quota is shared between all requests - it represents the
# cost to perform a service on behalf of a client and is incurred
# regardless of whether the request succeeds or fails - we don't count
# waiting for this quota against timeouts so as not to prematurely
# disconnect clients that are on the edge - nonetheless, the client will
# count it.
#
# When a client exceeds their quota, they will be slowed down without
# notification - as long as they don't make parallel requests (which is
# limited by libp2p), this will naturally adapt them to the available
# quota.

awaitQuota(peer, libp2pRequestCost)

# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
let deadline = sleepAsync RESP_TIMEOUT

Expand Down Expand Up @@ -1701,7 +1726,8 @@ proc new(T: type Eth2Node,
discoveryEnabled: discovery,
rng: rng,
connectTimeout: connectTimeout,
seenThreshold: seenThreshold
seenThreshold: seenThreshold,
quota: TokenBucket.new(maxGlobalQuota, int(replenishRate / 1000))
)

newSeq node.protocolStates, allProtocols.len
Expand Down Expand Up @@ -1820,7 +1846,8 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer =
connectionState: ConnectionState.None,
lastReqTime: now(chronos.Moment),
lastMetadataTime: now(chronos.Moment),
protocolStates: newSeq[RootRef](len(allProtocols))
protocolStates: newSeq[RootRef](len(allProtocols)),
quota: TokenBucket.new(maxRequestQuota.int, int(replenishRate / 1000))
)
for i in 0 ..< len(allProtocols):
let proto = allProtocols[i]
Expand Down
71 changes: 21 additions & 50 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ logScope:

const
MAX_REQUEST_BLOCKS = 1024
blockByRootLookupCost = allowedOpsPerSecondCost(50)
blockResponseCost = allowedOpsPerSecondCost(100)
blockByRangeLookupCost = allowedOpsPerSecondCost(20)

blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs)

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#configuration
MAX_REQUEST_LIGHT_CLIENT_UPDATES* = 128
lightClientEmptyResponseCost = allowedOpsPerSecondCost(50)
lightClientBootstrapLookupCost = allowedOpsPerSecondCost(5)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1)
## Only one bootstrap per peer should ever be needed - no need to allow more
lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000)
## Updates are tiny - we can allow lots of them
lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100)

Expand Down Expand Up @@ -314,9 +312,6 @@ p2pProtocol BeaconSync(version = 1,
startIndex =
dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex))

peer.updateRequestQuota(blockByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()

var
found = 0
bytes: seq[byte]
Expand All @@ -336,8 +331,8 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)

await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes

Expand Down Expand Up @@ -377,9 +372,6 @@ p2pProtocol BeaconSync(version = 1,
found = 0
bytes: seq[byte]

peer.updateRequestQuota(count.float * blockByRootLookupCost)
peer.awaitNonNegativeRequestQuota()

for i in 0..<count:
let
blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
Expand All @@ -402,8 +394,8 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)

await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0
inc found
Expand Down Expand Up @@ -448,9 +440,6 @@ p2pProtocol BeaconSync(version = 1,
dag.getBlockRange(startSlot, reqStep,
blocks.toOpenArray(0, endIndex))

peer.updateRequestQuota(blockByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()

var
found = 0
bytes: seq[byte]
Expand All @@ -469,8 +458,8 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)

await response.writeBytesSZ(
uncompressedLen, bytes,
Expand Down Expand Up @@ -507,9 +496,6 @@ p2pProtocol BeaconSync(version = 1,
dag = peer.networkState.dag
count = blockRoots.len

peer.updateRequestQuota(count.float * blockByRootLookupCost)
peer.awaitNonNegativeRequestQuota()

var
found = 0
bytes: seq[byte]
Expand All @@ -532,8 +518,8 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()
peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)

await response.writeBytesSZ(
uncompressedLen, bytes,
Expand All @@ -555,21 +541,16 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve

peer.updateRequestQuota(lightClientBootstrapLookupCost)
peer.awaitNonNegativeRequestQuota()

let bootstrap = dag.getLightClientBootstrap(blockRoot)
if bootstrap.isOk:
let
contextEpoch = bootstrap.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
peer.awaitQuota(lightClientBootstrapResponseCost)
await response.send(bootstrap.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCBootstrapUnavailable)

peer.updateRequestQuota(lightClientBootstrapResponseCost)

debug "LC bootstrap request done", peer, blockRoot

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
Expand All @@ -594,11 +575,6 @@ p2pProtocol BeaconSync(version = 1,
min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
count = min(reqCount, maxSupportedCount)
onePastPeriod = startPeriod + count
if count == 0:
peer.updateRequestQuota(lightClientEmptyResponseCost)

peer.updateRequestQuota(count.float * lightClientUpdateByRangeLookupCost)
peer.awaitNonNegativeRequestQuota()

var found = 0
for period in startPeriod..<onePastPeriod:
Expand All @@ -607,11 +583,11 @@ p2pProtocol BeaconSync(version = 1,
let
contextEpoch = update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientUpdateResponseCost)
await response.write(update.get, contextBytes)
inc found

peer.updateRequestQuota(found.float * lightClientUpdateResponseCost)

debug "LC updates by range request done", peer, startPeriod, count, found

# https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
Expand All @@ -624,19 +600,17 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve

peer.awaitNonNegativeRequestQuota()

let finality_update = dag.getLightClientFinalityUpdate()
if finality_update.isSome:
let
contextEpoch = finality_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientFinalityUpdateResponseCost)
await response.send(finality_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCFinUpdateUnavailable)

peer.updateRequestQuota(lightClientFinalityUpdateResponseCost)

debug "LC finality update request done", peer

Expand All @@ -650,20 +624,17 @@ p2pProtocol BeaconSync(version = 1,
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve

peer.awaitNonNegativeRequestQuota()

let optimistic_update = dag.getLightClientOptimisticUpdate()
if optimistic_update.isSome:
let
contextEpoch = optimistic_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientOptimisticUpdateResponseCost)
await response.send(optimistic_update.get, contextBytes)
else:
peer.updateRequestQuota(lightClientEmptyResponseCost)
raise newException(ResourceUnavailableError, LCOptUpdateUnavailable)

peer.updateRequestQuota(lightClientOptimisticUpdateResponseCost)

debug "LC optimistic update request done", peer

proc goodbye(peer: Peer,
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-chronos

0 comments on commit f9d06f0

Please sign in to comment.