diff --git a/yarn-project/p2p/package.json b/yarn-project/p2p/package.json index bc72fd926e0..9fffdafad29 100644 --- a/yarn-project/p2p/package.json +++ b/yarn-project/p2p/package.json @@ -91,6 +91,7 @@ "libp2p": "1.5.0", "semver": "^7.6.0", "sha3": "^2.1.4", + "snappy": "^7.2.2", "tslib": "^2.4.0", "xxhash-wasm": "^1.1.0" }, diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index f0fc6cd2ecf..38127a88daf 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -148,7 +148,7 @@ export type ReqRespNode = { export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [PING_PROTOCOL]: pingHandler, [STATUS_PROTOCOL]: statusHandler, - [TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))), + [TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Buffer.from('tx')), }; // By default, all requests are valid diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts index 34a0c49a1db..0713b7e8a26 100644 --- a/yarn-project/p2p/src/service/encoding.ts +++ b/yarn-project/p2p/src/service/encoding.ts @@ -2,7 +2,9 @@ import { sha256 } from '@aztec/foundation/crypto'; import { type RPC } from '@chainsafe/libp2p-gossipsub/message'; +import { type DataTransform } from '@chainsafe/libp2p-gossipsub/types'; import { type Message } from '@libp2p/interface'; +import { compressSync, uncompressSync } from 'snappy'; import xxhashFactory from 'xxhash-wasm'; // Load WASM @@ -46,3 +48,14 @@ export function getMsgIdFn(message: Message) { const vec = [Buffer.from(topic), message.data]; return sha256(Buffer.concat(vec)).subarray(0, 20); } + +export class SnappyTransform implements DataTransform { + inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { + const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true })); + return new Uint8Array(uncompressed); + } + + outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { + return new Uint8Array(compressSync(Buffer.from(data))); + } +} diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 28251f9110c..c0666aa069d 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -43,7 +43,7 @@ import { } from '../tx_validator/index.js'; import { type PubSubLibp2p, convertToMultiaddr } from '../util.js'; import { AztecDatastore } from './data_store.js'; -import { fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; +import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from './encoding.js'; import { PeerManager } from './peer_manager.js'; import { PeerErrorSeverity } from './peer_scoring.js'; import { pingHandler, statusHandler } from './reqresp/handlers.js'; @@ -246,6 +246,7 @@ export class LibP2PService extends WithTracer implements P2PService { msgIdFn: getMsgIdFn, msgIdToStrFn: msgIdToStrFn, fastMsgIdFn: fastMsgIdFn, + dataTransform: new SnappyTransform(), metricsRegister: otelMetricsAdapter, metricsTopicStrToLabel: metricsTopicStrToLabels(), scoreParams: createPeerScoreParams({ @@ -282,11 +283,11 @@ export class LibP2PService extends WithTracer implements P2PService { * @param msg - the tx request message * @returns the tx response message */ - const txHandler = (msg: Buffer): Promise => { + const txHandler = (msg: Buffer): Promise => { const txHash = TxHash.fromBuffer(msg); const foundTx = mempools.txPool.getTxByHash(txHash); - const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0)); - return Promise.resolve(asUint8Array); + const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0); + return Promise.resolve(buf); }; const requestResponseHandlers = { diff --git a/yarn-project/p2p/src/service/reqresp/handlers.ts b/yarn-project/p2p/src/service/reqresp/handlers.ts index 688fab959e3..20a9163f88e 100644 --- a/yarn-project/p2p/src/service/reqresp/handlers.ts +++ b/yarn-project/p2p/src/service/reqresp/handlers.ts @@ -3,8 +3,8 @@ * @param _msg - The ping request message. * @returns A resolved promise with the pong response. */ -export function pingHandler(_msg: any): Promise { - return Promise.resolve(Uint8Array.from(Buffer.from('pong'))); +export function pingHandler(_msg: any): Promise { + return Promise.resolve(Buffer.from('pong')); } /** @@ -12,6 +12,6 @@ export function pingHandler(_msg: any): Promise { * @param _msg - The status request message. * @returns A resolved promise with the ok response. */ -export function statusHandler(_msg: any): Promise { - return Promise.resolve(Uint8Array.from(Buffer.from('ok'))); +export function statusHandler(_msg: any): Promise { + return Promise.resolve(Buffer.from('ok')); } diff --git a/yarn-project/p2p/src/service/reqresp/interface.ts b/yarn-project/p2p/src/service/reqresp/interface.ts index 8370b8a8a21..e23608c3665 100644 --- a/yarn-project/p2p/src/service/reqresp/interface.ts +++ b/yarn-project/p2p/src/service/reqresp/interface.ts @@ -16,7 +16,7 @@ export type ReqRespSubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL | * A handler for a sub protocol * The message will arrive as a buffer, and the handler must return a buffer */ -export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; +export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; /** * A type mapping from supprotocol to it's rate limits @@ -83,8 +83,8 @@ export type SubProtocolMap = { * Default handler for unimplemented sub protocols, this SHOULD be overwritten * by the service, but is provided as a fallback */ -const defaultHandler = (_msg: any): Promise => { - return Promise.resolve(Uint8Array.from(Buffer.from('unimplemented'))); +const defaultHandler = (_msg: any): Promise => { + return Promise.resolve(Buffer.from('unimplemented')); }; /** diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 1807a318522..349b3a8f6b5 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -8,6 +8,7 @@ import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../ import { MOCK_SUB_PROTOCOL_HANDLERS, MOCK_SUB_PROTOCOL_VALIDATORS, + type ReqRespNode, connectToPeers, createNodes, startNodes, @@ -23,15 +24,22 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { let peerManager: MockProxy; + let nodes: ReqRespNode[]; beforeEach(() => { peerManager = mock(); }); + afterEach(async () => { + if (nodes) { + await stopNodes(nodes as ReqRespNode[]); + } + }); + it('Should perform a ping request', async () => { // Create two nodes // They need to discover each other - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; await startNodes(nodes); @@ -45,12 +53,10 @@ describe('ReqResp', () => { await sleep(500); expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); - - await stopNodes(nodes); }); it('Should handle gracefully if a peer connected peer is offline', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); const { req: pinger } = nodes[0]; const { req: ponger } = nodes[1]; @@ -66,12 +72,10 @@ describe('ReqResp', () => { const res = await pinger.sendRequest(PING_PROTOCOL, PING_REQUEST); expect(res).toBeUndefined(); - - await stopNodes(nodes); }); it('Should request from a later peer if other peers are offline', async () => { - const nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerManager, 4); await startNodes(nodes); await sleep(500); @@ -86,12 +90,10 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST); expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); - - await stopNodes(nodes); }); it('Should hit a rate limit if too many requests are made in quick succession', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -110,8 +112,6 @@ describe('ReqResp', () => { // Make sure the error message is logged const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`; expect(loggerSpy).toHaveBeenCalledWith(errorMessage); - - await stopNodes(nodes); }); describe('TX REQ PROTOCOL', () => { @@ -120,15 +120,15 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { + protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { - return Promise.resolve(Uint8Array.from(tx.toBuffer())); + return Promise.resolve(tx.toBuffer()); } - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + return Promise.resolve(Buffer.from('')); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -137,8 +137,6 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); expect(res).toEqual(tx); - - await stopNodes(nodes); }); it('Does not crash if tx hash returns undefined', async () => { @@ -147,11 +145,11 @@ describe('ReqResp', () => { const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; // Return nothing - protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { + return Promise.resolve(Buffer.from('')); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -160,12 +158,10 @@ describe('ReqResp', () => { const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); expect(res).toBeUndefined(); - - await stopNodes(nodes); }); it('Should hit individual timeout if nothing is returned over the stream', async () => { - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes); @@ -197,12 +193,10 @@ describe('ReqResp', () => { }), PeerErrorSeverity.HighToleranceError, ); - - await stopNodes(nodes); }); it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { - const nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerManager, 4); await startNodes(nodes); @@ -226,8 +220,6 @@ describe('ReqResp', () => { // Make sure the error message is logged const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`; expect(loggerSpy).toHaveBeenCalledWith(errorMessage); - - await stopNodes(nodes); }); it('Should penalize peer if transaction validation fails', async () => { @@ -236,12 +228,12 @@ describe('ReqResp', () => { // Mock that the node will respond with the tx const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { + protocolHandlers[TX_REQ_PROTOCOL] = (message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { - return Promise.resolve(Uint8Array.from(tx.toBuffer())); + return Promise.resolve(tx.toBuffer()); } - return Promise.resolve(Uint8Array.from(Buffer.from(''))); + return Promise.resolve(Buffer.from('')); }; // Mock that the receiving node will find that the transaction is invalid @@ -251,7 +243,7 @@ describe('ReqResp', () => { return Promise.resolve(false); }; - const nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerManager, 2); await startNodes(nodes, protocolHandlers, protocolValidators); await sleep(500); @@ -268,8 +260,6 @@ describe('ReqResp', () => { }), PeerErrorSeverity.LowToleranceError, ); - - await stopNodes(nodes); }); }); }); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index a2249015c2f..9d67de5c367 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -5,6 +5,7 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; import { type Libp2p } from 'libp2p'; +import { compressSync, uncompressSync } from 'snappy'; import { type Uint8ArrayList } from 'uint8arraylist'; import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; @@ -31,6 +32,9 @@ import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js'; * This service implements the request response sub protocol, it is heavily inspired from * ethereum implementations of the same name. * + * Note, responses get compressed in streamHandler + * so they get decompressed in readMessage + * * see: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain */ export class ReqResp { @@ -232,7 +236,7 @@ export class ReqResp { chunks.push(chunk.subarray()); } const messageData = chunks.concat(); - return Buffer.concat(messageData); + return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer; } /** @@ -269,7 +273,8 @@ export class ReqResp { async function* (source: any) { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); - yield handler(msg); + const response = await handler(msg); + yield new Uint8Array(compressSync(response)); } }, stream, diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 732f75683f7..0a9dde8c978 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -919,6 +919,7 @@ __metadata: libp2p: 1.5.0 semver: ^7.6.0 sha3: ^2.1.4 + snappy: ^7.2.2 ts-node: ^10.9.1 tslib: ^2.4.0 typescript: ^5.0.4 @@ -3402,6 +3403,97 @@ __metadata: languageName: node linkType: hard +"@napi-rs/snappy-android-arm-eabi@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-android-arm-eabi@npm:7.2.2" + conditions: os=android & cpu=arm + languageName: node + linkType: hard + +"@napi-rs/snappy-android-arm64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-android-arm64@npm:7.2.2" + conditions: os=android & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-darwin-arm64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-darwin-arm64@npm:7.2.2" + conditions: os=darwin & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-darwin-x64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-darwin-x64@npm:7.2.2" + conditions: os=darwin & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-freebsd-x64@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-freebsd-x64@npm:7.2.2" + conditions: os=freebsd & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm-gnueabihf@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm-gnueabihf@npm:7.2.2" + conditions: os=linux & cpu=arm + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm64-gnu@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm64-gnu@npm:7.2.2" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-arm64-musl@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-arm64-musl@npm:7.2.2" + conditions: os=linux & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-x64-gnu@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-x64-gnu@npm:7.2.2" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-linux-x64-musl@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-linux-x64-musl@npm:7.2.2" + conditions: os=linux & cpu=x64 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-arm64-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-arm64-msvc@npm:7.2.2" + conditions: os=win32 & cpu=arm64 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-ia32-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-ia32-msvc@npm:7.2.2" + conditions: os=win32 & cpu=ia32 + languageName: node + linkType: hard + +"@napi-rs/snappy-win32-x64-msvc@npm:7.2.2": + version: 7.2.2 + resolution: "@napi-rs/snappy-win32-x64-msvc@npm:7.2.2" + conditions: os=win32 & cpu=x64 + languageName: node + linkType: hard + "@noble/ciphers@npm:^0.4.0": version: 0.4.1 resolution: "@noble/ciphers@npm:0.4.1" @@ -16312,6 +16404,54 @@ __metadata: languageName: node linkType: hard +"snappy@npm:^7.2.2": + version: 7.2.2 + resolution: "snappy@npm:7.2.2" + dependencies: + "@napi-rs/snappy-android-arm-eabi": 7.2.2 + "@napi-rs/snappy-android-arm64": 7.2.2 + "@napi-rs/snappy-darwin-arm64": 7.2.2 + "@napi-rs/snappy-darwin-x64": 7.2.2 + "@napi-rs/snappy-freebsd-x64": 7.2.2 + "@napi-rs/snappy-linux-arm-gnueabihf": 7.2.2 + "@napi-rs/snappy-linux-arm64-gnu": 7.2.2 + "@napi-rs/snappy-linux-arm64-musl": 7.2.2 + "@napi-rs/snappy-linux-x64-gnu": 7.2.2 + "@napi-rs/snappy-linux-x64-musl": 7.2.2 + "@napi-rs/snappy-win32-arm64-msvc": 7.2.2 + "@napi-rs/snappy-win32-ia32-msvc": 7.2.2 + "@napi-rs/snappy-win32-x64-msvc": 7.2.2 + dependenciesMeta: + "@napi-rs/snappy-android-arm-eabi": + optional: true + "@napi-rs/snappy-android-arm64": + optional: true + "@napi-rs/snappy-darwin-arm64": + optional: true + "@napi-rs/snappy-darwin-x64": + optional: true + "@napi-rs/snappy-freebsd-x64": + optional: true + "@napi-rs/snappy-linux-arm-gnueabihf": + optional: true + "@napi-rs/snappy-linux-arm64-gnu": + optional: true + "@napi-rs/snappy-linux-arm64-musl": + optional: true + "@napi-rs/snappy-linux-x64-gnu": + optional: true + "@napi-rs/snappy-linux-x64-musl": + optional: true + "@napi-rs/snappy-win32-arm64-msvc": + optional: true + "@napi-rs/snappy-win32-ia32-msvc": + optional: true + "@napi-rs/snappy-win32-x64-msvc": + optional: true + checksum: cc6ee627d32325c3b3a7220f57bf7f87906372431072b77dfacf5d875a21c54043df8d6f328eadf8d58bda3d9bb558b3f00e1daaa757441cfa1ec20004f715f1 + languageName: node + linkType: hard + "sockjs@npm:^0.3.24": version: 0.3.24 resolution: "sockjs@npm:0.3.24"