diff --git a/packages/beacon-node/src/network/reqresp/BeaconNodeReqResp.ts b/packages/beacon-node/src/network/reqresp/BeaconNodeReqResp.ts deleted file mode 100644 index 11d457f75563..000000000000 --- a/packages/beacon-node/src/network/reqresp/BeaconNodeReqResp.ts +++ /dev/null @@ -1,24 +0,0 @@ -import {PeerId} from "@libp2p/interface-peer-id"; -import {ReqResp, ReqRespModules, ReqRespOptions, RequestTypedContainer} from "@lodestar/reqresp"; -import {RateLimiterOptions} from "@lodestar/reqresp/rate_limiter"; -import {INetworkEventBus, NetworkEvent} from "../events.js"; - -export interface BeaconNodeReqRespModules extends ReqRespModules { - networkEventBus: INetworkEventBus; -} - -export class BeaconNodeReqResp extends ReqResp { - private networkEventBus: INetworkEventBus; - - constructor(modules: BeaconNodeReqRespModules, options?: Partial & Partial) { - super(modules, options); - this.networkEventBus = modules.networkEventBus; - } - - protected onIncomingRequestBody(req: RequestTypedContainer, peerId: PeerId): void { - // Allow onRequest to return and close the stream - // For Goodbye there may be a race condition where the listener of `receivedGoodbye` - // disconnects in the same syncronous call, preventing the stream from ending cleanly - setTimeout(() => this.networkEventBus.emit(NetworkEvent.reqRespRequest, req, peerId), 0); - } -} diff --git a/packages/beacon-node/src/network/reqresp/handlers/beaconBlocksByRoot.ts b/packages/beacon-node/src/network/reqresp/handlers/beaconBlocksByRoot.ts index 53cd6fbbf5e6..788130bc07e3 100644 --- a/packages/beacon-node/src/network/reqresp/handlers/beaconBlocksByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/handlers/beaconBlocksByRoot.ts @@ -1,8 +1,8 @@ import {EncodedPayload, EncodedPayloadType, ContextBytesType} from "@lodestar/reqresp"; -import {getSlotFromBytes} from "@lodestar/reqresp/utils"; import {allForks, phase0, Slot} from "@lodestar/types"; import {IBeaconChain} from "../../../chain/index.js"; import {IBeaconDb} from "../../../db/index.js"; +import {getSlotFromBytes} from "../../../util/multifork.js"; export async function* onBeaconBlocksByRoot( requestBody: phase0.BeaconBlocksByRootRequest, diff --git a/packages/beacon-node/src/network/reqresp/handlers/index.ts b/packages/beacon-node/src/network/reqresp/handlers/index.ts index 33f9d99a8998..dea2c3046495 100644 --- a/packages/beacon-node/src/network/reqresp/handlers/index.ts +++ b/packages/beacon-node/src/network/reqresp/handlers/index.ts @@ -22,13 +22,13 @@ export function getReqRespHandlers({ db: IBeaconDb; chain: IBeaconChain; }): { - onStatus: HandlerTypeFromMessage; - onBeaconBlocksByRange: HandlerTypeFromMessage; - onBeaconBlocksByRoot: HandlerTypeFromMessage; - onLightClientBootstrap: HandlerTypeFromMessage; - onLightClientUpdatesByRange: HandlerTypeFromMessage; - onLightClientFinalityUpdate: HandlerTypeFromMessage; - onLightClientOptimisticUpdate: HandlerTypeFromMessage; + onStatus: HandlerTypeFromMessage; + onBeaconBlocksByRange: HandlerTypeFromMessage; + onBeaconBlocksByRoot: HandlerTypeFromMessage; + onLightClientBootstrap: HandlerTypeFromMessage; + onLightClientUpdatesByRange: HandlerTypeFromMessage; + onLightClientFinalityUpdate: HandlerTypeFromMessage; + onLightClientOptimisticUpdate: HandlerTypeFromMessage; } { return { async *onStatus() { diff --git a/packages/beacon-node/src/network/reqresp/index.ts b/packages/beacon-node/src/network/reqresp/index.ts index ceb9bb33d92d..ef1b8ff57f17 100644 --- a/packages/beacon-node/src/network/reqresp/index.ts +++ b/packages/beacon-node/src/network/reqresp/index.ts @@ -1,50 +1,269 @@ -import {getMetrics as getReqResMetrics, IReqResp, MetricsRegister} from "@lodestar/reqresp"; +import {Libp2p} from "libp2p"; +import {PeerId} from "@libp2p/interface-peer-id"; +import {ForkName} from "@lodestar/params"; +import {ILogger} from "@lodestar/utils"; +import {IBeaconConfig} from "@lodestar/config"; +import {ReqRespOpts} from "@lodestar/reqresp/lib/ReqResp.js"; + +import {allForks, altair, phase0, Root, Slot} from "@lodestar/types"; +import { + collectExactOne, + collectMaxResponse, + EncodedPayload, + EncodedPayloadType, + ReqResp, + RequestError, + ResponseError, +} from "@lodestar/reqresp"; import messages from "@lodestar/reqresp/messages"; -import {IMetrics} from "../../metrics/index.js"; -import {BeaconNodeReqResp, BeaconNodeReqRespModules} from "./BeaconNodeReqResp.js"; +import {IMetrics} from "../../metrics/metrics.js"; +import {INetworkEventBus, NetworkEvent} from "../events.js"; +import {IPeerRpcScoreStore} from "../peers/score.js"; +import {MetadataController} from "../metadata.js"; +import {PeerData, PeersData} from "../peers/peersData.js"; import {ReqRespHandlers} from "./handlers/index.js"; +import {collectSequentialBlocksInRange} from "./utils/collectSequentialBlocksInRange.js"; +import {IReqResp, RateLimiter, RespStatus} from "./interface.js"; +import {Method, RequestTypedContainer, Version} from "./types.js"; +import {onOutgoingReqRespError} from "./score.js"; -export const getBeaconNodeReqResp = ( - modules: Omit & { - metrics: IMetrics | null; - }, - reqRespHandlers: ReqRespHandlers -): IReqResp => { - const metrics = modules.metrics - ? getReqResMetrics((modules.metrics as unknown) as MetricsRegister, { - version: "", - commit: "", - network: "", - }) - : null; - - const reqRespModules = { - ...modules, - metrics, - } as BeaconNodeReqRespModules; - - const reqresp = new BeaconNodeReqResp(reqRespModules); - - reqresp.registerProtocol(messages.v1.Ping(reqRespModules)); - reqresp.registerProtocol(messages.v1.Status(reqRespModules, reqRespHandlers.onStatus)); - reqresp.registerProtocol(messages.v1.Metadata(reqRespModules)); - reqresp.registerProtocol(messages.v1.Goodbye(reqRespModules)); - reqresp.registerProtocol(messages.v1.BeaconBlocksByRange(reqRespModules, reqRespHandlers.onBeaconBlocksByRange)); - reqresp.registerProtocol(messages.v1.BeaconBlocksByRoot(reqRespModules, reqRespHandlers.onBeaconBlocksByRoot)); - reqresp.registerProtocol(messages.v1.LightClientBootstrap(reqRespModules, reqRespHandlers.onLightClientBootstrap)); - reqresp.registerProtocol( - messages.v1.LightClientFinalityUpdate(reqRespModules, reqRespHandlers.onLightClientFinalityUpdate) - ); - reqresp.registerProtocol( - messages.v1.LightClientOptimisticUpdate(reqRespModules, reqRespHandlers.onLightClientOptimisticUpdate) - ); - reqresp.registerProtocol( - messages.v1.LightClientUpdatesByRange(reqRespModules, reqRespHandlers.onLightClientUpdatesByRange) - ); - - reqresp.registerProtocol(messages.v2.Metadata(reqRespModules)); - reqresp.registerProtocol(messages.v2.BeaconBlocksByRange(reqRespModules, reqRespHandlers.onBeaconBlocksByRange)); - reqresp.registerProtocol(messages.v2.BeaconBlocksByRoot(reqRespModules, reqRespHandlers.onBeaconBlocksByRoot)); - - return reqresp; +/** This type helps response to beacon_block_by_range and beacon_block_by_root more efficiently */ +export type ReqRespBlockResponse = { + /** Deserialized data of allForks.SignedBeaconBlock */ + bytes: Uint8Array; + slot: Slot; }; + +export interface ReqRespBeaconNodeModules { + libp2p: Libp2p; + peersData: PeersData; + logger: ILogger; + config: IBeaconConfig; + metrics: IMetrics | null; + reqRespHandlers: ReqRespHandlers; + metadataController: MetadataController; + peerRpcScores: IPeerRpcScoreStore; + networkEventBus: INetworkEventBus; +} + +/** + * Implementation of Ethereum Consensus p2p Req/Resp domain. + * For the spec that this code is based on, see: + * https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-reqresp-domain + * https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#the-reqresp-domain + */ +export class ReqRespBeaconNode extends ReqResp implements IReqResp { + private readonly reqRespHandlers: ReqRespHandlers; + private readonly metadataController: MetadataController; + private readonly peerRpcScores: IPeerRpcScoreStore; + private readonly inboundRateLimiter: RateLimiter; + private readonly networkEventBus: INetworkEventBus; + private readonly peerData: PeerData; + + constructor(modules: ReqRespBeaconNodeModules, options?: Partial & Partial) { + super(modules, {...defaultReqRespOptions, ...options}); + const reqRespHandlers: ReqRespHandlers = 1; + this.peerRpcScores = modules.peerRpcScores; + this.metadataController = modules.metadataController; + this.inboundRateLimiter = new InboundRateLimiter({...InboundRateLimiter.defaults, ...options}, modules); + + // TODO: Do not register everything! Some protocols are fork dependant + this.registerProtocol(messages.Ping(modules, this.onPing.bind(this))); + this.registerProtocol(messages.Status(modules, this.onStatus.bind(this))); + this.registerProtocol(messages.Metadata(modules, this.onMetadata.bind(this))); + this.registerProtocol(messages.MetadataV2(modules, this.onMetadata.bind(this))); + this.registerProtocol(messages.Goodbye(modules, this.onGoodbye.bind(this))); + this.registerProtocol(messages.BeaconBlocksByRange(modules, this.onBeaconBlocksByRange.bind(this))); + this.registerProtocol(messages.BeaconBlocksByRangeV2(modules, this.onBeaconBlocksByRange.bind(this))); + this.registerProtocol(messages.BeaconBlocksByRoot(modules, this.onBeaconBlocksByRoot.bind(this))); + this.registerProtocol(messages.BeaconBlocksByRootV2(modules, this.onBeaconBlocksByRoot.bind(this))); + this.registerProtocol(messages.LightClientBootstrap(modules, reqRespHandlers.onLightClientBootstrap)); + this.registerProtocol(messages.LightClientFinalityUpdate(modules, reqRespHandlers.onLightClientFinalityUpdate)); + this.registerProtocol(messages.LightClientOptimisticUpdate(modules, reqRespHandlers.onLightClientOptimisticUpdate)); + this.registerProtocol(messages.LightClientUpdatesByRange(modules, reqRespHandlers.onLightClientUpdatesByRange)); + } + + async start(): Promise { + await super.start(); + this.inboundRateLimiter.start(); + } + + async stop(): Promise { + await super.stop(); + this.inboundRateLimiter.stop(); + } + + pruneOnPeerDisconnect(peerId: PeerId): void { + this.inboundRateLimiter.prune(peerId); + } + + async status(peerId: PeerId, request: phase0.Status): Promise { + return collectExactOne( + this.sendRequest(peerId, Method.Status, [Version.V1], request) + ); + } + + async goodbye(peerId: PeerId, request: phase0.Goodbye): Promise { + // TODO: Replace with "ignore response after request" + await collectExactOne( + this.sendRequest(peerId, Method.Goodbye, [Version.V1], request) + ); + } + + async ping(peerId: PeerId): Promise { + return collectExactOne( + this.sendRequest(peerId, Method.Ping, [Version.V1], this.metadataController.seqNumber) + ); + } + + async metadata(peerId: PeerId, fork?: ForkName): Promise { + // Only request V1 if forcing phase0 fork. It's safe to not specify `fork` and let stream negotiation pick the version + const versions = fork === ForkName.phase0 ? [Version.V1] : [Version.V2, Version.V1]; + return collectExactOne(this.sendRequest(peerId, Method.Metadata, versions, null)); + } + + async beaconBlocksByRange( + peerId: PeerId, + request: phase0.BeaconBlocksByRangeRequest + ): Promise { + return collectSequentialBlocksInRange( + this.sendRequest( + peerId, + Method.BeaconBlocksByRange, + [Version.V2, Version.V1], // Prioritize V2 + request + ), + request + ); + } + + async beaconBlocksByRoot( + peerId: PeerId, + request: phase0.BeaconBlocksByRootRequest + ): Promise { + return collectMaxResponse( + this.sendRequest( + peerId, + Method.BeaconBlocksByRoot, + [Version.V2, Version.V1], // Prioritize V2 + request + ), + request.length + ); + } + + async lightClientBootstrap(peerId: PeerId, request: Root): Promise { + return collectExactOne( + this.sendRequest(peerId, Method.LightClientBootstrap, [Version.V1], request) + ); + } + + async lightClientOptimisticUpdate(peerId: PeerId): Promise { + return collectExactOne( + this.sendRequest( + peerId, + Method.LightClientOptimisticUpdate, + [Version.V1], + null + ) + ); + } + + async lightClientFinalityUpdate(peerId: PeerId): Promise { + return collectExactOne( + this.sendRequest( + peerId, + Method.LightClientFinalityUpdate, + [Version.V1], + null + ) + ); + } + + async lightClientUpdatesByRange( + peerId: PeerId, + request: altair.LightClientUpdatesByRange + ): Promise { + return collectMaxResponse( + this.sendRequest( + peerId, + Method.LightClientUpdatesByRange, + [Version.V1], + request + ), + request.count + ); + } + + protected sendRequest(peerId: PeerId, method: string, versions: number[], body: Req): AsyncIterable { + // Remember prefered encoding + const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY; + + return super.sendRequest(peerId, method, versions, encoding, body); + } + + protected onIncomingRequestBody(req: RequestTypedContainer, peerId: PeerId): void { + // Allow onRequest to return and close the stream + // For Goodbye there may be a race condition where the listener of `receivedGoodbye` + // disconnects in the same syncronous call, preventing the stream from ending cleanly + setTimeout(() => this.networkEventBus.emit(NetworkEvent.reqRespRequest, req, peerId), 0); + } + + protected onIncomingRequest(peerId: PeerId, method: Method): void { + if (method !== Method.Goodbye && !this.inboundRateLimiter.allowRequest(peerId)) { + throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); + } + } + + protected onOutgoingRequestError(peerId: PeerId, method: Method, error: RequestError): void { + const peerAction = onOutgoingReqRespError(error, method); + if (peerAction !== null) { + this.peerRpcScores.applyAction(peerId, peerAction, error.type.code); + } + } + + private async *onStatus(req: phase0.Status, peerId: PeerId): AsyncIterable> { + this.onIncomingRequestBody({method: Method.Status, body: req}, peerId); + // Remember prefered encoding + const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY; + yield* this.reqRespHandlers.onStatus(req, peerId); + } + + private async *onGoodbye(req: phase0.Goodbye, peerId: PeerId): AsyncIterable> { + this.onIncomingRequestBody({method: Method.Goodbye, body: req}, peerId); + yield {type: EncodedPayloadType.ssz, data: BigInt(0)}; + } + + private async *onPing(req: phase0.Ping, peerId: PeerId): AsyncIterable> { + this.onIncomingRequestBody({method: Method.Goodbye, body: req}, peerId); + yield {type: EncodedPayloadType.ssz, data: this.metadataController.seqNumber}; + } + + private async *onMetadata(req: null, peerId: PeerId): AsyncIterable> { + this.onIncomingRequestBody({method: Method.Metadata, body: req}, peerId); + + // V1 -> phase0, V2 -> altair. But the type serialization of phase0.Metadata will just ignore the extra .syncnets property + // It's safe to return altair.Metadata here for all versions + yield {type: EncodedPayloadType.ssz, data: this.metadataController.json}; + } + + private async *onBeaconBlocksByRange( + req: phase0.BeaconBlocksByRangeRequest, + peerId: PeerId + ): AsyncIterable> { + if (!this.inboundRateLimiter.allowBlockByRequest(peerId, req.count)) { + throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); + } + yield* this.reqRespHandlers.onBeaconBlocksByRange(req, peerId); + } + + private async *onBeaconBlocksByRoot( + req: phase0.BeaconBlocksByRootRequest, + peerId: PeerId + ): AsyncIterable> { + if (!this.inboundRateLimiter.allowBlockByRequest(peerId, req.length)) { + throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); + } + yield* this.reqRespHandlers.onBeaconBlocksByRoot(req, peerId); + } +} diff --git a/packages/beacon-node/src/network/reqresp/interface.ts b/packages/beacon-node/src/network/reqresp/interface.ts new file mode 100644 index 000000000000..641b7d4504fc --- /dev/null +++ b/packages/beacon-node/src/network/reqresp/interface.ts @@ -0,0 +1,69 @@ +import {PeerId} from "@libp2p/interface-peer-id"; +import {ForkName} from "@lodestar/params"; +import {allForks, altair, phase0} from "@lodestar/types"; + +export interface IReqResp { + start(): void; + stop(): void; + status(peerId: PeerId, request: phase0.Status): Promise; + goodbye(peerId: PeerId, request: phase0.Goodbye): Promise; + ping(peerId: PeerId): Promise; + metadata(peerId: PeerId, fork?: ForkName): Promise; + beaconBlocksByRange( + peerId: PeerId, + request: phase0.BeaconBlocksByRangeRequest + ): Promise; + beaconBlocksByRoot(peerId: PeerId, request: phase0.BeaconBlocksByRootRequest): Promise; + pruneOnPeerDisconnect(peerId: PeerId): void; + lightClientBootstrap(peerId: PeerId, request: Uint8Array): Promise; + lightClientOptimisticUpdate(peerId: PeerId): Promise; + lightClientFinalityUpdate(peerId: PeerId): Promise; + lightClientUpdatesByRange( + peerId: PeerId, + request: altair.LightClientUpdatesByRange + ): Promise; +} + +/** + * Rate limiter interface for inbound and outbound requests. + */ +export interface RateLimiter { + /** Allow to request or response based on rate limit params configured. */ + allowRequest(peerId: PeerId): boolean; + /** Rate limit check for block count */ + allowBlockByRequest(peerId: PeerId, numBlock: number): boolean; + + /** + * Prune by peer id + */ + prune(peerId: PeerId): void; + start(): void; + stop(): void; +} + +// Request/Response constants +export enum RespStatus { + /** + * A normal response follows, with contents matching the expected message schema and encoding specified in the request + */ + SUCCESS = 0, + /** + * The contents of the request are semantically invalid, or the payload is malformed, + * or could not be understood. The response payload adheres to the ErrorMessage schema + */ + INVALID_REQUEST = 1, + /** + * The responder encountered an error while processing the request. The response payload adheres to the ErrorMessage schema + */ + SERVER_ERROR = 2, + /** + * The responder does not have requested resource. The response payload adheres to the ErrorMessage schema (described below). Note: This response code is only valid as a response to BlocksByRange + */ + RESOURCE_UNAVAILABLE = 3, + /** + * Our node does not have bandwidth to serve requests due to either per-peer quota or total quota. + */ + RATE_LIMITED = 139, +} + +export type RpcResponseStatusError = Exclude; diff --git a/packages/reqresp/src/score.ts b/packages/beacon-node/src/network/reqresp/score.ts similarity index 94% rename from packages/reqresp/src/score.ts rename to packages/beacon-node/src/network/reqresp/score.ts index 9a121f9430fa..157e5d979ec6 100644 --- a/packages/reqresp/src/score.ts +++ b/packages/beacon-node/src/network/reqresp/score.ts @@ -1,6 +1,6 @@ +import {RequestError, RequestErrorCode} from "@lodestar/reqresp"; +import {PeerAction} from "../peers/score.js"; import {Method} from "./types.js"; -import {RequestError, RequestErrorCode} from "./request/index.js"; -import {PeerAction} from "./sharedTypes.js"; /** * libp2p-ts does not include types for the error codes. diff --git a/packages/beacon-node/src/network/reqresp/types.ts b/packages/beacon-node/src/network/reqresp/types.ts new file mode 100644 index 000000000000..c23f5c3b9717 --- /dev/null +++ b/packages/beacon-node/src/network/reqresp/types.ts @@ -0,0 +1,40 @@ +import {phase0} from "@lodestar/types"; + +/** ReqResp protocol names or methods. Each Method can have multiple versions and encodings */ +export enum Method { + // Phase 0 + Status = "status", + Goodbye = "goodbye", + Ping = "ping", + Metadata = "metadata", + BeaconBlocksByRange = "beacon_blocks_by_range", + BeaconBlocksByRoot = "beacon_blocks_by_root", + LightClientBootstrap = "light_client_bootstrap", + LightClientUpdatesByRange = "light_client_updates_by_range", + LightClientFinalityUpdate = "light_client_finality_update", + LightClientOptimisticUpdate = "light_client_optimistic_update", +} + +// To typesafe events to network +type RequestBodyByMethod = { + [Method.Status]: phase0.Status; + [Method.Goodbye]: phase0.Goodbye; + [Method.Ping]: phase0.Ping; + [Method.Metadata]: null; + // Do not matter + [Method.BeaconBlocksByRange]: unknown; + [Method.BeaconBlocksByRoot]: unknown; + [Method.LightClientBootstrap]: unknown; + [Method.LightClientUpdatesByRange]: unknown; + [Method.LightClientFinalityUpdate]: unknown; + [Method.LightClientOptimisticUpdate]: unknown; +}; + +export type RequestTypedContainer = { + [K in Method]: {method: K; body: RequestBodyByMethod[K]}; +}[Method]; + +export enum Version { + V1 = 1, + V2 = 2, +} diff --git a/packages/beacon-node/src/network/reqresp/utils/collectSequentialBlocksInRange.ts b/packages/beacon-node/src/network/reqresp/utils/collectSequentialBlocksInRange.ts new file mode 100644 index 000000000000..b162677b32d0 --- /dev/null +++ b/packages/beacon-node/src/network/reqresp/utils/collectSequentialBlocksInRange.ts @@ -0,0 +1,53 @@ +import {allForks, phase0} from "@lodestar/types"; +import {LodestarError} from "@lodestar/utils"; + +/** + * Asserts a response from BeaconBlocksByRange respects the request and is sequential + * Note: MUST allow missing block for skipped slots. + */ +export async function collectSequentialBlocksInRange( + blockStream: AsyncIterable, + {count, startSlot}: phase0.BeaconBlocksByRangeRequest +): Promise { + const blocks: allForks.SignedBeaconBlock[] = []; + + for await (const block of blockStream) { + const blockSlot = block.message.slot; + + // Note: step is deprecated and assumed to be 1 + if (blockSlot >= startSlot + count) { + throw new BlocksByRangeError({code: BlocksByRangeErrorCode.OVER_MAX_SLOT}); + } + + if (blockSlot < startSlot) { + throw new BlocksByRangeError({code: BlocksByRangeErrorCode.UNDER_START_SLOT}); + } + + const prevBlock = blocks.length === 0 ? null : blocks[blocks.length - 1]; + if (prevBlock) { + if (prevBlock.message.slot >= blockSlot) { + throw new BlocksByRangeError({code: BlocksByRangeErrorCode.BAD_SEQUENCE}); + } + } + + blocks.push(block); + if (blocks.length >= count) { + break; // Done, collected all blocks + } + } + + return blocks; +} + +export enum BlocksByRangeErrorCode { + UNDER_START_SLOT = "BLOCKS_BY_RANGE_ERROR_UNDER_START_SLOT", + OVER_MAX_SLOT = "BLOCKS_BY_RANGE_ERROR_OVER_MAX_SLOT", + BAD_SEQUENCE = "BLOCKS_BY_RANGE_ERROR_BAD_SEQUENCE", +} + +type BlocksByRangeErrorType = + | {code: BlocksByRangeErrorCode.UNDER_START_SLOT} + | {code: BlocksByRangeErrorCode.OVER_MAX_SLOT} + | {code: BlocksByRangeErrorCode.BAD_SEQUENCE}; + +export class BlocksByRangeError extends LodestarError {} diff --git a/packages/reqresp/package.json b/packages/reqresp/package.json index e67b2a0b0d08..69279321b5e1 100644 --- a/packages/reqresp/package.json +++ b/packages/reqresp/package.json @@ -59,7 +59,6 @@ "check-readme": "typescript-docs-verifier" }, "dependencies": { - "@libp2p/interface-peer-id": "^1.0.4", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^1.0.4", "strict-event-emitter-types": "^2.0.0", diff --git a/packages/reqresp/src/ReqResp.ts b/packages/reqresp/src/ReqResp.ts index 7d0c4973ffbd..233a9c467037 100644 --- a/packages/reqresp/src/ReqResp.ts +++ b/packages/reqresp/src/ReqResp.ts @@ -1,34 +1,30 @@ +import {setMaxListeners} from "node:events"; +import {Connection, Stream} from "@libp2p/interface-connection"; import {PeerId} from "@libp2p/interface-peer-id"; -import {ForkName} from "@lodestar/params"; -import {allForks, altair, phase0, Root, Slot} from "@lodestar/types"; -import {DIAL_TIMEOUT, REQUEST_TIMEOUT, RESP_TIMEOUT, TTFB_TIMEOUT} from "./constants.js"; -import {IReqResp, RateLimiter, ReqRespHandlerContext, ReqRespModules, RespStatus} from "./interface.js"; -import {InboundRateLimiter, RateLimiterOptions} from "./rate_limiter/RateLimiter.js"; -import {ReqRespProtocol} from "./ReqRespProtocol.js"; -import {RequestError} from "./request/errors.js"; -import {ResponseError} from "./response/errors.js"; -import {onOutgoingReqRespError} from "./score.js"; -import {IPeerRpcScoreStore, MetadataController} from "./sharedTypes.js"; -import {Method, ReqRespOptions, Version} from "./types.js"; -import {assertSequentialBlocksInRange} from "./utils/index.js"; - -/** This type helps response to beacon_block_by_range and beacon_block_by_root more efficiently */ -export type ReqRespBlockResponse = { - /** Deserialized data of allForks.SignedBeaconBlock */ - bytes: Uint8Array; - slot: Slot; -}; - -export const defaultReqRespOptions: ReqRespOptions = { - // eslint-disable-next-line @typescript-eslint/naming-convention - TTFB_TIMEOUT, - // eslint-disable-next-line @typescript-eslint/naming-convention - RESP_TIMEOUT, - // eslint-disable-next-line @typescript-eslint/naming-convention - REQUEST_TIMEOUT, - // eslint-disable-next-line @typescript-eslint/naming-convention - DIAL_TIMEOUT, -}; +import {Libp2p} from "libp2p"; +import {ILogger} from "@lodestar/utils"; +import {IBeaconConfig} from "@lodestar/config"; +import {getMetrics, Metrics, MetricsRegister} from "./metrics.js"; +import {RequestError, RequestErrorCode, sendRequest, SendRequestOpts} from "./request/index.js"; +import {handleRequest} from "./response/index.js"; +import {Encoding, ProtocolDefinition} from "./types.js"; + +type ProtocolID = string; + +export const DEFAULT_PROTOCOL_PREFIX = "/eth2/beacon_chain/req"; + +export interface ReqRespProtocolModules { + libp2p: Libp2p; + logger: ILogger; + config: IBeaconConfig; + metrics: Metrics | null; +} + +export interface ReqRespOpts extends SendRequestOpts { + /** Custom prefix for `/ProtocolPrefix/MessageName/SchemaVersion/Encoding` */ + protocolPrefix?: string; + getPeerLogMetadata?: (peerId: string) => string; +} /** * Implementation of Ethereum Consensus p2p Req/Resp domain. @@ -36,145 +32,151 @@ export const defaultReqRespOptions: ReqRespOptions = { * https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-reqresp-domain * https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#the-reqresp-domain */ -export abstract class ReqResp extends ReqRespProtocol implements IReqResp { - private inboundRateLimiter: RateLimiter; - private peerRpcScores: IPeerRpcScoreStore; - private metadataController: MetadataController; - - constructor(modules: ReqRespModules, options?: Partial & Partial) { - super(modules, {...defaultReqRespOptions, ...options}); - this.peerRpcScores = modules.peerRpcScores; - this.metadataController = modules.metadataController; - this.inboundRateLimiter = new InboundRateLimiter({...InboundRateLimiter.defaults, ...options}, modules); - } +export abstract class ReqResp { + private readonly libp2p: Libp2p; + private readonly logger: ILogger; + private readonly metrics: Metrics | null; + private controller = new AbortController(); + /** Tracks request and responses in a sequential counter */ + private reqCount = 0; + private readonly protocolPrefix: string; - protected onIncomingRequest(peerId: PeerId, method: Method): void { - if (method !== Method.Goodbye && !this.inboundRateLimiter.allowRequest(peerId)) { - throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); - } - } + /** `${protocolPrefix}/${method}/${version}/${encoding}` */ + private readonly supportedProtocols = new Map(); - protected onOutgoingRequestError(peerId: PeerId, method: Method, error: RequestError): void { - const peerAction = onOutgoingReqRespError(error, method); - if (peerAction !== null) { - this.peerRpcScores.applyAction(peerId, peerAction, error.type.code); - } + constructor(modules: ReqRespProtocolModules, private readonly opts: ReqRespOpts) { + this.libp2p = modules.libp2p; + this.logger = modules.logger; + this.metrics = modules.metrics ? getMetrics((modules.metrics as unknown) as MetricsRegister) : null; + this.protocolPrefix = opts.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX; } - protected getContext(): ReqRespHandlerContext { - const context = super.getContext(); - return { - ...context, - modules: { - ...context.modules, - inboundRateLimiter: this.inboundRateLimiter, - metadataController: this.metadataController, - }, - }; + registerProtocol(protocol: ProtocolDefinition): void { + const {method, version, encoding} = protocol; + const protocolID = this.formatProtocolID(method, version, encoding); + this.supportedProtocols.set(protocolID, protocol as ProtocolDefinition); } async start(): Promise { - await super.start(); - this.inboundRateLimiter.start(); - } - - async stop(): Promise { - await super.stop(); - this.inboundRateLimiter.stop(); - } - - pruneOnPeerDisconnect(peerId: PeerId): void { - this.inboundRateLimiter.prune(peerId); - } + this.controller = new AbortController(); + // We set infinity to prevent MaxListenersExceededWarning which get logged when listeners > 10 + // Since it is perfectly fine to have listeners > 10 + setMaxListeners(Infinity, this.controller.signal); - async status(peerId: PeerId, request: phase0.Status): Promise { - return await this.sendRequest(peerId, Method.Status, [Version.V1], request); - } - - async goodbye(peerId: PeerId, request: phase0.Goodbye): Promise { - await this.sendRequest(peerId, Method.Goodbye, [Version.V1], request); - } - - async ping(peerId: PeerId): Promise { - return await this.sendRequest( - peerId, - Method.Ping, - [Version.V1], - this.metadataController.seqNumber - ); + for (const [protocolID, protocol] of this.supportedProtocols) { + await this.libp2p.handle(protocolID, this.getRequestHandler(protocol)); + } } - async metadata(peerId: PeerId, fork?: ForkName): Promise { - // Only request V1 if forcing phase0 fork. It's safe to not specify `fork` and let stream negotiation pick the version - const versions = fork === ForkName.phase0 ? [Version.V1] : [Version.V2, Version.V1]; - return await this.sendRequest(peerId, Method.Metadata, versions, null); + async stop(): Promise { + for (const protocolID of this.supportedProtocols.keys()) { + await this.libp2p.unhandle(protocolID); + } + this.controller.abort(); } - async beaconBlocksByRange( + // Helper to reduce code duplication + protected async *sendRequest( peerId: PeerId, - request: phase0.BeaconBlocksByRangeRequest - ): Promise { - const blocks = await this.sendRequest( - peerId, - Method.BeaconBlocksByRange, - [Version.V2, Version.V1], // Prioritize V2 - request, - request.count - ); - assertSequentialBlocksInRange(blocks, request); - return blocks; - } + method: string, + versions: number[], + encoding: Encoding, + body: Req + ): AsyncIterable { + const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString()); + this.metrics?.outgoingRequests.inc({method}); + const timer = this.metrics?.outgoingRequestRoundtripTime.startTimer({method}); + + const protocols: ProtocolDefinition[] = []; + const protocolIDs: string[] = []; + + for (const version of versions) { + const protocolID = this.formatProtocolID(method, version, encoding); + const protocol = this.supportedProtocols.get(protocolID); + if (!protocol) { + throw Error(`Request to send to protocol ${protocolID} but it has not been declared`); + } + protocols.push(protocol); + protocolIDs.push(protocolID); + } - async beaconBlocksByRoot( - peerId: PeerId, - request: phase0.BeaconBlocksByRootRequest - ): Promise { - return await this.sendRequest( - peerId, - Method.BeaconBlocksByRoot, - [Version.V2, Version.V1], // Prioritize V2 - request, - request.length - ); + try { + yield* sendRequest( + {logger: this.logger, libp2p: this.libp2p, peerClient}, + peerId, + protocols, + protocolIDs, + body, + this.controller.signal, + this.opts, + this.reqCount++ + ); + } catch (e) { + this.metrics?.outgoingErrors.inc({method}); + + if (e instanceof RequestError) { + if (e.type.code === RequestErrorCode.DIAL_ERROR || e.type.code === RequestErrorCode.DIAL_TIMEOUT) { + this.metrics?.dialErrors.inc(); + } + + this.onOutgoingRequestError(peerId, method, e); + } + + throw e; + } finally { + timer?.(); + } } - async lightClientBootstrap(peerId: PeerId, request: Root): Promise { - return await this.sendRequest( - peerId, - Method.LightClientBootstrap, - [Version.V1], - request - ); + private getRequestHandler(protocol: ProtocolDefinition) { + return async ({connection, stream}: {connection: Connection; stream: Stream}) => { + const peerId = connection.remotePeer; + const peerClient = this.opts.getPeerLogMetadata?.(peerId.toString()); + const method = protocol.method; + + this.metrics?.incomingRequests.inc({method}); + const timer = this.metrics?.incomingRequestHandlerTime.startTimer({method}); + + this.onIncomingRequest?.(peerId, method); + + try { + await handleRequest({ + logger: this.logger, + stream, + peerId, + protocol, + signal: this.controller.signal, + requestId: this.reqCount++, + peerClient, + requestTimeoutMs: this.opts.requestTimeoutMs, + }); + // TODO: Do success peer scoring here + } catch { + this.metrics?.incomingErrors.inc({method}); + + // TODO: Do error peer scoring here + // Must not throw since this is an event handler + } finally { + timer?.(); + } + }; } - async lightClientOptimisticUpdate(peerId: PeerId): Promise { - return await this.sendRequest( - peerId, - Method.LightClientOptimisticUpdate, - [Version.V1], - null - ); + protected onIncomingRequest(_peerId: PeerId, _method: string): void { + // Override } - async lightClientFinalityUpdate(peerId: PeerId): Promise { - return await this.sendRequest( - peerId, - Method.LightClientFinalityUpdate, - [Version.V1], - null - ); + protected onOutgoingRequestError(_peerId: PeerId, _method: string, _error: RequestError): void { + // Override } - async lightClientUpdate( - peerId: PeerId, - request: altair.LightClientUpdatesByRange - ): Promise { - return await this.sendRequest( - peerId, - Method.LightClientUpdatesByRange, - [Version.V1], - request, - request.count - ); + /** + * ``` + * /ProtocolPrefix/MessageName/SchemaVersion/Encoding + * ``` + * https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#protocol-identification + */ + protected formatProtocolID(method: string, version: number, encoding: Encoding): string { + return `${this.protocolPrefix}/${method}/${version}/${encoding}`; } } diff --git a/packages/reqresp/src/ReqRespProtocol.ts b/packages/reqresp/src/ReqRespProtocol.ts deleted file mode 100644 index c5a73592e5cd..000000000000 --- a/packages/reqresp/src/ReqRespProtocol.ts +++ /dev/null @@ -1,177 +0,0 @@ -import {setMaxListeners} from "node:events"; -import {Connection, Stream} from "@libp2p/interface-connection"; -import {PeerId} from "@libp2p/interface-peer-id"; -import {Libp2p} from "libp2p"; -import {ILogger} from "@lodestar/utils"; -import {IBeaconConfig} from "@lodestar/config"; -import {Metrics} from "./metrics.js"; -import {RequestError, RequestErrorCode, sendRequest} from "./request/index.js"; -import {handleRequest} from "./response/index.js"; -import {PeersData} from "./sharedTypes.js"; -import {Encoding, Method, ProtocolDefinition, ReqRespOptions, RequestTypedContainer} from "./types.js"; -import {formatProtocolID} from "./utils/index.js"; -import {ReqRespHandlerProtocolContext} from "./interface.js"; - -type ProtocolID = string; - -export interface ReqRespProtocolModules { - libp2p: Libp2p; - peersData: PeersData; - logger: ILogger; - config: IBeaconConfig; - metrics: Metrics | null; -} - -/** - * Implementation of Ethereum Consensus p2p Req/Resp domain. - * For the spec that this code is based on, see: - * https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-reqresp-domain - * https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/p2p-interface.md#the-reqresp-domain - */ -export abstract class ReqRespProtocol { - private libp2p: Libp2p; - private readonly peersData: PeersData; - private logger: ILogger; - private controller = new AbortController(); - private options: ReqRespOptions; - private reqCount = 0; - private respCount = 0; - private metrics: Metrics | null; - private config: IBeaconConfig; - - /** `${protocolPrefix}/${method}/${version}/${encoding}` */ - private readonly supportedProtocols = new Map(); - - constructor(modules: ReqRespProtocolModules, options: ReqRespOptions) { - this.options = options; - this.config = modules.config; - this.libp2p = modules.libp2p; - this.peersData = modules.peersData; - this.logger = modules.logger; - this.metrics = modules.metrics; - } - - registerProtocol(protocol: ProtocolDefinition): void { - const {method, version, encoding} = protocol; - const protocolID = formatProtocolID(method, version, encoding); - this.supportedProtocols.set(protocolID, protocol as ProtocolDefinition); - } - - async start(): Promise { - this.controller = new AbortController(); - // We set infinity to prevent MaxListenersExceededWarning which get logged when listeners > 10 - // Since it is perfectly fine to have listeners > 10 - setMaxListeners(Infinity, this.controller.signal); - - for (const [protocolID, protocol] of this.supportedProtocols) { - await this.libp2p.handle(protocolID, this.getRequestHandler(protocol)); - } - } - - async stop(): Promise { - for (const protocolID of this.supportedProtocols.keys()) { - await this.libp2p.unhandle(protocolID); - } - this.controller.abort(); - } - - // Helper to reduce code duplication - protected async sendRequest( - peerId: PeerId, - method: string, - versions: string[], - body: Req, - maxResponses = 1 - ): Promise { - const peerClient = this.peersData.getPeerKind(peerId.toString()); - this.metrics?.outgoingRequests.inc({method}); - const timer = this.metrics?.outgoingRequestRoundtripTime.startTimer({method}); - - // Remember prefered encoding - const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY; - - const protocols: ProtocolDefinition[] = []; - for (const version of versions) { - const protocolID = formatProtocolID(method, version, encoding); - const protocol = this.supportedProtocols.get(protocolID); - if (!protocol) { - throw Error(`Request to send to protocol ${protocolID} but it has not been declared`); - } - protocols.push(protocol); - } - - try { - const result = await sendRequest( - {logger: this.logger, libp2p: this.libp2p, peerClient}, - peerId, - protocols, - body, - maxResponses, - this.controller.signal, - this.options, - this.reqCount++ - ); - - return result; - } catch (e) { - this.metrics?.outgoingErrors.inc({method}); - - if (e instanceof RequestError) { - if (e.type.code === RequestErrorCode.DIAL_ERROR || e.type.code === RequestErrorCode.DIAL_TIMEOUT) { - this.metrics?.dialErrors.inc(); - } - - this.onOutgoingRequestError(peerId, method as Method, e); - } - - throw e; - } finally { - timer?.(); - } - } - - private getRequestHandler(protocol: ProtocolDefinition) { - return async ({connection, stream}: {connection: Connection; stream: Stream}) => { - const peerId = connection.remotePeer; - const peerClient = this.peersData.getPeerKind(peerId.toString()); - const method = protocol.method; - - this.metrics?.incomingRequests.inc({method}); - const timer = this.metrics?.incomingRequestHandlerTime.startTimer({method}); - - this.onIncomingRequest?.(peerId, method); - - try { - await handleRequest({ - context: this.getContext(), - logger: this.logger, - stream, - peerId, - protocol, - signal: this.controller.signal, - requestId: this.respCount++, - peerClient, - }); - // TODO: Do success peer scoring here - } catch { - this.metrics?.incomingErrors.inc({method}); - - // TODO: Do error peer scoring here - // Must not throw since this is an event handler - } finally { - timer?.(); - } - }; - } - - protected getContext(): Context { - return { - modules: {config: this.config, logger: this.logger, metrics: this.metrics, peersData: this.peersData}, - eventHandlers: {onIncomingRequestBody: this.onIncomingRequestBody}, - } as Context; - } - - protected abstract onIncomingRequestBody(_req: RequestTypedContainer, _peerId: PeerId): void; - protected abstract onOutgoingRequestError(_peerId: PeerId, _method: Method, _error: RequestError): void; - protected abstract onIncomingRequest(_peerId: PeerId, _method: Method): void; -} diff --git a/packages/reqresp/src/constants.ts b/packages/reqresp/src/constants.ts deleted file mode 100644 index 603671c6d297..000000000000 --- a/packages/reqresp/src/constants.ts +++ /dev/null @@ -1,12 +0,0 @@ -/** The maximum time for complete response transfer. */ -export const RESP_TIMEOUT = 10 * 1000; // 10 sec -/** Non-spec timeout from sending request until write stream closed by responder */ -export const REQUEST_TIMEOUT = 5 * 1000; // 5 sec -/** The maximum time to wait for first byte of request response (time-to-first-byte). */ -export const TTFB_TIMEOUT = 5 * 1000; // 5 sec -/** Non-spec timeout from dialing protocol until stream opened */ -export const DIAL_TIMEOUT = 5 * 1000; // 5 sec -// eslint-disable-next-line @typescript-eslint/naming-convention -export const timeoutOptions = {TTFB_TIMEOUT, RESP_TIMEOUT, REQUEST_TIMEOUT, DIAL_TIMEOUT}; - -export const MAX_VARINT_BYTES = 10; diff --git a/packages/reqresp/src/encoders/responseEncode.ts b/packages/reqresp/src/encoders/responseEncode.ts index 8561db1c0705..448d9b2a91da 100644 --- a/packages/reqresp/src/encoders/responseEncode.ts +++ b/packages/reqresp/src/encoders/responseEncode.ts @@ -4,7 +4,6 @@ import {encodeErrorMessage} from "../utils/index.js"; import { ContextBytesType, ContextBytesFactory, - Protocol, ProtocolDefinition, EncodedPayload, EncodedPayloadType, @@ -53,7 +52,7 @@ export function responseEncodeSuccess( * fn yields exactly one `` and afterwards the stream must be terminated */ export async function* responseEncodeError( - protocol: Protocol, + protocol: Pick, status: RpcResponseStatusError, errorMessage: string ): AsyncGenerator { diff --git a/packages/reqresp/src/encodingStrategies/sszSnappy/decode.ts b/packages/reqresp/src/encodingStrategies/sszSnappy/decode.ts index 959de9c45371..4abe4e19c628 100644 --- a/packages/reqresp/src/encodingStrategies/sszSnappy/decode.ts +++ b/packages/reqresp/src/encodingStrategies/sszSnappy/decode.ts @@ -5,8 +5,8 @@ import {BufferedSource} from "../../utils/index.js"; import {SnappyFramesUncompress} from "./snappyFrames/uncompress.js"; import {maxEncodedLen} from "./utils.js"; import {SszSnappyError, SszSnappyErrorCode} from "./errors.js"; -import {MAX_VARINT_BYTES} from "../../constants.js"; +export const MAX_VARINT_BYTES = 10; export type TypeRead = Pick, "minSize" | "maxSize" | "deserialize">; /** diff --git a/packages/reqresp/src/index.ts b/packages/reqresp/src/index.ts index f9ecccadf746..6a4dadce7ce8 100644 --- a/packages/reqresp/src/index.ts +++ b/packages/reqresp/src/index.ts @@ -1,7 +1,8 @@ export {ReqResp} from "./ReqResp.js"; export {getMetrics, Metrics, MetricsRegister} from "./metrics.js"; -export {Encoding as ReqRespEncoding, Method as ReqRespMethod} from "./types.js"; // Expose enums renamed +export {Encoding as ReqRespEncoding} from "./types.js"; // Expose enums renamed export * from "./types.js"; export * from "./interface.js"; -export * from "./constants.js"; -export * from "./response/errors.js"; +export {ResponseErrorCode, ResponseError} from "./response/errors.js"; +export {RequestErrorCode, RequestError} from "./request/errors.js"; +export {collectExactOne, collectMaxResponse} from "./utils/index.js"; diff --git a/packages/reqresp/src/interface.ts b/packages/reqresp/src/interface.ts index df2a441c5dde..4aab7414f5c1 100644 --- a/packages/reqresp/src/interface.ts +++ b/packages/reqresp/src/interface.ts @@ -1,48 +1,4 @@ import {PeerId} from "@libp2p/interface-peer-id"; -import {ForkName} from "@lodestar/params"; -import {allForks, altair, phase0} from "@lodestar/types"; -import {ReqRespProtocolModules} from "./ReqRespProtocol.js"; -import {IPeerRpcScoreStore, MetadataController} from "./sharedTypes.js"; -import {ProtocolDefinition, RequestTypedContainer} from "./types.js"; - -export interface IReqResp { - start(): void; - stop(): void; - status(peerId: PeerId, request: phase0.Status): Promise; - goodbye(peerId: PeerId, request: phase0.Goodbye): Promise; - ping(peerId: PeerId): Promise; - metadata(peerId: PeerId, fork?: ForkName): Promise; - beaconBlocksByRange( - peerId: PeerId, - request: phase0.BeaconBlocksByRangeRequest - ): Promise; - beaconBlocksByRoot(peerId: PeerId, request: phase0.BeaconBlocksByRootRequest): Promise; - pruneOnPeerDisconnect(peerId: PeerId): void; - lightClientBootstrap(peerId: PeerId, request: Uint8Array): Promise; - lightClientOptimisticUpdate(peerId: PeerId): Promise; - lightClientFinalityUpdate(peerId: PeerId): Promise; - lightClientUpdate(peerId: PeerId, request: altair.LightClientUpdatesByRange): Promise; - registerProtocol(protocol: ProtocolDefinition): void; -} - -export interface ReqRespModules extends ReqRespProtocolModules { - peerRpcScores: IPeerRpcScoreStore; - metadataController: MetadataController; -} - -export interface ReqRespHandlerProtocolContext { - modules: Omit; - eventHandlers: { - onIncomingRequestBody(_req: RequestTypedContainer, _peerId: PeerId): void; - }; -} - -export interface ReqRespHandlerContext extends ReqRespHandlerProtocolContext { - modules: ReqRespHandlerProtocolContext["modules"] & { - inboundRateLimiter: RateLimiter; - metadataController: MetadataController; - }; -} /** * Rate limiter interface for inbound and outbound requests. diff --git a/packages/reqresp/src/messages/BeaconBlocksByRange.ts b/packages/reqresp/src/messages/BeaconBlocksByRange.ts new file mode 100644 index 000000000000..12d0af7f9b5a --- /dev/null +++ b/packages/reqresp/src/messages/BeaconBlocksByRange.ts @@ -0,0 +1,19 @@ +import {allForks, phase0, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const BeaconBlocksByRange: ProtocolDefinitionGenerator< + phase0.BeaconBlocksByRangeRequest, + allForks.SignedBeaconBlock +> = (_modules, handler) => { + return { + method: "beacon_blocks_by_range", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.BeaconBlocksByRangeRequest, + responseType: (forkName) => ssz[forkName].SignedBeaconBlock, + renderRequestBody: (req) => `${req.startSlot},${req.step},${req.count}`, + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/BeaconBlocksByRangeV2.ts b/packages/reqresp/src/messages/BeaconBlocksByRangeV2.ts new file mode 100644 index 000000000000..756f484fd521 --- /dev/null +++ b/packages/reqresp/src/messages/BeaconBlocksByRangeV2.ts @@ -0,0 +1,23 @@ +import {allForks, phase0, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const BeaconBlocksByRangeV2: ProtocolDefinitionGenerator< + phase0.BeaconBlocksByRangeRequest, + allForks.SignedBeaconBlock +> = (modules, handler) => { + return { + method: "beacon_blocks_by_range", + version: 2, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.BeaconBlocksByRangeRequest, + responseType: (forkName) => ssz[forkName].SignedBeaconBlock, + renderRequestBody: (req) => `${req.startSlot},${req.step},${req.count}`, + contextBytes: { + type: ContextBytesType.ForkDigest, + forkDigestContext: modules.config, + forkFromResponse: (block) => modules.config.getForkName(block.message.slot), + }, + }; +}; diff --git a/packages/reqresp/src/messages/BeaconBlocksByRoot.ts b/packages/reqresp/src/messages/BeaconBlocksByRoot.ts new file mode 100644 index 000000000000..b4e22d471b45 --- /dev/null +++ b/packages/reqresp/src/messages/BeaconBlocksByRoot.ts @@ -0,0 +1,20 @@ +import {allForks, phase0, ssz} from "@lodestar/types"; +import {toHex} from "@lodestar/utils"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const BeaconBlocksByRoot: ProtocolDefinitionGenerator< + phase0.BeaconBlocksByRootRequest, + allForks.SignedBeaconBlock +> = (_modules, handler) => { + return { + method: "beacon_blocks_by_root", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.BeaconBlocksByRootRequest, + responseType: (forkName) => ssz[forkName].SignedBeaconBlock, + renderRequestBody: (req) => req.map((root) => toHex(root)).join(","), + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/v2/BeaconBlocksByRoot.ts b/packages/reqresp/src/messages/BeaconBlocksByRootV2.ts similarity index 50% rename from packages/reqresp/src/messages/v2/BeaconBlocksByRoot.ts rename to packages/reqresp/src/messages/BeaconBlocksByRootV2.ts index aacef8789177..76aadd8b0580 100644 --- a/packages/reqresp/src/messages/v2/BeaconBlocksByRoot.ts +++ b/packages/reqresp/src/messages/BeaconBlocksByRootV2.ts @@ -1,30 +1,17 @@ import {allForks, phase0, ssz} from "@lodestar/types"; import {toHex} from "@lodestar/utils"; -import {RespStatus} from "../../interface.js"; -import {ResponseError} from "../../response/errors.js"; -import {ContextBytesType, Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getHandlerRequiredErrorFor} from "../utils.js"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; // eslint-disable-next-line @typescript-eslint/naming-convention export const BeaconBlocksByRootV2: ProtocolDefinitionGenerator< phase0.BeaconBlocksByRootRequest, allForks.SignedBeaconBlock > = (modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.BeaconBlocksByRoot); - } - return { - method: Method.BeaconBlocksByRoot, - version: Version.V2, + method: "beacon_blocks_by_root", + version: 2, encoding: Encoding.SSZ_SNAPPY, - handler: async function* beaconBlocksByRootV2Handler(context, req, peerId) { - if (!context.modules.inboundRateLimiter.allowBlockByRequest(peerId, req.length)) { - throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); - } - - yield* handler(req, peerId); - }, + handler, requestType: () => ssz.phase0.BeaconBlocksByRootRequest, responseType: (forkName) => ssz[forkName].SignedBeaconBlock, renderRequestBody: (req) => req.map((root) => toHex(root)).join(","), @@ -33,6 +20,5 @@ export const BeaconBlocksByRootV2: ProtocolDefinitionGenerator< forkDigestContext: modules.config, forkFromResponse: (block) => modules.config.getForkName(block.message.slot), }, - isSingleResponse: false, }; }; diff --git a/packages/reqresp/src/messages/Goodbye.ts b/packages/reqresp/src/messages/Goodbye.ts new file mode 100644 index 000000000000..db9684466bc9 --- /dev/null +++ b/packages/reqresp/src/messages/Goodbye.ts @@ -0,0 +1,16 @@ +import {phase0, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const Goodbye: ProtocolDefinitionGenerator = (_modules, handler) => { + return { + method: "goodbye", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.Goodbye, + responseType: () => ssz.phase0.Goodbye, + renderRequestBody: (req) => req.toString(10), + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/v1/LightClientBootstrap.ts b/packages/reqresp/src/messages/LightClientBootstrap.ts similarity index 55% rename from packages/reqresp/src/messages/v1/LightClientBootstrap.ts rename to packages/reqresp/src/messages/LightClientBootstrap.ts index e7448e913da4..7aa9e380c9b5 100644 --- a/packages/reqresp/src/messages/v1/LightClientBootstrap.ts +++ b/packages/reqresp/src/messages/LightClientBootstrap.ts @@ -1,28 +1,21 @@ import {altair, Root, ssz} from "@lodestar/types"; import {toHex} from "@lodestar/utils"; -import {Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getContextBytesLightclient, getHandlerRequiredErrorFor} from "../utils.js"; +import {Encoding, ProtocolDefinitionGenerator} from "../types.js"; +import {getContextBytesLightclient} from "./utils.js"; // eslint-disable-next-line @typescript-eslint/naming-convention export const LightClientBootstrap: ProtocolDefinitionGenerator = ( modules, handler ) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.LightClientBootstrap); - } - return { - method: Method.LightClientBootstrap, - version: Version.V1, + method: "light_client_bootstrap", + version: 1, encoding: Encoding.SSZ_SNAPPY, - handler: async function* lightClientBootstrapHandler(context, req, peerId) { - yield* handler(req, peerId); - }, + handler, requestType: () => ssz.Root, responseType: () => ssz.altair.LightClientBootstrap, renderRequestBody: (req) => toHex(req), contextBytes: getContextBytesLightclient((bootstrap) => modules.config.getForkName(bootstrap.header.slot), modules), - isSingleResponse: true, }; }; diff --git a/packages/reqresp/src/messages/v1/LightClientFinalityUpdate.ts b/packages/reqresp/src/messages/LightClientFinalityUpdate.ts similarity index 51% rename from packages/reqresp/src/messages/v1/LightClientFinalityUpdate.ts rename to packages/reqresp/src/messages/LightClientFinalityUpdate.ts index ad033e8df346..5f10c60391af 100644 --- a/packages/reqresp/src/messages/v1/LightClientFinalityUpdate.ts +++ b/packages/reqresp/src/messages/LightClientFinalityUpdate.ts @@ -1,26 +1,19 @@ import {altair, ssz} from "@lodestar/types"; -import {Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getContextBytesLightclient, getHandlerRequiredErrorFor} from "../utils.js"; +import {Encoding, ProtocolDefinitionGenerator} from "../types.js"; +import {getContextBytesLightclient} from "./utils.js"; // eslint-disable-next-line @typescript-eslint/naming-convention export const LightClientFinalityUpdate: ProtocolDefinitionGenerator = ( modules, handler ) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.LightClientFinalityUpdate); - } - return { - method: Method.LightClientFinalityUpdate, - version: Version.V1, + method: "light_client_finality_update", + version: 1, encoding: Encoding.SSZ_SNAPPY, - handler: async function* statusHandler(_context, req, peerId) { - yield* handler(req, peerId); - }, + handler, requestType: () => null, responseType: () => ssz.altair.LightClientFinalityUpdate, contextBytes: getContextBytesLightclient((update) => modules.config.getForkName(update.signatureSlot), modules), - isSingleResponse: true, }; }; diff --git a/packages/reqresp/src/messages/v1/LightClientOptimisticUpdate.ts b/packages/reqresp/src/messages/LightClientOptimisticUpdate.ts similarity index 52% rename from packages/reqresp/src/messages/v1/LightClientOptimisticUpdate.ts rename to packages/reqresp/src/messages/LightClientOptimisticUpdate.ts index 614d12be87fc..d2f6d0849e20 100644 --- a/packages/reqresp/src/messages/v1/LightClientOptimisticUpdate.ts +++ b/packages/reqresp/src/messages/LightClientOptimisticUpdate.ts @@ -1,25 +1,19 @@ import {altair, ssz} from "@lodestar/types"; -import {Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getContextBytesLightclient, getHandlerRequiredErrorFor} from "../utils.js"; +import {Encoding, ProtocolDefinitionGenerator} from "../types.js"; +import {getContextBytesLightclient} from "./utils.js"; // eslint-disable-next-line @typescript-eslint/naming-convention export const LightClientOptimisticUpdate: ProtocolDefinitionGenerator = ( modules, handler ) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.LightClientOptimisticUpdate); - } return { - method: Method.LightClientOptimisticUpdate, - version: Version.V1, + method: "light_client_finality_update", + version: 1, encoding: Encoding.SSZ_SNAPPY, - handler: async function* statusHandler(_context, req, peerId) { - yield* handler(req, peerId); - }, + handler, requestType: () => null, responseType: () => ssz.altair.LightClientOptimisticUpdate, contextBytes: getContextBytesLightclient((update) => modules.config.getForkName(update.signatureSlot), modules), - isSingleResponse: true, }; }; diff --git a/packages/reqresp/src/messages/v1/LightClientUpdatesByRange.ts b/packages/reqresp/src/messages/LightClientUpdatesByRange.ts similarity index 56% rename from packages/reqresp/src/messages/v1/LightClientUpdatesByRange.ts rename to packages/reqresp/src/messages/LightClientUpdatesByRange.ts index 74e05bdb4eb6..347aa2216815 100644 --- a/packages/reqresp/src/messages/v1/LightClientUpdatesByRange.ts +++ b/packages/reqresp/src/messages/LightClientUpdatesByRange.ts @@ -1,26 +1,20 @@ import {altair, ssz} from "@lodestar/types"; -import {Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getContextBytesLightclient, getHandlerRequiredErrorFor} from "../utils.js"; +import {Encoding, ProtocolDefinitionGenerator} from "../types.js"; +import {getContextBytesLightclient} from "./utils.js"; // eslint-disable-next-line @typescript-eslint/naming-convention export const LightClientUpdatesByRange: ProtocolDefinitionGenerator< altair.LightClientUpdatesByRange, altair.LightClientUpdate > = (modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.LightClientUpdatesByRange); - } return { - method: Method.LightClientUpdatesByRange, - version: Version.V1, + method: "light_client_updates_by_range", + version: 1, encoding: Encoding.SSZ_SNAPPY, - handler: async function* statusHandler(_context, req, peerId) { - yield* handler(req, peerId); - }, + handler, requestType: () => ssz.altair.LightClientUpdatesByRange, responseType: () => ssz.altair.LightClientUpdate, renderRequestBody: (req) => `${req.startPeriod},${req.count}`, contextBytes: getContextBytesLightclient((update) => modules.config.getForkName(update.signatureSlot), modules), - isSingleResponse: true, }; }; diff --git a/packages/reqresp/src/messages/Metadata.ts b/packages/reqresp/src/messages/Metadata.ts new file mode 100644 index 000000000000..846ce31238e5 --- /dev/null +++ b/packages/reqresp/src/messages/Metadata.ts @@ -0,0 +1,15 @@ +import {allForks, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const Metadata: ProtocolDefinitionGenerator = (modules, handler) => { + return { + method: "metadata", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => null, + responseType: () => ssz.phase0.Metadata, + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/MetadataV2.ts b/packages/reqresp/src/messages/MetadataV2.ts new file mode 100644 index 000000000000..3d48a362d7c4 --- /dev/null +++ b/packages/reqresp/src/messages/MetadataV2.ts @@ -0,0 +1,15 @@ +import {allForks, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const MetadataV2: ProtocolDefinitionGenerator = (modules, handler) => { + return { + method: "metadata", + version: 2, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => null, + responseType: () => ssz.altair.Metadata, + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/Ping.ts b/packages/reqresp/src/messages/Ping.ts new file mode 100644 index 000000000000..3f26da6a5f0f --- /dev/null +++ b/packages/reqresp/src/messages/Ping.ts @@ -0,0 +1,16 @@ +import {phase0, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const Ping: ProtocolDefinitionGenerator = (modules, handler) => { + return { + method: "ping", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.Ping, + responseType: () => ssz.phase0.Ping, + renderRequestBody: (req) => req.toString(10), + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/Status.ts b/packages/reqresp/src/messages/Status.ts new file mode 100644 index 000000000000..6e368f468832 --- /dev/null +++ b/packages/reqresp/src/messages/Status.ts @@ -0,0 +1,15 @@ +import {phase0, ssz} from "@lodestar/types"; +import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js"; + +// eslint-disable-next-line @typescript-eslint/naming-convention +export const Status: ProtocolDefinitionGenerator = (_modules, handler) => { + return { + method: "status", + version: 1, + encoding: Encoding.SSZ_SNAPPY, + handler, + requestType: () => ssz.phase0.Status, + responseType: () => ssz.phase0.Status, + contextBytes: {type: ContextBytesType.Empty}, + }; +}; diff --git a/packages/reqresp/src/messages/index.ts b/packages/reqresp/src/messages/index.ts index 1e17721f08ae..c1f6aeddff42 100644 --- a/packages/reqresp/src/messages/index.ts +++ b/packages/reqresp/src/messages/index.ts @@ -1,34 +1,14 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import {BeaconBlocksByRoot} from "./v1/BeaconBlocksByRoot.js"; -import {BeaconBlocksByRange} from "./v1/BeaconBlocksByRange.js"; -import {Goodbye} from "./v1/Goodbye.js"; -import {LightClientBootstrap} from "./v1/LightClientBootstrap.js"; -import {LightClientFinalityUpdate} from "./v1/LightClientFinalityUpdate.js"; -import {LightClientOptimisticUpdate} from "./v1/LightClientOptimisticUpdate.js"; -import {LightClientUpdatesByRange} from "./v1/LightClientUpdatesByRange.js"; -import {Metadata} from "./v1/Metadata.js"; -import {Ping} from "./v1/Ping.js"; -import {Status} from "./v1/Status.js"; -import {BeaconBlocksByRangeV2} from "./v2/BeaconBlocksByRange.js"; -import {BeaconBlocksByRootV2} from "./v2/BeaconBlocksByRoot.js"; -import {MetadataV2} from "./v2/Metadata.js"; - -export default { - v1: { - BeaconBlocksByRoot, - BeaconBlocksByRange, - Goodbye, - LightClientBootstrap, - LightClientFinalityUpdate, - LightClientOptimisticUpdate, - LightClientUpdatesByRange, - Metadata, - Ping, - Status, - }, - v2: { - BeaconBlocksByRange: BeaconBlocksByRangeV2, - BeaconBlocksByRoot: BeaconBlocksByRootV2, - Metadata: MetadataV2, - }, -}; +export {BeaconBlocksByRoot} from "./BeaconBlocksByRoot.js"; +export {BeaconBlocksByRootV2} from "./BeaconBlocksByRootV2.js"; +export {BeaconBlocksByRange} from "./BeaconBlocksByRange.js"; +export {BeaconBlocksByRangeV2} from "./BeaconBlocksByRangeV2.js"; +export {Goodbye} from "./Goodbye.js"; +export {LightClientBootstrap} from "./LightClientBootstrap.js"; +export {LightClientFinalityUpdate} from "./LightClientFinalityUpdate.js"; +export {LightClientOptimisticUpdate} from "./LightClientOptimisticUpdate.js"; +export {LightClientUpdatesByRange} from "./LightClientUpdatesByRange.js"; +export {Metadata} from "./Metadata.js"; +export {MetadataV2} from "./MetadataV2.js"; +export {Ping} from "./Ping.js"; +export {Status} from "./Status.js"; diff --git a/packages/reqresp/src/messages/utils.ts b/packages/reqresp/src/messages/utils.ts index 4ad874501057..2801511a0d2a 100644 --- a/packages/reqresp/src/messages/utils.ts +++ b/packages/reqresp/src/messages/utils.ts @@ -12,6 +12,3 @@ export function getContextBytesLightclient( forkFromResponse, }; } - -export const getHandlerRequiredErrorFor = (method: string): Error => - new Error(`Handler is required for method "${method}."`); diff --git a/packages/reqresp/src/messages/v1/BeaconBlocksByRange.ts b/packages/reqresp/src/messages/v1/BeaconBlocksByRange.ts deleted file mode 100644 index fe2faeb1918e..000000000000 --- a/packages/reqresp/src/messages/v1/BeaconBlocksByRange.ts +++ /dev/null @@ -1,33 +0,0 @@ -import {allForks, phase0, ssz} from "@lodestar/types"; -import {RespStatus} from "../../interface.js"; -import {ResponseError} from "../../response/errors.js"; -import {ContextBytesType, Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getHandlerRequiredErrorFor} from "../utils.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const BeaconBlocksByRange: ProtocolDefinitionGenerator< - phase0.BeaconBlocksByRangeRequest, - allForks.SignedBeaconBlock -> = (_modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.BeaconBlocksByRange); - } - - return { - method: Method.BeaconBlocksByRange, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* beaconBlocksByRangeHandler(context, req, peerId) { - if (!context.modules.inboundRateLimiter.allowBlockByRequest(peerId, req.count)) { - throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); - } - - yield* handler(req, peerId); - }, - requestType: () => ssz.phase0.BeaconBlocksByRangeRequest, - responseType: (forkName) => ssz[forkName].SignedBeaconBlock, - renderRequestBody: (req) => `${req.startSlot},${req.step},${req.count}`, - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: false, - }; -}; diff --git a/packages/reqresp/src/messages/v1/BeaconBlocksByRoot.ts b/packages/reqresp/src/messages/v1/BeaconBlocksByRoot.ts deleted file mode 100644 index da19028d5ffb..000000000000 --- a/packages/reqresp/src/messages/v1/BeaconBlocksByRoot.ts +++ /dev/null @@ -1,34 +0,0 @@ -import {allForks, phase0, ssz} from "@lodestar/types"; -import {toHex} from "@lodestar/utils"; -import {RespStatus} from "../../interface.js"; -import {ResponseError} from "../../response/errors.js"; -import {ContextBytesType, Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getHandlerRequiredErrorFor} from "../utils.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const BeaconBlocksByRoot: ProtocolDefinitionGenerator< - phase0.BeaconBlocksByRootRequest, - allForks.SignedBeaconBlock -> = (_modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.BeaconBlocksByRoot); - } - - return { - method: Method.BeaconBlocksByRoot, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* beaconBlocksByRootHandler(context, req, peerId) { - if (!context.modules.inboundRateLimiter.allowBlockByRequest(peerId, req.length)) { - throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); - } - - yield* handler(req, peerId); - }, - requestType: () => ssz.phase0.BeaconBlocksByRootRequest, - responseType: (forkName) => ssz[forkName].SignedBeaconBlock, - renderRequestBody: (req) => req.map((root) => toHex(root)).join(","), - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: false, - }; -}; diff --git a/packages/reqresp/src/messages/v1/Goodbye.ts b/packages/reqresp/src/messages/v1/Goodbye.ts deleted file mode 100644 index beff4fa20d47..000000000000 --- a/packages/reqresp/src/messages/v1/Goodbye.ts +++ /dev/null @@ -1,28 +0,0 @@ -import {phase0, ssz} from "@lodestar/types"; -import { - ContextBytesType, - EncodedPayloadType, - Encoding, - Method, - ProtocolDefinitionGenerator, - Version, -} from "../../types.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const Goodbye: ProtocolDefinitionGenerator = (_handler) => { - return { - method: Method.Status, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* goodbyeHandler(context, req, peerId) { - context.eventHandlers.onIncomingRequestBody({method: Method.Goodbye, body: req}, peerId); - - yield {type: EncodedPayloadType.ssz, data: context.modules.metadataController.seqNumber}; - }, - requestType: () => ssz.phase0.Goodbye, - responseType: () => ssz.phase0.Goodbye, - renderRequestBody: (req) => req.toString(10), - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: true, - }; -}; diff --git a/packages/reqresp/src/messages/v1/Metadata.ts b/packages/reqresp/src/messages/v1/Metadata.ts deleted file mode 100644 index e5ddda0df86f..000000000000 --- a/packages/reqresp/src/messages/v1/Metadata.ts +++ /dev/null @@ -1,27 +0,0 @@ -import {allForks, ssz} from "@lodestar/types"; -import { - ContextBytesType, - EncodedPayloadType, - Encoding, - Method, - ProtocolDefinitionGenerator, - Version, -} from "../../types.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const Metadata: ProtocolDefinitionGenerator = (modules) => { - return { - method: Method.Metadata, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* metadataHandler(context, req, peerId) { - context.eventHandlers.onIncomingRequestBody({method: Method.Metadata, body: req}, peerId); - - yield {type: EncodedPayloadType.ssz, data: modules.metadataController.json}; - }, - requestType: () => null, - responseType: () => ssz.phase0.Metadata, - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: true, - }; -}; diff --git a/packages/reqresp/src/messages/v1/Ping.ts b/packages/reqresp/src/messages/v1/Ping.ts deleted file mode 100644 index 009aa5dca042..000000000000 --- a/packages/reqresp/src/messages/v1/Ping.ts +++ /dev/null @@ -1,28 +0,0 @@ -import {phase0, ssz} from "@lodestar/types"; -import { - ContextBytesType, - EncodedPayloadType, - Encoding, - Method, - ProtocolDefinitionGenerator, - Version, -} from "../../types.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const Ping: ProtocolDefinitionGenerator = (modules) => { - return { - method: Method.Status, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* pingHandler(context, req, peerId) { - context.eventHandlers.onIncomingRequestBody({method: Method.Ping, body: req}, peerId); - - yield {type: EncodedPayloadType.ssz, data: modules.metadataController.seqNumber}; - }, - requestType: () => ssz.phase0.Ping, - responseType: () => ssz.phase0.Ping, - renderRequestBody: (req) => req.toString(10), - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: true, - }; -}; diff --git a/packages/reqresp/src/messages/v1/Status.ts b/packages/reqresp/src/messages/v1/Status.ts deleted file mode 100644 index 0d204702a9c8..000000000000 --- a/packages/reqresp/src/messages/v1/Status.ts +++ /dev/null @@ -1,25 +0,0 @@ -import {phase0, ssz} from "@lodestar/types"; -import {ContextBytesType, Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getHandlerRequiredErrorFor} from "../utils.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const Status: ProtocolDefinitionGenerator = (_modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.Status); - } - - return { - method: Method.Status, - version: Version.V1, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* statusHandler(context, req, peerId) { - context.eventHandlers.onIncomingRequestBody({method: Method.Status, body: req}, peerId); - - yield* handler(req, peerId); - }, - requestType: () => ssz.phase0.Status, - responseType: () => ssz.phase0.Status, - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: true, - }; -}; diff --git a/packages/reqresp/src/messages/v2/BeaconBlocksByRange.ts b/packages/reqresp/src/messages/v2/BeaconBlocksByRange.ts deleted file mode 100644 index 0df2ffdaadb6..000000000000 --- a/packages/reqresp/src/messages/v2/BeaconBlocksByRange.ts +++ /dev/null @@ -1,37 +0,0 @@ -import {allForks, phase0, ssz} from "@lodestar/types"; -import {RespStatus} from "../../interface.js"; -import {ResponseError} from "../../response/errors.js"; -import {ContextBytesType, Encoding, Method, ProtocolDefinitionGenerator, Version} from "../../types.js"; -import {getHandlerRequiredErrorFor} from "../utils.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const BeaconBlocksByRangeV2: ProtocolDefinitionGenerator< - phase0.BeaconBlocksByRangeRequest, - allForks.SignedBeaconBlock -> = (modules, handler) => { - if (!handler) { - throw getHandlerRequiredErrorFor(Method.BeaconBlocksByRange); - } - - return { - method: Method.BeaconBlocksByRange, - version: Version.V2, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* beaconBlocksByRangeV2Handler(context, req, peerId) { - if (!context.modules.inboundRateLimiter.allowBlockByRequest(peerId, req.count)) { - throw new ResponseError(RespStatus.RATE_LIMITED, "rate limit"); - } - - yield* handler(req, peerId); - }, - requestType: () => ssz.phase0.BeaconBlocksByRangeRequest, - responseType: (forkName) => ssz[forkName].SignedBeaconBlock, - renderRequestBody: (req) => `${req.startSlot},${req.step},${req.count}`, - contextBytes: { - type: ContextBytesType.ForkDigest, - forkDigestContext: modules.config, - forkFromResponse: (block) => modules.config.getForkName(block.message.slot), - }, - isSingleResponse: false, - }; -}; diff --git a/packages/reqresp/src/messages/v2/Metadata.ts b/packages/reqresp/src/messages/v2/Metadata.ts deleted file mode 100644 index 4dfde9756b25..000000000000 --- a/packages/reqresp/src/messages/v2/Metadata.ts +++ /dev/null @@ -1,27 +0,0 @@ -import {allForks, ssz} from "@lodestar/types"; -import { - ContextBytesType, - EncodedPayloadType, - Encoding, - Method, - ProtocolDefinitionGenerator, - Version, -} from "../../types.js"; - -// eslint-disable-next-line @typescript-eslint/naming-convention -export const MetadataV2: ProtocolDefinitionGenerator = (modules) => { - return { - method: Method.Metadata, - version: Version.V2, - encoding: Encoding.SSZ_SNAPPY, - handler: async function* metadataV2Handler(context, req, peerId) { - context.eventHandlers.onIncomingRequestBody({method: Method.Metadata, body: req}, peerId); - - yield {type: EncodedPayloadType.ssz, data: modules.metadataController.json}; - }, - requestType: () => null, - responseType: () => ssz.altair.Metadata, - contextBytes: {type: ContextBytesType.Empty}, - isSingleResponse: true, - }; -}; diff --git a/packages/reqresp/src/metrics.ts b/packages/reqresp/src/metrics.ts index 5f09721744bd..d9683b84ed92 100644 --- a/packages/reqresp/src/metrics.ts +++ b/packages/reqresp/src/metrics.ts @@ -29,12 +29,6 @@ interface Histogram { reset(): void; } -interface AvgMinMax { - set(values: number[]): void; - set(labels: Labels, values: number[]): void; - set(arg1?: Labels | number[], arg2?: number[]): void; -} - type GaugeConfig = { name: string; help: string; @@ -48,12 +42,9 @@ type HistogramConfig = { buckets?: number[]; }; -type AvgMinMaxConfig = GaugeConfig; - export interface MetricsRegister { gauge(config: GaugeConfig): Gauge; histogram(config: HistogramConfig): Histogram; - avgMinMax(config: AvgMinMaxConfig): AvgMinMax; } export type Metrics = ReturnType; @@ -71,18 +62,9 @@ export type LodestarGitData = { * A collection of metrics used throughout the Gossipsub behaviour. */ // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type -export function getMetrics(register: MetricsRegister, gitData: LodestarGitData) { +export function getMetrics(register: MetricsRegister) { // Using function style instead of class to prevent having to re-declare all MetricsPrometheus types. - // Track version, same as https://github.com/ChainSafe/lodestar/blob/6df28de64f12ea90b341b219229a47c8a25c9343/packages/lodestar/src/metrics/metrics/lodestar.ts#L17 - register - .gauge({ - name: "lodestar_version", - help: "Lodestar version", - labelNames: Object.keys(gitData) as (keyof LodestarGitData)[], - }) - .set(gitData, 1); - return { outgoingRequests: register.gauge<{method: string}>({ name: "beacon_reqresp_outgoing_requests_total", diff --git a/packages/reqresp/src/rate_limiter/RateLimiter.ts b/packages/reqresp/src/rate_limiter/RateLimiter.ts index 0f45e6cac205..54cc10a32f74 100644 --- a/packages/reqresp/src/rate_limiter/RateLimiter.ts +++ b/packages/reqresp/src/rate_limiter/RateLimiter.ts @@ -2,12 +2,11 @@ import {PeerId} from "@libp2p/interface-peer-id"; import {ILogger, MapDef} from "@lodestar/utils"; import {RateLimiter} from "../interface.js"; import {Metrics} from "../metrics.js"; -import {IPeerRpcScoreStore, PeerAction} from "../sharedTypes.js"; import {RateTracker} from "./RateTracker.js"; interface RateLimiterModules { logger: ILogger; - peerRpcScores: IPeerRpcScoreStore; + reportPeer: (peer: PeerId) => void; metrics: Metrics | null; } @@ -37,7 +36,7 @@ const DISCONNECTED_TIMEOUT_MS = 5 * 60 * 1000; */ export class InboundRateLimiter implements RateLimiter { private readonly logger: ILogger; - private readonly peerRpcScores: IPeerRpcScoreStore; + private readonly reportPeer: RateLimiterModules["reportPeer"]; private readonly metrics: Metrics | null; private requestCountTrackersByPeer: MapDef; /** @@ -68,6 +67,7 @@ export class InboundRateLimiter implements RateLimiter { constructor(options: Partial, modules: RateLimiterModules) { this.options = {...InboundRateLimiter.defaults, ...options}; + this.reportPeer = modules.reportPeer; this.requestCountTrackersByPeer = new MapDef( () => new RateTracker({limit: this.options.requestCountPeerLimit, timeoutMs: this.options.rateTrackerTimeoutMs}) @@ -80,7 +80,6 @@ export class InboundRateLimiter implements RateLimiter { () => new RateTracker({limit: this.options.blockCountPeerLimit, timeoutMs: this.options.rateTrackerTimeoutMs}) ); this.logger = modules.logger; - this.peerRpcScores = modules.peerRpcScores; this.metrics = modules.metrics; this.lastSeenRequestsByPeer = new Map(); } @@ -109,7 +108,7 @@ export class InboundRateLimiter implements RateLimiter { peerId: peerIdStr, requestsWithinWindow: requestCountPeerTracker.getRequestedObjectsWithinWindow(), }); - this.peerRpcScores.applyAction(peerId, PeerAction.Fatal, "RateLimit"); + this.reportPeer(peerId); if (this.metrics) { this.metrics.rateLimitErrors.inc({tracker: "requestCountPeerTracker"}); } @@ -132,7 +131,7 @@ export class InboundRateLimiter implements RateLimiter { blockCount: numBlock, requestsWithinWindow: blockCountPeerTracker.getRequestedObjectsWithinWindow(), }); - this.peerRpcScores.applyAction(peerId, PeerAction.Fatal, "RateLimit"); + this.reportPeer(peerId); if (this.metrics) { this.metrics.rateLimitErrors.inc({tracker: "blockCountPeerTracker"}); } diff --git a/packages/reqresp/src/request/collectResponses.ts b/packages/reqresp/src/request/collectResponses.ts deleted file mode 100644 index 018354390f87..000000000000 --- a/packages/reqresp/src/request/collectResponses.ts +++ /dev/null @@ -1,34 +0,0 @@ -import {ProtocolDefinition} from "../types.js"; -import {RequestErrorCode, RequestInternalError} from "./errors.js"; - -/** - * Sink for `*`, from - * ```bnf - * response ::= * - * ``` - * Note: `response` has zero or more chunks for SSZ-list responses or exactly one chunk for non-list - */ -export function collectResponses( - protocol: ProtocolDefinition, - maxResponses?: number -): (source: AsyncIterable) => Promise { - return async (source) => { - if (protocol.isSingleResponse) { - for await (const response of source) { - return response; - } - throw new RequestInternalError({code: RequestErrorCode.EMPTY_RESPONSE}); - } - - // else: zero or more responses - const responses: T[] = []; - for await (const response of source) { - responses.push(response); - - if (maxResponses !== undefined && responses.length >= maxResponses) { - break; - } - } - return responses; - }; -} diff --git a/packages/reqresp/src/request/errors.ts b/packages/reqresp/src/request/errors.ts index ce9b2fcacdb6..da131686c8ae 100644 --- a/packages/reqresp/src/request/errors.ts +++ b/packages/reqresp/src/request/errors.ts @@ -1,5 +1,5 @@ import {LodestarError} from "@lodestar/utils"; -import {Method, Encoding} from "../types.js"; +import {Encoding} from "../types.js"; import {ResponseError} from "../response/index.js"; import {RespStatus, RpcResponseStatusError} from "../interface.js"; @@ -46,7 +46,7 @@ type RequestErrorType = | {code: RequestErrorCode.RESP_TIMEOUT}; export interface IRequestErrorMetadata { - method: Method; + method: string; encoding: Encoding; peer: string; // Do not include requestId in error metadata to make the errors deterministic for tests diff --git a/packages/reqresp/src/request/index.ts b/packages/reqresp/src/request/index.ts index 7562d282079d..8311af70348e 100644 --- a/packages/reqresp/src/request/index.ts +++ b/packages/reqresp/src/request/index.ts @@ -4,12 +4,10 @@ import {Libp2p} from "libp2p"; import {Uint8ArrayList} from "uint8arraylist"; import {ErrorAborted, ILogger, withTimeout, TimeoutError} from "@lodestar/utils"; import {ProtocolDefinition} from "../types.js"; -import {formatProtocolID, prettyPrintPeerId, abortableSource} from "../utils/index.js"; +import {prettyPrintPeerId, abortableSource} from "../utils/index.js"; import {ResponseError} from "../response/index.js"; import {requestEncode} from "../encoders/requestEncode.js"; import {responseDecode} from "../encoders/responseDecode.js"; -import {timeoutOptions} from "../constants.js"; -import {collectResponses} from "./collectResponses.js"; import { RequestError, RequestErrorCode, @@ -20,6 +18,23 @@ import { export {RequestError, RequestErrorCode}; +// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration +export const DEFAULT_DIAL_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_TTFB_TIMEOUT = 5 * 1000; // 5 sec +export const DEFAULT_RESP_TIMEOUT = 10 * 1000; // 10 sec + +export interface SendRequestOpts { + /** The maximum time for complete response transfer. */ + respTimeoutMs?: number; + /** Non-spec timeout from sending request until write stream closed by responder */ + requestTimeoutMs?: number; + /** The maximum time to wait for first byte of request response (time-to-first-byte). */ + ttfbTimeoutMs?: number; + /** Non-spec timeout from dialing protocol until stream opened */ + dialTimeoutMs?: number; +} + type SendRequestModules = { logger: ILogger; libp2p: Libp2p; @@ -37,21 +52,25 @@ type SendRequestModules = { * - Any part of the response_chunk fails validation. Throws a typed error (see `SszSnappyError`) * - The maximum number of requested chunks are read. Does not throw, returns read chunks only. */ -export async function sendRequest( +export async function* sendRequest( {logger, libp2p, peerClient}: SendRequestModules, peerId: PeerId, protocols: ProtocolDefinition[], + protocolIDs: string[], requestBody: Req, - maxResponses: number, signal?: AbortSignal, - options?: Partial, + opts?: SendRequestOpts, requestId = 0 -): Promise { +): AsyncIterable { if (protocols.length === 0) { throw Error("sendRequest must set > 0 protocols"); } - const {REQUEST_TIMEOUT, DIAL_TIMEOUT} = {...timeoutOptions, ...options}; + const DIAL_TIMEOUT = opts?.dialTimeoutMs ?? DEFAULT_DIAL_TIMEOUT; + const REQUEST_TIMEOUT = opts?.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT; + const TTFB_TIMEOUT = opts?.ttfbTimeoutMs ?? DEFAULT_TTFB_TIMEOUT; + const RESP_TIMEOUT = opts?.respTimeoutMs ?? DEFAULT_RESP_TIMEOUT; + const peerIdStr = peerId.toString(); const peerIdStrShort = prettyPrintPeerId(peerId); const {method, encoding} = protocols[0]; @@ -68,7 +87,7 @@ export async function sendRequest( // On stream negotiation `libp2p.dialProtocol` will pick the available protocol and return // the picked protocol in `connection.protocol` const protocolsMap = new Map( - protocols.map((protocol) => [formatProtocolID(protocol.method, protocol.version, protocol.encoding), protocol]) + protocols.map((protocol, i) => [protocolIDs[i], protocol]) ); // As of October 2020 we can't rely on libp2p.dialProtocol timeout to work so @@ -127,19 +146,15 @@ export async function sendRequest( logger.debug("Req request sent", logCtx); - const {TTFB_TIMEOUT, RESP_TIMEOUT} = {...timeoutOptions, ...options}; - // - TTFB_TIMEOUT: The requester MUST wait a maximum of TTFB_TIMEOUT for the first response byte to arrive // - RESP_TIMEOUT: Requester allows a further RESP_TIMEOUT for each subsequent response_chunk // - Max total timeout: This timeout is not required by the spec. It may not be necessary, but it's kept as // safe-guard to close. streams in case of bugs on other timeout mechanisms. const ttfbTimeoutController = new AbortController(); const respTimeoutController = new AbortController(); - const maxRTimeoutController = new AbortController(); const timeoutTTFB = setTimeout(() => ttfbTimeoutController.abort(), TTFB_TIMEOUT); let timeoutRESP: NodeJS.Timeout | null = null; - const timeoutMaxR = setTimeout(() => maxRTimeoutController.abort(), TTFB_TIMEOUT + maxResponses * RESP_TIMEOUT); const restartRespTimeout = (): void => { if (timeoutRESP) clearTimeout(timeoutRESP); @@ -148,7 +163,7 @@ export async function sendRequest( try { // Note: libp2p.stop() will close all connections, so not necessary to abort this pipe on parent stop - const responses = await pipe( + yield* pipe( abortableSource(stream.source as AsyncIterable, [ { signal: ttfbTimeoutController.signal, @@ -158,10 +173,6 @@ export async function sendRequest( signal: respTimeoutController.signal, getError: () => new RequestInternalError({code: RequestErrorCode.RESP_TIMEOUT}), }, - { - signal: maxRTimeoutController.signal, - getError: () => new RequestInternalError({code: RequestErrorCode.RESPONSE_TIMEOUT}), - }, ]), // Transforms `Buffer` chunks to yield `ResponseBody` chunks @@ -175,22 +186,16 @@ export async function sendRequest( // On , cancel this chunk's RESP_TIMEOUT and start next's restartRespTimeout(); }, - }), - - collectResponses(protocol, maxResponses) + }) ); // NOTE: Only log once per request to verbose, intermediate steps to debug // NOTE: Do not log the response, logs get extremely cluttered // NOTE: add double space after "Req " to align log with the "Resp " log - const numResponse = Array.isArray(responses) ? responses.length : 1; - logger.verbose("Req done", {...logCtx, numResponse}); - - return responses as Resp; + logger.verbose("Req done", {...logCtx}); } finally { clearTimeout(timeoutTTFB); if (timeoutRESP !== null) clearTimeout(timeoutRESP); - clearTimeout(timeoutMaxR); // Necessary to call `stream.close()` since collectResponses() may break out of the source before exhausting it // `stream.close()` libp2p-mplex will .end() the source (it-pushable instance) diff --git a/packages/reqresp/src/response/index.ts b/packages/reqresp/src/response/index.ts index f01ffce45d18..64315875751f 100644 --- a/packages/reqresp/src/response/index.ts +++ b/packages/reqresp/src/response/index.ts @@ -3,18 +3,19 @@ import {PeerId} from "@libp2p/interface-peer-id"; import {Stream} from "@libp2p/interface-connection"; import {Uint8ArrayList} from "uint8arraylist"; import {ILogger, TimeoutError, withTimeout} from "@lodestar/utils"; -import {REQUEST_TIMEOUT} from "../constants.js"; import {prettyPrintPeerId} from "../utils/index.js"; import {ProtocolDefinition} from "../types.js"; import {requestDecode} from "../encoders/requestDecode.js"; import {responseEncodeError, responseEncodeSuccess} from "../encoders/responseEncode.js"; -import {ReqRespHandlerContext, ReqRespHandlerProtocolContext, RespStatus} from "../interface.js"; +import {RespStatus} from "../interface.js"; import {ResponseError} from "./errors.js"; export {ResponseError}; -export interface HandleRequestOpts { - context: Context; +// Default spec values from https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#configuration +export const DEFAULT_REQUEST_TIMEOUT = 5 * 1000; // 5 sec + +export interface HandleRequestOpts { logger: ILogger; stream: Stream; peerId: PeerId; @@ -23,6 +24,8 @@ export interface HandleRequestOpts { requestId?: number; /** Peer client type for logging and metrics: 'prysm' | 'lighthouse' */ peerClient?: string; + /** Non-spec timeout from sending request until write stream closed by responder */ + requestTimeoutMs?: number; } /** @@ -35,8 +38,7 @@ export interface HandleRequestOpts { * 4a. Encode and write `` to peer * 4b. On error, encode and write an error `` and stop */ -export async function handleRequest({ - context, +export async function handleRequest({ logger, stream, peerId, @@ -44,7 +46,10 @@ export async function handleRequest): Promise { + requestTimeoutMs, +}: HandleRequestOpts): Promise { + const REQUEST_TIMEOUT = requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT; + const logCtx = {method: protocol.method, client: peerClient, peer: prettyPrintPeerId(peerId), requestId}; let responseError: Error | null = null; @@ -70,7 +75,7 @@ export async function handleRequest logger.debug("Resp sending chunk", logCtx)), diff --git a/packages/reqresp/src/sharedTypes.ts b/packages/reqresp/src/sharedTypes.ts index 50be4ec9ac79..73255bbbeb26 100644 --- a/packages/reqresp/src/sharedTypes.ts +++ b/packages/reqresp/src/sharedTypes.ts @@ -1,11 +1,4 @@ -import {EventEmitter} from "events"; -import {PeerId} from "@libp2p/interface-peer-id"; -import StrictEventEmitter from "strict-event-emitter-types"; -import {ENR} from "@chainsafe/discv5"; -import {BitArray} from "@chainsafe/ssz"; -import {ForkName} from "@lodestar/params"; -import {allForks, altair, Epoch, phase0} from "@lodestar/types"; -import {Encoding, RequestTypedContainer} from "./types.js"; +import {Encoding} from "./types.js"; // These interfaces are shared among beacon-node package. export enum ScoreState { @@ -17,75 +10,12 @@ export enum ScoreState { Banned = "Banned", } -type PeerIdStr = string; - -export enum PeerAction { - /** Immediately ban peer */ - Fatal = "Fatal", - /** - * Not malicious action, but it must not be tolerated - * ~5 occurrences will get the peer banned - */ - LowToleranceError = "LowToleranceError", - /** - * Negative action that can be tolerated only sometimes - * ~10 occurrences will get the peer banned - */ - MidToleranceError = "MidToleranceError", - /** - * Some error that can be tolerated multiple times - * ~50 occurrences will get the peer banned - */ - HighToleranceError = "HighToleranceError", -} - -export interface IPeerRpcScoreStore { - getScore(peer: PeerId): number; - getScoreState(peer: PeerId): ScoreState; - applyAction(peer: PeerId, action: PeerAction, actionName: string): void; - update(): void; - updateGossipsubScore(peerId: PeerIdStr, newScore: number, ignore: boolean): void; -} - -export enum NetworkEvent { - /** A relevant peer has connected or has been re-STATUS'd */ - peerConnected = "peer-manager.peer-connected", - peerDisconnected = "peer-manager.peer-disconnected", - gossipStart = "gossip.start", - gossipStop = "gossip.stop", - gossipHeartbeat = "gossipsub.heartbeat", - reqRespRequest = "req-resp.request", - unknownBlockParent = "unknownBlockParent", -} - -export type NetworkEvents = { - [NetworkEvent.peerConnected]: (peer: PeerId, status: phase0.Status) => void; - [NetworkEvent.peerDisconnected]: (peer: PeerId) => void; - [NetworkEvent.reqRespRequest]: (request: RequestTypedContainer, peer: PeerId) => void; - [NetworkEvent.unknownBlockParent]: (signedBlock: allForks.SignedBeaconBlock, peerIdStr: string) => void; -}; - -export type INetworkEventBus = StrictEventEmitter; - export enum RelevantPeerStatus { Unknown = "unknown", relevant = "relevant", irrelevant = "irrelevant", } -export type PeerData = { - lastReceivedMsgUnixTsMs: number; - lastStatusUnixTsMs: number; - connectedUnixTsMs: number; - relevantStatus: RelevantPeerStatus; - direction: "inbound" | "outbound"; - peerId: PeerId; - metadata: altair.Metadata | null; - agentVersion: string | null; - agentClient: ClientKind | null; - encodingPreference: Encoding | null; -}; - export enum ClientKind { Lighthouse = "Lighthouse", Nimbus = "Nimbus", @@ -101,12 +31,3 @@ export interface PeersData { getEncodingPreference(peerIdStr: string): Encoding | null; setEncodingPreference(peerIdStr: string, encoding: Encoding): void; } - -export interface MetadataController { - seqNumber: bigint; - syncnets: BitArray; - attnets: BitArray; - json: altair.Metadata; - start(enr: ENR | undefined, currentFork: ForkName): void; - updateEth2Field(epoch: Epoch): void; -} diff --git a/packages/reqresp/src/types.ts b/packages/reqresp/src/types.ts index 0098d303e9c2..7ce98774fd40 100644 --- a/packages/reqresp/src/types.ts +++ b/packages/reqresp/src/types.ts @@ -1,11 +1,9 @@ import {PeerId} from "@libp2p/interface-peer-id"; import {Type} from "@chainsafe/ssz"; -import {IForkConfig, IForkDigestContext} from "@lodestar/config"; +import {IBeaconConfig, IForkConfig, IForkDigestContext} from "@lodestar/config"; import {ForkName} from "@lodestar/params"; -import {phase0, Slot} from "@lodestar/types"; +import {Slot} from "@lodestar/types"; import {LodestarError} from "@lodestar/utils"; -import {timeoutOptions} from "./constants.js"; -import {ReqRespHandlerContext, ReqRespHandlerProtocolContext} from "./interface.js"; export enum EncodedPayloadType { ssz, @@ -23,38 +21,28 @@ export type EncodedPayload = contextBytes: ContextBytes; }; -export type ReqRespHandlerWithContext< - Req, - Resp, - Context extends ReqRespHandlerProtocolContext = ReqRespHandlerContext -> = (context: Context, req: Req, peerId: PeerId) => AsyncIterable>; - export type ReqRespHandler = (req: Req, peerId: PeerId) => AsyncIterable>; -export interface ProtocolDefinition< - Req = unknown, - Resp = unknown, - Context extends ReqRespHandlerProtocolContext = ReqRespHandlerContext -> extends Protocol { - handler: ReqRespHandlerWithContext; +export interface ProtocolDefinition { + /** Protocol name identifier `beacon_blocks_by_range` or `status` */ + method: string; + /** Version counter: `1`, `2` etc */ + version: number; + encoding: Encoding; + handler: ReqRespHandler; // eslint-disable-next-line @typescript-eslint/no-explicit-any requestType: (fork: ForkName) => Type | null; // eslint-disable-next-line @typescript-eslint/no-explicit-any responseType: (fork: ForkName) => Type; renderRequestBody?: (request: Req) => string; contextBytes: ContextBytesFactory; - isSingleResponse: boolean; } -export type ProtocolDefinitionGenerator< - Req, - Res, - Context extends ReqRespHandlerProtocolContext = ReqRespHandlerContext -> = ( +export type ProtocolDefinitionGenerator = ( // "inboundRateLimiter" is available only on handler context not on generator - modules: Omit, - handler?: ReqRespHandler -) => ProtocolDefinition; + modules: {config: IBeaconConfig}, + handler: ReqRespHandler +) => ProtocolDefinition; export type HandlerTypeFromMessage = T extends ProtocolDefinitionGenerator ? ReqRespHandler @@ -62,46 +50,6 @@ export type HandlerTypeFromMessage = T extends ProtocolDefinitionGenerator = @@ -135,8 +77,6 @@ export enum ContextBytesType { ForkDigest, } -export type ReqRespOptions = typeof timeoutOptions; - export enum LightClientServerErrorCode { RESOURCE_UNAVAILABLE = "RESOURCE_UNAVAILABLE", } diff --git a/packages/reqresp/src/utils/assertSequentialBlocksInRange.ts b/packages/reqresp/src/utils/assertSequentialBlocksInRange.ts deleted file mode 100644 index a532bf15a4e5..000000000000 --- a/packages/reqresp/src/utils/assertSequentialBlocksInRange.ts +++ /dev/null @@ -1,57 +0,0 @@ -import {allForks, phase0} from "@lodestar/types"; -import {LodestarError} from "@lodestar/utils"; - -/** - * Asserts a response from BeaconBlocksByRange respects the request and is sequential - * Note: MUST allow missing block for skipped slots. - */ -export function assertSequentialBlocksInRange( - blocks: allForks.SignedBeaconBlock[], - {count, startSlot, step}: phase0.BeaconBlocksByRangeRequest -): void { - // Check below would throw for empty ranges - if (blocks.length === 0) { - return; - } - - const length = blocks.length; - if (length > count) { - throw new BlocksByRangeError({code: BlocksByRangeErrorCode.BAD_LENGTH, count, length}); - } - - const maxSlot = startSlot + count * (step || 1) - 1; - const firstSlot = blocks[0].message.slot; - const lastSlot = blocks[blocks.length - 1].message.slot; - - if (firstSlot < startSlot) { - throw new BlocksByRangeError({code: BlocksByRangeErrorCode.UNDER_START_SLOT, startSlot, firstSlot}); - } - - if (lastSlot > maxSlot) { - throw new BlocksByRangeError({code: BlocksByRangeErrorCode.OVER_MAX_SLOT, maxSlot, lastSlot}); - } - - // Assert sequential with request.step - for (let i = 0; i < blocks.length - 1; i++) { - const slotL = blocks[i].message.slot; - const slotR = blocks[i + 1].message.slot; - if (slotL + step > slotR) { - throw new BlocksByRangeError({code: BlocksByRangeErrorCode.BAD_SEQUENCE, step, slotL, slotR}); - } - } -} - -export enum BlocksByRangeErrorCode { - BAD_LENGTH = "BLOCKS_BY_RANGE_ERROR_BAD_LENGTH", - UNDER_START_SLOT = "BLOCKS_BY_RANGE_ERROR_UNDER_START_SLOT", - OVER_MAX_SLOT = "BLOCKS_BY_RANGE_ERROR_OVER_MAX_SLOT", - BAD_SEQUENCE = "BLOCKS_BY_RANGE_ERROR_BAD_SEQUENCE", -} - -type BlocksByRangeErrorType = - | {code: BlocksByRangeErrorCode.BAD_LENGTH; count: number; length: number} - | {code: BlocksByRangeErrorCode.UNDER_START_SLOT; startSlot: number; firstSlot: number} - | {code: BlocksByRangeErrorCode.OVER_MAX_SLOT; maxSlot: number; lastSlot: number} - | {code: BlocksByRangeErrorCode.BAD_SEQUENCE; step: number; slotL: number; slotR: number}; - -export class BlocksByRangeError extends LodestarError {} diff --git a/packages/reqresp/src/utils/collectExactOne.ts b/packages/reqresp/src/utils/collectExactOne.ts new file mode 100644 index 000000000000..6575c1034ac2 --- /dev/null +++ b/packages/reqresp/src/utils/collectExactOne.ts @@ -0,0 +1,15 @@ +import {RequestErrorCode, RequestInternalError} from "../request/errors.js"; + +/** + * Sink for `*`, from + * ```bnf + * response ::= * + * ``` + * Expects exactly one response + */ +export async function collectExactOne(source: AsyncIterable): Promise { + for await (const response of source) { + return response; + } + throw new RequestInternalError({code: RequestErrorCode.EMPTY_RESPONSE}); +} diff --git a/packages/reqresp/src/utils/collectMaxResponse.ts b/packages/reqresp/src/utils/collectMaxResponse.ts new file mode 100644 index 000000000000..42ce2c1c8611 --- /dev/null +++ b/packages/reqresp/src/utils/collectMaxResponse.ts @@ -0,0 +1,19 @@ +/** + * Sink for `*`, from + * ```bnf + * response ::= * + * ``` + * Collects a bounded list of responses up to `maxResponses` + */ +export async function collectMaxResponse(source: AsyncIterable, maxResponses: number): Promise { + // else: zero or more responses + const responses: T[] = []; + for await (const response of source) { + responses.push(response); + + if (maxResponses !== undefined && responses.length >= maxResponses) { + break; + } + } + return responses; +} diff --git a/packages/reqresp/src/utils/index.ts b/packages/reqresp/src/utils/index.ts index daabe0458abc..b454e4695673 100644 --- a/packages/reqresp/src/utils/index.ts +++ b/packages/reqresp/src/utils/index.ts @@ -1,8 +1,7 @@ -export * from "./assertSequentialBlocksInRange.js"; +export * from "./abortableSource.js"; export * from "./bufferedSource.js"; +export * from "./collectExactOne.js"; +export * from "./collectMaxResponse.js"; export * from "./errorMessage.js"; export * from "./onChunk.js"; -export * from "./protocolId.js"; export * from "./peerId.js"; -export * from "./abortableSource.js"; -export * from "./multifork.js"; diff --git a/packages/reqresp/src/utils/multifork.ts b/packages/reqresp/src/utils/multifork.ts deleted file mode 100644 index dccec9a90821..000000000000 --- a/packages/reqresp/src/utils/multifork.ts +++ /dev/null @@ -1,9 +0,0 @@ -import {Slot} from "@lodestar/types"; -import {bytesToInt} from "@lodestar/utils"; - -const SLOT_BYTES_POSITION_IN_BLOCK = 100; -const SLOT_BYTE_COUNT = 8; - -export function getSlotFromBytes(bytes: Buffer | Uint8Array): Slot { - return bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_BLOCK, SLOT_BYTES_POSITION_IN_BLOCK + SLOT_BYTE_COUNT)); -} diff --git a/packages/reqresp/src/utils/protocolId.ts b/packages/reqresp/src/utils/protocolId.ts deleted file mode 100644 index c88ba171f80e..000000000000 --- a/packages/reqresp/src/utils/protocolId.ts +++ /dev/null @@ -1,10 +0,0 @@ -import {Encoding, protocolPrefix} from "../types.js"; - -/** - * @param method `"beacon_blocks_by_range"` - * @param version `"1"` - * @param encoding `"ssz_snappy"` - */ -export function formatProtocolID(method: string, version: string, encoding: Encoding): string { - return `${protocolPrefix}/${method}/${version}/${encoding}`; -}