Skip to content

Commit

Permalink
add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
arnetheduck committed Oct 28, 2022
1 parent 81a6d7a commit b585977
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
68 changes: 52 additions & 16 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ func toAltairMetadata(phase0: phase0.MetaData): altair.MetaData =
const
clientId* = "Nimbus beacon node " & fullVersionStr

requestPrefix = "/eth2/beacon_chain/req/"
requestSuffix = "/ssz_snappy"

ConcurrentConnections = 20
## Maximum number of active concurrent connection requests.

Expand Down Expand Up @@ -304,6 +307,18 @@ declareHistogram nbc_resolve_time,
"Time(s) used while resolving peer information",
buckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]

declareCounter nbc_reqresp_messages_sent,
"Number of Req/Resp messages sent", labels = ["protocol"]

declareCounter nbc_reqresp_messages_received,
"Number of Req/Resp messages received", labels = ["protocol"]

declareCounter nbc_reqresp_messages_failed,
"Number of Req/Resp messages that failed decoding", labels = ["protocol"]

declareCounter nbc_reqresp_messages_throttled,
"Number of Req/Resp messages that were throttled", labels = ["protocol"]

const
libp2p_pki_schemes {.strdefine.} = ""

Expand All @@ -321,16 +336,23 @@ func shortLog*(peer: Peer): string = shortLog(peer.peerId)
chronicles.formatIt(Peer): shortLog(it)
chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet())

func shortProtocolId(protocolId: string): string =
let
start = if protocolId.startsWith(requestPrefix): requestPrefix.len else: 0
ends = if protocolId.endsWith(requestSuffix):
protocolId.high - requestSuffix.len
else:
protocolId.high
protocolId[start..ends]

proc openStream(node: Eth2Node,
peer: Peer,
protocolId: string): Future[Connection] {.async.} =
# When dialling here, we do not provide addresses - all new connection
# attempts are handled via `connect` which also takes into account
# reconnection timeouts
let
protocolId = protocolId & "ssz_snappy"
conn = await dial(
node.switch, peer.peerId, protocolId)
conn = await dial(node.switch, peer.peerId, protocolId)

return conn

Expand Down Expand Up @@ -416,22 +438,26 @@ const
## Roughly, this means we allow 2 peers to sync from us at a time
fullReplenishTime = 5.seconds

template awaitQuota*(peerParam: Peer, costParam: float) =
template awaitQuota*(peerParam: Peer, costParam: float, protocolIdParam: string) =
let
peer = peerParam
cost = int(costParam)

if not peer.quota.tryConsume(cost.int):
debug "Awaiting peer quota", peer, cost
let protocolId = protocolIdParam
debug "Awaiting peer quota", peer, cost, protocolId
nbc_reqresp_messages_throttled.inc(1, [protocolId])
await peer.quota.consume(cost.int)

template awaitQuota*(networkParam: Eth2Node, costParam: float) =
template awaitQuota*(networkParam: Eth2Node, costParam: float, protocolIdParam: string) =
let
network = networkParam
cost = int(costParam)

if not network.quota.tryConsume(cost.int):
debug "Awaiting network quota", peer, cost
let protocolId = protocolIdParam
debug "Awaiting network quota", peer, cost, protocolId
nbc_reqresp_messages_throttled.inc(1, [protocolId])
await network.quota.consume(cost.int)

func allowedOpsPerSecondCost*(n: int): float =
Expand Down Expand Up @@ -505,7 +531,7 @@ proc getRequestProtoName(fn: NimNode): NimNode =
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/")
return newLit(requestPrefix & protoName & "/" & protoVer & requestSuffix)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454

return newLit("")
Expand Down Expand Up @@ -915,8 +941,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[NetRes[ResponseMsg]]
{.async.} =
var deadline = sleepAsync timeout

let deadline = sleepAsync timeout
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
deadline): return neterr StreamOpenTimeout
try:
Expand All @@ -929,6 +954,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# done, the other peer might never send us the response.
await stream.close()

nbc_reqresp_messages_sent.inc(1, [shortProtocolId(protocolId)])

# Read the response
return await readResponse(stream, peer, ResponseMsg, timeout)
finally:
Expand Down Expand Up @@ -1011,6 +1038,7 @@ proc implementSendProcBody(sendProc: SendProc) =

proc handleIncomingStream(network: Eth2Node,
conn: Connection,
protocolId: string,
MsgType: type) {.async.} =
mixin callUserHandler, RecType

Expand Down Expand Up @@ -1058,6 +1086,8 @@ proc handleIncomingStream(network: Eth2Node,
template returnResourceUnavailable(msg: string) =
returnResourceUnavailable(ErrorMsg msg.toBytes)

nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)])

# The request quota is shared between all requests - it represents the
# cost to perform a service on behalf of a client and is incurred
# regardless if the request succeeds or fails - we don't count waiting
Expand All @@ -1069,7 +1099,7 @@ proc handleIncomingStream(network: Eth2Node,
# limited by libp2p), this will naturally adapt them to the available
# quota.

awaitQuota(peer, libp2pRequestCost)
awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId))

# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
let deadline = sleepAsync RESP_TIMEOUT
Expand All @@ -1092,20 +1122,25 @@ proc handleIncomingStream(network: Eth2Node,
readChunkPayload(conn, peer, MsgRec), deadline):
# Timeout, e.g., cancellation due to fulfillment by different peer.
# Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`.
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
await sendErrorResponse(
peer, conn, InvalidRequest,
errorMsgLit "Request full data not sent in time")
return

except SerializationError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.formatMsg("msg")

except SnappyError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg

if msg.isErr:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
let (responseCode, errMsg) = case msg.error.kind
of UnexpectedEOF, PotentiallyExpectedEOF:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
(InvalidRequest, errorMsgLit "Incomplete request")

of InvalidContextBytes:
Expand Down Expand Up @@ -1143,14 +1178,16 @@ proc handleIncomingStream(network: Eth2Node,
logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg
except ResourceUnavailableError as err:
returnResourceUnavailable err.msg
except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
await sendErrorResponse(peer, conn, ServerError, ErrorMsg err.msg.toBytes)

except CatchableError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
debug "Error processing an incoming request", err = err.msg, msgName

finally:
Expand Down Expand Up @@ -1992,12 +2029,11 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc snappyThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`,
`MsgStrongRecName`)

mount `networkVar`.switch,
LPProtocol(codecs: @[`codecNameLit` & "ssz_snappy"],
handler: snappyThunk)
LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk)

##
## Implement Senders and Handshake
Expand Down
37 changes: 25 additions & 12 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/1")

await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes

Expand Down Expand Up @@ -392,8 +393,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_root/1")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_root/1")

await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0
inc found
Expand Down Expand Up @@ -457,8 +459,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")

await response.writeBytesSZ(
uncompressedLen, bytes,
Expand Down Expand Up @@ -518,8 +521,9 @@ p2pProtocol BeaconSync(version = 1,
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.awaitQuota(blockResponseCost)
peer.network.awaitQuota(blockResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")

await response.writeBytesSZ(
uncompressedLen, bytes,
Expand All @@ -546,7 +550,10 @@ p2pProtocol BeaconSync(version = 1,
let
contextEpoch = bootstrap.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
peer.awaitQuota(lightClientBootstrapResponseCost)

# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientBootstrapResponseCost, "light_client_bootstrap/1")
await response.send(bootstrap.get, contextBytes)
else:
raise newException(ResourceUnavailableError, LCBootstrapUnavailable)
Expand Down Expand Up @@ -585,7 +592,9 @@ p2pProtocol BeaconSync(version = 1,
contextEpoch = update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientUpdateResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientUpdateResponseCost, "light_client_updates_by_range/1")
await response.write(update.get, contextBytes)
inc found

Expand All @@ -607,7 +616,9 @@ p2pProtocol BeaconSync(version = 1,
contextEpoch = finality_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientFinalityUpdateResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientFinalityUpdateResponseCost, "light_client_finality_update/1")
await response.send(finality_update.get, contextBytes)
else:
raise newException(ResourceUnavailableError, LCFinUpdateUnavailable)
Expand All @@ -631,7 +642,9 @@ p2pProtocol BeaconSync(version = 1,
contextEpoch = optimistic_update.get.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data

peer.awaitQuota(lightClientOptimisticUpdateResponseCost)
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientOptimisticUpdateResponseCost, "light_client_optimistic_update/1")
await response.send(optimistic_update.get, contextBytes)
else:
raise newException(ResourceUnavailableError, LCOptUpdateUnavailable)
Expand Down

0 comments on commit b585977

Please sign in to comment.