diff --git a/yarn-project/p2p/src/errors/reqresp.error.ts b/yarn-project/p2p/src/errors/reqresp.error.ts index a31973d3e67..21749b7473d 100644 --- a/yarn-project/p2p/src/errors/reqresp.error.ts +++ b/yarn-project/p2p/src/errors/reqresp.error.ts @@ -3,7 +3,7 @@ * This error will be thrown when a request to a specific peer times out. * @category Errors */ -export class IndiviualReqRespTimeoutError extends Error { +export class IndividualReqRespTimeoutError extends Error { constructor() { super(`Request to peer timed out`); } @@ -19,3 +19,17 @@ export class CollectiveReqRespTimeoutError extends Error { super(`Request to all peers timed out`); } } + +/** Invalid response error + * + * This error will be thrown when a response is received that is not valid. + * + * This error does not need to be punished as message validators will handle punishing invalid + * requests + * @category Errors + */ +export class InvalidResponseError extends Error { + constructor() { + super(`Invalid response received`); + } +} diff --git a/yarn-project/p2p/src/service/encoding.ts b/yarn-project/p2p/src/service/encoding.ts index 0713b7e8a26..3d9e41f7db3 100644 --- a/yarn-project/p2p/src/service/encoding.ts +++ b/yarn-project/p2p/src/service/encoding.ts @@ -49,13 +49,31 @@ export function getMsgIdFn(message: Message) { return sha256(Buffer.concat(vec)).subarray(0, 20); } +/** + * Snappy transform for libp2p gossipsub + */ export class SnappyTransform implements DataTransform { + // Topic string included to satisfy DataTransform interface inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { - const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true })); - return new Uint8Array(uncompressed); + return this.inboundTransformNoTopic(Buffer.from(data)); + } + + public inboundTransformNoTopic(data: Buffer): Buffer { + if (data.length === 0) { + return data; + } + return Buffer.from(uncompressSync(data, { asBuffer: true })); } + // Topic string included to satisfy DataTransform interface outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array { - return new Uint8Array(compressSync(Buffer.from(data))); + return this.outboundTransformNoTopic(Buffer.from(data)); + } + + public outboundTransformNoTopic(data: Buffer): Buffer { + if (data.length === 0) { + return data; + } + return Buffer.from(compressSync(data)); } } diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index f7091c2d5eb..0caeda85991 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; -import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js'; +import { CollectiveReqRespTimeoutError, IndividualReqRespTimeoutError } from '../../errors/reqresp.error.js'; import { MOCK_SUB_PROTOCOL_HANDLERS, MOCK_SUB_PROTOCOL_VALIDATORS, @@ -86,9 +86,27 @@ describe('ReqResp', () => { void nodes[1].req.stop(); void nodes[2].req.stop(); + const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); + // send from the first node const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST); + // We expect the logger to have been called twice with the peer ids citing the inability to connect + expect(loggerSpy).toHaveBeenCalledWith( + expect.stringContaining(`Connection reset: ${nodes[1].p2p.peerId.toString()}`), + { + peerId: nodes[1].p2p.peerId.toString(), + subProtocol: PING_PROTOCOL, + }, + ); + expect(loggerSpy).toHaveBeenCalledWith( + expect.stringContaining(`Connection reset: ${nodes[2].p2p.peerId.toString()}`), + { + peerId: nodes[2].p2p.peerId.toString(), + subProtocol: PING_PROTOCOL, + }, + ); + expect(res?.toBuffer().toString('utf-8')).toEqual('pong'); }); @@ -111,7 +129,7 @@ describe('ReqResp', () => { // Make sure the error message is logged const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`; - expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage)); }); describe('Tx req protocol', () => { @@ -139,6 +157,29 @@ describe('ReqResp', () => { expect(res).toEqual(tx); }); + it('Handle returning empty buffers', async () => { + const tx = mockTx(); + const txHash = tx.getTxHash(); + + const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; + protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise => { + return Promise.resolve(Buffer.alloc(0)); + }; + + nodes = await createNodes(peerManager, 2); + + const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); + + await startNodes(nodes, protocolHandlers); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash); + expect(spySendRequestToPeer).toHaveBeenCalledTimes(1); + expect(res).toEqual(undefined); + }); + it('Does not crash if tx hash returns undefined', async () => { const tx = mockTx(); const txHash = tx.getTxHash(); @@ -170,7 +211,7 @@ describe('ReqResp', () => { }); // Spy on the logger to make sure the error message is logged - const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error'); + const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); await sleep(500); await connectToPeers(nodes); @@ -183,9 +224,13 @@ describe('ReqResp', () => { // Make sure the error message is logged const peerId = nodes[1].p2p.peerId.toString(); expect(loggerSpy).toHaveBeenCalledWith( - expect.stringMatching(/Error sending request to peer/i), - expect.any(Error), - { peerId, subProtocol: '/aztec/req/tx/0.1.0' }, + `Timeout error: ${ + new IndividualReqRespTimeoutError().message + } | peerId: ${peerId} | subProtocol: ${TX_REQ_PROTOCOL}`, + expect.objectContaining({ + peerId: peerId, + subProtocol: TX_REQ_PROTOCOL, + }), ); // Expect the peer to be penalized for timing out @@ -209,7 +254,7 @@ describe('ReqResp', () => { } // Spy on the logger to make sure the error message is logged - const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error'); + const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug'); await sleep(500); await connectToPeers(nodes); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index e6794c6fecf..43aa2cfa75d 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -5,10 +5,14 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; import { type Libp2p } from 'libp2p'; -import { compressSync, uncompressSync } from 'snappy'; import { type Uint8ArrayList } from 'uint8arraylist'; -import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; +import { + CollectiveReqRespTimeoutError, + IndividualReqRespTimeoutError, + InvalidResponseError, +} from '../../errors/reqresp.error.js'; +import { SnappyTransform } from '../encoding.js'; import { type PeerManager } from '../peer_manager.js'; import { PeerErrorSeverity } from '../peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; @@ -49,6 +53,8 @@ export class ReqResp { private rateLimiter: RequestResponseRateLimiter; + private snappyTransform: SnappyTransform; + constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) { this.logger = createLogger('p2p:reqresp'); @@ -56,6 +62,7 @@ export class ReqResp { this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; this.rateLimiter = new RequestResponseRateLimiter(peerManager); + this.snappyTransform = new SnappyTransform(); } /** @@ -143,8 +150,7 @@ export class ReqResp { // The response validator handles peer punishment within const isValid = await responseValidator(request, object, peer); if (!isValid) { - this.logger.error(`Invalid response for ${subProtocol} from ${peer.toString()}`); - return undefined; + throw new InvalidResponseError(); } return object; } @@ -159,7 +165,7 @@ export class ReqResp { () => new CollectiveReqRespTimeoutError(), ); } catch (e: any) { - this.logger.error(`${e.message} | subProtocol: ${subProtocol}`); + this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`); return undefined; } } @@ -200,18 +206,14 @@ export class ReqResp { // Open the stream with a timeout const result = await executeTimeoutWithCustomError( - (): Promise => pipe([payload], stream!, this.readMessage), + (): Promise => pipe([payload], stream!, this.readMessage.bind(this)), this.individualRequestTimeoutMs, - () => new IndiviualReqRespTimeoutError(), + () => new IndividualReqRespTimeoutError(), ); - await stream.close(); - this.logger.trace(`Stream closed with ${peerId.toString()} for ${subProtocol}`); - return result; } catch (e: any) { - this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol }); - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); + this.handleResponseError(e, peerId, subProtocol); } finally { if (stream) { try { @@ -224,7 +226,70 @@ export class ReqResp { } } } - return undefined; + } + + /** + * Handle a response error + * + * ReqResp errors are punished differently depending on the severity of the offense + * + * @param e - The error + * @param peerId - The peer id + * @param subProtocol - The sub protocol + * @returns If the error is non pubishable, then undefined is returned, otherwise the peer is penalized + */ + private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void { + const severity = this.categorizeError(e, peerId, subProtocol); + if (severity) { + this.peerManager.penalizePeer(peerId, severity); + } + } + + /** + * Categorize the error and log it. + */ + private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined { + // Non pubishable errors + // We do not punish a collective timeout, as the node triggers this interupt, independent of the peer's behaviour + const logTags = { + peerId: peerId.toString(), + subProtocol, + }; + if (e instanceof CollectiveReqRespTimeoutError || e instanceof InvalidResponseError) { + this.logger.debug( + `Non-punishable error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`, + logTags, + ); + return undefined; + } + + // Pubishable errors + // Connection reset errors in the networking stack are punished with high severity + // it just signals an unreliable peer + // We assume that the requesting node has a functioning networking stack. + if (e?.code === 'ECONNRESET' || e?.code === 'EPIPE') { + this.logger.debug(`Connection reset: ${peerId.toString()}`, logTags); + return PeerErrorSeverity.HighToleranceError; + } + + if (e?.code === 'ECONNREFUSED') { + this.logger.debug(`Connection refused: ${peerId.toString()}`, logTags); + return PeerErrorSeverity.HighToleranceError; + } + + // Timeout errors are punished with high tolerance, they can be due to a geogrpahically far away peer or an + // overloaded peer + if (e instanceof IndividualReqRespTimeoutError) { + this.logger.debug( + `Timeout error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`, + logTags, + ); + return PeerErrorSeverity.HighToleranceError; + } + + // Catch all error + this.logger.error(`Unexpected error sending request to peer`, e, logTags); + return PeerErrorSeverity.HighToleranceError; } /** @@ -235,8 +300,8 @@ export class ReqResp { for await (const chunk of source) { chunks.push(chunk.subarray()); } - const messageData = chunks.concat(); - return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer; + const messageData = Buffer.concat(chunks); + return this.snappyTransform.inboundTransformNoTopic(messageData); } /** @@ -266,6 +331,7 @@ export class ReqResp { } const handler = this.subProtocolHandlers[protocol]; + const transform = this.snappyTransform; try { await pipe( @@ -274,7 +340,7 @@ export class ReqResp { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); const response = await handler(msg); - yield new Uint8Array(compressSync(response)); + yield new Uint8Array(transform.outboundTransformNoTopic(response)); } }, stream,