From 57c470955ae6cec058f3be342b6fefc31e65dd38 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:59:57 +0000 Subject: [PATCH 1/6] chore: use gossipsub msgid fn --- .../circuit-types/src/p2p/block_proposal.ts | 4 ++-- yarn-project/circuit-types/src/p2p/interface.ts | 12 ++++++++++++ yarn-project/p2p/src/service/libp2p_service.ts | 15 ++++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/yarn-project/circuit-types/src/p2p/block_proposal.ts b/yarn-project/circuit-types/src/p2p/block_proposal.ts index 8e64f4c1fd9..5d2ad7f7f46 100644 --- a/yarn-project/circuit-types/src/p2p/block_proposal.ts +++ b/yarn-project/circuit-types/src/p2p/block_proposal.ts @@ -1,5 +1,5 @@ import { Buffer32 } from '@aztec/foundation/buffer'; -import { recoverAddress } from '@aztec/foundation/crypto'; +import { keccak256, recoverAddress } from '@aztec/foundation/crypto'; import { type EthAddress } from '@aztec/foundation/eth-address'; import { Signature } from '@aztec/foundation/eth-signature'; import { type Fr } from '@aztec/foundation/fields'; @@ -42,7 +42,7 @@ export class BlockProposal extends Gossipable { } override p2pMessageIdentifier(): Buffer32 { - return BlockProposalHash.fromField(this.payload.archive); + return new BlockProposalHash(keccak256(this.signature.toBuffer())); } get archive(): Fr { diff --git a/yarn-project/circuit-types/src/p2p/interface.ts b/yarn-project/circuit-types/src/p2p/interface.ts index 1e252250399..06a02794602 100644 --- a/yarn-project/circuit-types/src/p2p/interface.ts +++ b/yarn-project/circuit-types/src/p2p/interface.ts @@ -17,3 +17,15 @@ export const TopicTypeMap: Record = { [TopicType.block_attestation]: BlockAttestation as unknown as typeof Gossipable, [TopicType.epoch_proof_quote]: EpochProofQuote as unknown as typeof Gossipable, }; + +/** + * Map from topic to deserialiser + * + * Used in msgIdFn libp2p to get the p2pMessageIdentifier from a message + */ +export const TopicToDeserializer = { + [Tx.p2pTopic]: Tx.fromBuffer, + [BlockProposal.p2pTopic]: BlockProposal.fromBuffer, + [BlockAttestation.p2pTopic]: BlockAttestation.fromBuffer, + [EpochProofQuote.p2pTopic]: EpochProofQuote.fromBuffer, +}; diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 18d2d180a4a..c8efda096fb 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -7,6 +7,7 @@ import { type L2BlockSource, MerkleTreeId, type RawGossipMessage, + TopicToDeserializer, TopicType, TopicTypeMap, Tx, @@ -27,7 +28,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; import { identify } from '@libp2p/identify'; -import type { PeerId } from '@libp2p/interface'; +import type { Message, PeerId } from '@libp2p/interface'; import '@libp2p/kad-dht'; import { mplex } from '@libp2p/mplex'; import { tcp } from '@libp2p/tcp'; @@ -242,6 +243,7 @@ export class LibP2PService extends WithTracer implements P2PService { heartbeatInterval: config.gossipsubInterval, mcacheLength: config.gossipsubMcacheLength, mcacheGossip: config.gossipsubMcacheGossip, + msgIdFn: getMsgIdFn, metricsRegister: otelMetricsAdapter, metricsTopicStrToLabel: metricsTopicStrToLabels(), scoreParams: createPeerScoreParams({ @@ -598,3 +600,14 @@ export class LibP2PService extends WithTracer implements P2PService { } } } + +function getMsgIdFn(message: Message) { + if (message.topic in TopicToDeserializer) { + // 1. Get deserialiser based on the topic + // 2. Deserialise message + // 3. Get p2pMessageIdentifier from deserialised message + return Uint8Array.from(TopicToDeserializer[message.topic](Buffer.from(message.data)).p2pMessageIdentifier().buffer); + } + + throw new Error(`No deserializer found for topic ${message.topic}`); +} From b853c006c498708827ce53eecef077009672192b Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:20:43 +0000 Subject: [PATCH 2/6] fix: closer to lodestar --- yarn-project/p2p/package.json | 3 +- yarn-project/p2p/src/service/encoding.ts | 48 +++++++++++++++++++ .../p2p/src/service/libp2p_service.ts | 17 ++----- yarn-project/yarn.lock | 8 ++++ 4 files changed, 62 insertions(+), 14 deletions(-) create mode 100644 yarn-project/p2p/src/service/encoding.ts diff --git a/yarn-project/p2p/package.json b/yarn-project/p2p/package.json index a458dcc94a5..bc72fd926e0 100644 --- a/yarn-project/p2p/package.json +++ b/yarn-project/p2p/package.json @@ -91,7 +91,8 @@ "libp2p": "1.5.0", "semver": "^7.6.0", "sha3": "^2.1.4", - "tslib": "^2.4.0" + "tslib": "^2.4.0", + "xxhash-wasm": "^1.1.0" }, "devDependencies": { "@aztec/archiver": "workspace:^", diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts new file mode 100644 index 00000000000..40403cb6f43 --- /dev/null +++ b/yarn-project/p2p/src/service/encoding.ts @@ -0,0 +1,48 @@ +// Taken from lodestar: https://github.com/ChainSafe/lodestar + +import { type Message } from '@libp2p/interface'; +import { sha256 } from '@aztec/foundation/crypto'; +import {RPC} from "@chainsafe/libp2p-gossipsub/message"; +import xxhashFactory from "xxhash-wasm"; + +// Load WASM +const xxhash = await xxhashFactory(); + +// Use salt to prevent msgId from being mined for collisions +const h64Seed = BigInt(Math.floor(Math.random() * 1e9)); + +// Shared buffer to convert msgId to string +const sharedMsgIdBuf = Buffer.alloc(20); + +/** + * The function used to generate a gossipsub message id + * We use the first 8 bytes of SHA256(data) for content addressing + */ +export function fastMsgIdFn(rpcMsg: RPC.Message): string { + if (rpcMsg.data) { + return xxhash.h64Raw(rpcMsg.data, h64Seed).toString(16); + } + return "0000000000000000"; +} + +export function msgIdToStrFn(msgId: Uint8Array): string { + // This happens serially, no need to reallocate the buffer + sharedMsgIdBuf.set(msgId); + return `0x${sharedMsgIdBuf.toString("hex")}`; +} + +/** + * Get the message identifier from a libp2p message + * + * Follows similarly to: + * https://github.com/ethereum/consensus-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages + * + * @param message - The libp2p message + * @returns The message identifier + */ +export function getMsgIdFn(message: Message) { + const { topic } = message; + + const vec = [Buffer.from(topic), message.data]; + return sha256(Buffer.concat(vec)).subarray(0, 20); +} diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index c8efda096fb..921b605bffe 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -7,7 +7,6 @@ import { type L2BlockSource, MerkleTreeId, type RawGossipMessage, - TopicToDeserializer, TopicType, TopicTypeMap, Tx, @@ -28,7 +27,7 @@ import { createPeerScoreParams, createTopicScoreParams } from '@chainsafe/libp2p import { noise } from '@chainsafe/libp2p-noise'; import { yamux } from '@chainsafe/libp2p-yamux'; import { identify } from '@libp2p/identify'; -import type { Message, PeerId } from '@libp2p/interface'; +import type { PeerId } from '@libp2p/interface'; import '@libp2p/kad-dht'; import { mplex } from '@libp2p/mplex'; import { tcp } from '@libp2p/tcp'; @@ -59,6 +58,7 @@ import { } from './reqresp/interface.js'; import { ReqResp } from './reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from './service.js'; +import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; /** * Lib P2P implementation of the P2PService interface. @@ -244,6 +244,8 @@ export class LibP2PService extends WithTracer implements P2PService { mcacheLength: config.gossipsubMcacheLength, mcacheGossip: config.gossipsubMcacheGossip, msgIdFn: getMsgIdFn, + msgIdToStrFn: msgIdToStrFn, + fastMsgIdFn: fastMsgIdFn, metricsRegister: otelMetricsAdapter, metricsTopicStrToLabel: metricsTopicStrToLabels(), scoreParams: createPeerScoreParams({ @@ -600,14 +602,3 @@ export class LibP2PService extends WithTracer implements P2PService { } } } - -function getMsgIdFn(message: Message) { - if (message.topic in TopicToDeserializer) { - // 1. Get deserialiser based on the topic - // 2. Deserialise message - // 3. Get p2pMessageIdentifier from deserialised message - return Uint8Array.from(TopicToDeserializer[message.topic](Buffer.from(message.data)).p2pMessageIdentifier().buffer); - } - - throw new Error(`No deserializer found for topic ${message.topic}`); -} diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index c53310ab30d..732f75683f7 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -924,6 +924,7 @@ __metadata: typescript: ^5.0.4 uint8arrays: ^5.0.3 viem: ^2.7.15 + xxhash-wasm: ^1.1.0 languageName: unknown linkType: soft @@ -18628,6 +18629,13 @@ __metadata: languageName: node linkType: hard +"xxhash-wasm@npm:^1.1.0": + version: 1.1.0 + resolution: "xxhash-wasm@npm:1.1.0" + checksum: 2ccecb3b1dac5fefe11002d5ff5d106bbb5b506f9ee817ecf1bda65e132ebff3c82701c6727df3cb90b94a6dc1d8b294337678606f2304bcb0fd6b8dc68afe0d + languageName: node + linkType: hard + "y18n@npm:^5.0.5": version: 5.0.8 resolution: "y18n@npm:5.0.8" From ccfe03d6cb6440d5fadbb174d547e3eac10cefcc Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:21:09 +0000 Subject: [PATCH 3/6] fmt --- yarn-project/p2p/src/service/encoding.ts | 10 +++++----- yarn-project/p2p/src/service/libp2p_service.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts index 40403cb6f43..34a0c49a1db 100644 --- a/yarn-project/p2p/src/service/encoding.ts +++ b/yarn-project/p2p/src/service/encoding.ts @@ -1,9 +1,9 @@ // Taken from lodestar: https://github.com/ChainSafe/lodestar +import { sha256 } from '@aztec/foundation/crypto'; +import { type RPC } from '@chainsafe/libp2p-gossipsub/message'; import { type Message } from '@libp2p/interface'; -import { sha256 } from '@aztec/foundation/crypto'; -import {RPC} from "@chainsafe/libp2p-gossipsub/message"; -import xxhashFactory from "xxhash-wasm"; +import xxhashFactory from 'xxhash-wasm'; // Load WASM const xxhash = await xxhashFactory(); @@ -22,13 +22,13 @@ export function fastMsgIdFn(rpcMsg: RPC.Message): string { if (rpcMsg.data) { return xxhash.h64Raw(rpcMsg.data, h64Seed).toString(16); } - return "0000000000000000"; + return '0000000000000000'; } export function msgIdToStrFn(msgId: Uint8Array): string { // This happens serially, no need to reallocate the buffer sharedMsgIdBuf.set(msgId); - return `0x${sharedMsgIdBuf.toString("hex")}`; + return `0x${sharedMsgIdBuf.toString('hex')}`; } /** diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 921b605bffe..28251f9110c 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -43,6 +43,7 @@ import { } from '../tx_validator/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; +import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; import { PeerManager } from './peer_manager.js'; import { PeerErrorSeverity } from './peer_scoring.js'; import { pingHandler, statusHandler } from './reqresp/handlers.js'; @@ -58,7 +59,6 @@ import { } from './reqresp/interface.js'; import { ReqResp } from './reqresp/reqresp.js'; import type { P2PService, PeerDiscoveryService } from './service.js'; -import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; /** * Lib P2P implementation of the P2PService interface. From 3b33ce00a6d1bb5e206c3b64291be51c28d18e37 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:34:43 +0000 Subject: [PATCH 4/6] chore: add snappy --- yarn-project/p2p/package.json | 1 + yarn-project/p2p/src/service/encoding.ts | 13 ++ .../p2p/src/service/libp2p_service.ts | 3 +- yarn-project/yarn.lock | 140 ++++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) diff --git a/yarn-project/p2p/package.json b/yarn-project/p2p/package.json index bc72fd926e0..9fffdafad29 100644 --- a/yarn-project/p2p/package.json +++ b/yarn-project/p2p/package.json @@ -91,6 +91,7 @@ "libp2p": "1.5.0", "semver": "^7.6.0", "sha3": "^2.1.4", + "snappy": "^7.2.2", "tslib": "^2.4.0", "xxhash-wasm": "^1.1.0" }, diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts index 34a0c49a1db..be032043943 100644 --- a/yarn-project/p2p/src/service/encoding.ts +++ b/yarn-project/p2p/src/service/encoding.ts @@ -2,8 +2,10 @@ import { sha256 } from '@aztec/foundation/crypto'; import { type RPC } from '@chainsafe/libp2p-gossipsub/message'; +import { DataTransform } from '@chainsafe/libp2p-gossipsub/types'; import { type Message } from '@libp2p/interface'; import xxhashFactory from 'xxhash-wasm'; +import { compressSync, uncompressSync } from 'snappy'; // Load WASM const xxhash = await xxhashFactory(); @@ -46,3 +48,14 @@ export function getMsgIdFn(message: Message) { const vec = [Buffer.from(topic), message.data]; return sha256(Buffer.concat(vec)).subarray(0, 20); } + +export class SnappyTransform implements DataTransform { + inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { + const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true })); + return new Uint8Array(uncompressed); + } + + outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { + return new Uint8Array(compressSync(Buffer.from(data))); + } + } diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 28251f9110c..78839cf4915 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -43,7 +43,7 @@ import { } from '../tx_validator/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; -import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; +import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn, SnappyTransform } from './encoding.js'; import { PeerManager } from './peer_manager.js'; import { PeerErrorSeverity } from './peer_scoring.js'; import { pingHandler, statusHandler } from './reqresp/handlers.js'; @@ -246,6 +246,7 @@ export class LibP2PService extends WithTracer implements P2PService { msgIdFn: getMsgIdFn, msgIdToStrFn: msgIdToStrFn, fastMsgIdFn: fastMsgIdFn, + dataTransform: new SnappyTransform(), metricsRegister: otelMetricsAdapter, metricsTopicStrToLabel: metricsTopicStrToLabels(), scoreParams: createPeerScoreParams({ diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 732f75683f7..0a9dde8c978 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -919,6 +919,7 @@ __metadata: libp2p: 1.5.0 semver: ^7.6.0 sha3: ^2.1.4 + snappy: ^7.2.2 ts-node: ^10.9.1 tslib: ^2.4.0 typescript: ^5.0.4 @@ -3402,6 +3403,97 @@ __metadata: languageName: node linkType: hard +"@napi-rs/snappy-android-arm-eabi@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-android-arm-eabi@npm:7.2.2" + conditions: os=android & cpu=arm + languageName: node + linkType: hard + +"@napi-rs/snappy-android-arm64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-android-arm64@npm:7.2.2" + conditions: os=android & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-darwin-arm64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-darwin-arm64@npm:7.2.2" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-darwin-x64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-darwin-x64@npm:7.2.2" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-freebsd-x64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-freebsd-x64@npm:7.2.2" + conditions: os=freebsd & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm-gnueabihf@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm-gnueabihf@npm:7.2.2" + conditions: os=linux & cpu=arm + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm64-gnu@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm64-gnu@npm:7.2.2" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm64-musl@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm64-musl@npm:7.2.2" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-x64-gnu@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-x64-gnu@npm:7.2.2" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-x64-musl@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-x64-musl@npm:7.2.2" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-arm64-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-arm64-msvc@npm:7.2.2" + conditions: os=win32 & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-ia32-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-ia32-msvc@npm:7.2.2" + conditions: os=win32 & cpu=ia32 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-x64-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-x64-msvc@npm:7.2.2" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + "@noble/ciphers@npm:^0.4.0": version: 0.4.1 resolution: "@noble/ciphers@npm:0.4.1" @@ -16312,6 +16404,54 @@ __metadata: languageName: node linkType: hard +"snappy@npm:^7.2.2": + version: 7.2.2 + resolution: "snappy@npm:7.2.2" + dependencies: + "@napi-rs/snappy-android-arm-eabi": 7.2.2 + "@napi-rs/snappy-android-arm64": 7.2.2 + "@napi-rs/snappy-darwin-arm64": 7.2.2 + "@napi-rs/snappy-darwin-x64": 7.2.2 + "@napi-rs/snappy-freebsd-x64": 7.2.2 + "@napi-rs/snappy-linux-arm-gnueabihf": 7.2.2 + "@napi-rs/snappy-linux-arm64-gnu": 7.2.2 + "@napi-rs/snappy-linux-arm64-musl": 7.2.2 + "@napi-rs/snappy-linux-x64-gnu": 7.2.2 + "@napi-rs/snappy-linux-x64-musl": 7.2.2 + "@napi-rs/snappy-win32-arm64-msvc": 7.2.2 + "@napi-rs/snappy-win32-ia32-msvc": 7.2.2 + "@napi-rs/snappy-win32-x64-msvc": 7.2.2 + dependenciesMeta: + "@napi-rs/snappy-android-arm-eabi": + optional: true + "@napi-rs/snappy-android-arm64": + optional: true + "@napi-rs/snappy-darwin-arm64": + optional: true + "@napi-rs/snappy-darwin-x64": + optional: true + "@napi-rs/snappy-freebsd-x64": + optional: true + "@napi-rs/snappy-linux-arm-gnueabihf": + optional: true + "@napi-rs/snappy-linux-arm64-gnu": + optional: true + "@napi-rs/snappy-linux-arm64-musl": + optional: true + "@napi-rs/snappy-linux-x64-gnu": + optional: true + "@napi-rs/snappy-linux-x64-musl": + optional: true + "@napi-rs/snappy-win32-arm64-msvc": + optional: true + "@napi-rs/snappy-win32-ia32-msvc": + optional: true + "@napi-rs/snappy-win32-x64-msvc": + optional: true + checksum: cc6ee627d32325c3b3a7220f57bf7f87906372431072b77dfacf5d875a21c54043df8d6f328eadf8d58bda3d9bb558b3f00e1daaa757441cfa1ec20004f715f1 + languageName: node + linkType: hard + "sockjs@npm:^0.3.24": version: 0.3.24 resolution: "sockjs@npm:0.3.24" From d6d023c62cdd3a6adeace848cb5a4c316e0d9d7e Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:35:59 +0000 Subject: [PATCH 5/6] fmt --- yarn-project/p2p/src/service/encoding.ts | 10 +++++----- yarn-project/p2p/src/service/libp2p_service.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts index be032043943..0713b7e8a26 100644 --- a/yarn-project/p2p/src/service/encoding.ts +++ b/yarn-project/p2p/src/service/encoding.ts @@ -2,10 +2,10 @@ import { sha256 } from '@aztec/foundation/crypto'; import { type RPC } from '@chainsafe/libp2p-gossipsub/message'; -import { DataTransform } from '@chainsafe/libp2p-gossipsub/types'; +import { type DataTransform } from '@chainsafe/libp2p-gossipsub/types'; import { type Message } from '@libp2p/interface'; -import xxhashFactory from 'xxhash-wasm'; import { compressSync, uncompressSync } from 'snappy'; +import xxhashFactory from 'xxhash-wasm'; // Load WASM const xxhash = await xxhashFactory(); @@ -55,7 +55,7 @@ export class SnappyTransform implements DataTransform { return new Uint8Array(uncompressed); } - outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { - return new Uint8Array(compressSync(Buffer.from(data))); - } + outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { + return new Uint8Array(compressSync(Buffer.from(data))); } +} diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 78839cf4915..c7c354c9a37 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -43,7 +43,7 @@ import { } from '../tx_validator/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; -import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn, SnappyTransform } from './encoding.js'; +import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; import { PeerManager } from './peer_manager.js'; import { PeerErrorSeverity } from './peer_scoring.js'; import { pingHandler, statusHandler } from './reqresp/handlers.js'; From c4a2b0b0b4ff80b3b8d4e033fac9e48e3e48aa32 Mon Sep 17 00:00:00 2001 From: Maddiaa0 <47148561+Maddiaa0@users.noreply.github.com> Date: Thu, 5 Dec 2024 13:04:55 +0000 Subject: [PATCH 6/6] feat: reqresp compression --- yarn-project/p2p/src/mocks/index.ts | 2 +- .../p2p/src/service/libp2p_service.ts | 6 +- .../p2p/src/service/reqresp/handlers.ts | 8 +-- .../p2p/src/service/reqresp/interface.ts | 6 +- .../p2p/src/service/reqresp/reqresp.test.ts | 60 ++++++++----------- .../p2p/src/service/reqresp/reqresp.ts | 9 ++- 6 files changed, 43 insertions(+), 48 deletions(-) diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index f0fc6cd2ecf..38127a88daf 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -148,7 +148,7 @@ export type ReqRespNode = { export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [PING_PROTOCOL]: pingHandler, [STATUS_PROTOCOL]: statusHandler, - [TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))), + [TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')), }; // By default, all requests are valid diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index c7c354c9a37..c0666aa069d 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -283,11 +283,11 @@ export class LibP2PService extends WithTracer implements P2PService { * @param msg - the tx request message * @returns the tx response message */ - const txHandler = (msg: Buffer): Promise => { + const txHandler = (msg: Buffer): Promise => { const txHash = TxHash.fromBuffer(msg); const foundTx = mempools.txPool.getTxByHash(txHash); - const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0)); - return Promise.resolve(asUint8Array); + const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0); + return Promise.resolve(buf); }; const requestResponseHandlers = { diff --git a/yarn-project/p2p/src/service/reqresp/handlers.ts b/yarn-project/p2p/src/service/reqresp/handlers.ts index 688fab959e3..20a9163f88e 100644 --- a/yarn-project/p2p/src/service/reqresp/handlers.ts +++ b/yarn-project/p2p/src/service/reqresp/handlers.ts @@ -3,8 +3,8 @@ * @param _msg - The ping request message. * @returns A resolved promise with the pong response. */ -export function pingHandler(_msg: any): Promise { - return Promise.resolve(Uint8Array.from(Buffer.from('pong'))); +export function pingHandler(_msg: any): Promise { + return Promise.resolve(Buffer.from('pong')); } /** @@ -12,6 +12,6 @@ export function pingHandler(_msg: any): Promise { * @param _msg - The status request message. * @returns A resolved promise with the ok response. */ -export function statusHandler(_msg: any): Promise { - return Promise.resolve(Uint8Array.from(Buffer.from('ok'))); +export function statusHandler(_msg: any): Promise { + return Promise.resolve(Buffer.from('ok')); } diff --git a/yarn-project/p2p/src/service/reqresp/interface.ts b/yarn-project/p2p/src/service/reqresp/interface.ts index 8370b8a8a21..e23608c3665 100644 --- a/yarn-project/p2p/src/service/reqresp/interface.ts +++ b/yarn-project/p2p/src/service/reqresp/interface.ts @@ -16,7 +16,7 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL | * A handler for a sub protocol * The message will arrive as a buffer, and the handler must return a buffer */ -export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; +export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; /** * A type mapping from supprotocol to it's rate limits @@ -83,8 +83,8 @@ export type SubProtocolMap = { * Default handler for unimplemented sub protocols, this SHOULD be overwritten * by the service, but is provided as a fallback */ -const defaultHandler = (_msg: any): Promise => { - return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented'))); +const defaultHandler = (_msg: any): Promise => { + return Promise.resolve(Buffer.from('unimplemented')); }; /** diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 1807a318522..349b3a8f6b5 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -8,6 +8,7 @@ import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../ import { MOCK_SUB_PROTOCOL_HANDLERS, MOCK_SUB_PROTOCOL_VALIDATORS, + type ReqRespNode, connectToPeers, createNodes, startNodes, @@ -23,15 +24,22 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { let peerManager: MockProxy; + let nodes: ReqRespNode[]; beforeEach(() => { peerManager = mock(); }); + afterEach(async () => { + if (nodes) { + await stopNodes(nodes as ReqRespNode[]); + } + }); + it('Should perform a ping request', async () => { // Create two nodes // They need to discover each other - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; await startNodes(nodes); @@ -45,12 +53,10 @@ describe('ReqResp', () => { await sleep(500); expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); - - await stopNodes(nodes); }); it('Should handle gracefully if a peer connected peer is offline', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; const { req: ponger } = nodes[1]; @@ -66,12 +72,10 @@ describe('ReqResp', () => { const res = await pinger.sendRequest(PING_PROTOCOL, PING_REQUEST); expect(res).toBeUndefined(); - - await stopNodes(nodes); }); it('Should request from a later peer if other peers are offline', async () => { - const nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerManager, 4); await startNodes(nodes); await sleep(500); @@ -86,12 +90,10 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST); expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); - - await stopNodes(nodes); }); it('Should hit a rate limit if too many requests are made in quick succession', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -110,8 +112,6 @@ describe('ReqResp', () => { // Make sure the error message is logged const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`; expect(loggerSpy).toHaveBeenCalledWith(errorMessage); - - await stopNodes(nodes); }); describe('TX REQ PROTOCOL', () => { @@ -120,15 +120,15 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { + protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { - return Promise.resolve(Uint8Array.from(tx.toBuffer())); + return Promise.resolve(tx.toBuffer()); } - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + return Promise.resolve(Buffer.from('')); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -137,8 +137,6 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); expect(res).toEqual(tx); - - await stopNodes(nodes); }); it('Does not crash if tx hash returns undefined', async () => { @@ -147,11 +145,11 @@ describe('ReqResp', () => { const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; // Return nothing - protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { + return Promise.resolve(Buffer.from('')); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -160,12 +158,10 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); expect(res).toBeUndefined(); - - await stopNodes(nodes); }); it('Should hit individual timeout if nothing is returned over the stream', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -197,12 +193,10 @@ describe('ReqResp', () => { }), PeerErrorSeverity.HighToleranceError, ); - - await stopNodes(nodes); }); it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { - const nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerManager, 4); await startNodes(nodes); @@ -226,8 +220,6 @@ describe('ReqResp', () => { // Make sure the error message is logged const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`; expect(loggerSpy).toHaveBeenCalledWith(errorMessage); - - await stopNodes(nodes); }); it('Should penalize peer if transaction validation fails', async () => { @@ -236,12 +228,12 @@ describe('ReqResp', () => { // Mock that the node will respond with the tx const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { + protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { - return Promise.resolve(Uint8Array.from(tx.toBuffer())); + return Promise.resolve(tx.toBuffer()); } - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + return Promise.resolve(Buffer.from('')); }; // Mock that the receiving node will find that the transaction is invalid @@ -251,7 +243,7 @@ describe('ReqResp', () => { return Promise.resolve(false); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers, protocolValidators); await sleep(500); @@ -268,8 +260,6 @@ describe('ReqResp', () => { }), PeerErrorSeverity.LowToleranceError, ); - - await stopNodes(nodes); }); }); }); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index a2249015c2f..9d67de5c367 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -5,6 +5,7 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; import { type Libp2p } from 'libp2p'; +import { compressSync, uncompressSync } from 'snappy'; import { type Uint8ArrayList } from 'uint8arraylist'; import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; @@ -31,6 +32,9 @@ import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js'; * This service implements the request response sub protocol, it is heavily inspired from * ethereum implementations of the same name. * + * Note, responses get compressed in streamHandler + * so they get decompressed in readMessage + * * see: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain */ export class ReqResp { @@ -232,7 +236,7 @@ export class ReqResp { chunks.push(chunk.subarray()); } const messageData = chunks.concat(); - return Buffer.concat(messageData); + return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer; } /** @@ -269,7 +273,8 @@ export class ReqResp { async function* (source: any) { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); - yield handler(msg); + const response = await handler(msg); + yield new Uint8Array(compressSync(response)); } }, stream,