Skip to content

Commit

Permalink
Merge 1a636ea into d74d0fc
Browse files Browse the repository at this point in the history
  • Loading branch information
Maddiaa0 authored Dec 13, 2024
2 parents d74d0fc + 1a636ea commit 87f117d
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 27 deletions.
16 changes: 15 additions & 1 deletion yarn-project/p2p/src/errors/reqresp.error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* This error will be thrown when a request to a specific peer times out.
* @category Errors
*/
export class IndiviualReqRespTimeoutError extends Error {
export class IndividualReqRespTimeoutError extends Error {
constructor() {
super(`Request to peer timed out`);
}
Expand All @@ -19,3 +19,17 @@ export class CollectiveReqRespTimeoutError extends Error {
super(`Request to all peers timed out`);
}
}

/** Invalid response error
*
* This error will be thrown when a response is received that is not valid.
*
* This error does not need to be punished as message validators will handle punishing invalid
* requests
* @category Errors
*/
export class InvalidResponseError extends Error {
constructor() {
super(`Invalid response received`);
}
}
24 changes: 21 additions & 3 deletions yarn-project/p2p/src/service/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,31 @@ export function getMsgIdFn(message: Message) {
return sha256(Buffer.concat(vec)).subarray(0, 20);
}

/**
* Snappy transform for libp2p gossipsub
*/
export class SnappyTransform implements DataTransform {
// Topic string included to satisfy DataTransform interface
inboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
const uncompressed = Buffer.from(uncompressSync(Buffer.from(data), { asBuffer: true }));
return new Uint8Array(uncompressed);
return this.inboundTransformNoTopic(Buffer.from(data));
}

public inboundTransformNoTopic(data: Buffer): Buffer {
if (data.length === 0) {
return data;
}
return Buffer.from(uncompressSync(data, { asBuffer: true }));
}

// Topic string included to satisfy DataTransform interface
outboundTransform(_topicStr: string, data: Uint8Array): Uint8Array {
return new Uint8Array(compressSync(Buffer.from(data)));
return this.outboundTransformNoTopic(Buffer.from(data));
}

public outboundTransformNoTopic(data: Buffer): Buffer {
if (data.length === 0) {
return data;
}
return Buffer.from(compressSync(data));
}
}
59 changes: 52 additions & 7 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep';
import { describe, expect, it, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { CollectiveReqRespTimeoutError, IndividualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import {
MOCK_SUB_PROTOCOL_HANDLERS,
MOCK_SUB_PROTOCOL_VALIDATORS,
Expand Down Expand Up @@ -86,9 +86,27 @@ describe('ReqResp', () => {
void nodes[1].req.stop();
void nodes[2].req.stop();

const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

// send from the first node
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, PING_REQUEST);

// We expect the logger to have been called twice with the peer ids citing the inability to connect
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`Connection reset: ${nodes[1].p2p.peerId.toString()}`),
{
peerId: nodes[1].p2p.peerId.toString(),
subProtocol: PING_PROTOCOL,
},
);
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringContaining(`Connection reset: ${nodes[2].p2p.peerId.toString()}`),
{
peerId: nodes[2].p2p.peerId.toString(),
subProtocol: PING_PROTOCOL,
},
);

expect(res?.toBuffer().toString('utf-8')).toEqual('pong');
});

Expand All @@ -111,7 +129,7 @@ describe('ReqResp', () => {

// Make sure the error message is logged
const errorMessage = `Rate limit exceeded for ${PING_PROTOCOL} from ${nodes[0].p2p.peerId.toString()}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);
expect(loggerSpy).toHaveBeenCalledWith(expect.stringContaining(errorMessage));
});

describe('Tx req protocol', () => {
Expand Down Expand Up @@ -139,6 +157,29 @@ describe('ReqResp', () => {
expect(res).toEqual(tx);
});

it('Handle returning empty buffers', async () => {
const tx = mockTx();
const txHash = tx.getTxHash();

const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS;
protocolHandlers[TX_REQ_PROTOCOL] = (_message: Buffer): Promise<Buffer> => {
return Promise.resolve(Buffer.alloc(0));
};

nodes = await createNodes(peerManager, 2);

const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer');

await startNodes(nodes, protocolHandlers);
await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, txHash);
expect(spySendRequestToPeer).toHaveBeenCalledTimes(1);
expect(res).toEqual(undefined);
});

it('Does not crash if tx hash returns undefined', async () => {
const tx = mockTx();
const txHash = tx.getTxHash();
Expand Down Expand Up @@ -170,7 +211,7 @@ describe('ReqResp', () => {
});

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

await sleep(500);
await connectToPeers(nodes);
Expand All @@ -183,9 +224,13 @@ describe('ReqResp', () => {
// Make sure the error message is logged
const peerId = nodes[1].p2p.peerId.toString();
expect(loggerSpy).toHaveBeenCalledWith(
expect.stringMatching(/Error sending request to peer/i),
expect.any(Error),
{ peerId, subProtocol: '/aztec/req/tx/0.1.0' },
`Timeout error: ${
new IndividualReqRespTimeoutError().message
} | peerId: ${peerId} | subProtocol: ${TX_REQ_PROTOCOL}`,
expect.objectContaining({
peerId: peerId,
subProtocol: TX_REQ_PROTOCOL,
}),
);

// Expect the peer to be penalized for timing out
Expand All @@ -209,7 +254,7 @@ describe('ReqResp', () => {
}

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'debug');

await sleep(500);
await connectToPeers(nodes);
Expand Down
98 changes: 82 additions & 16 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import { executeTimeoutWithCustomError } from '@aztec/foundation/timer';
import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { compressSync, uncompressSync } from 'snappy';
import { type Uint8ArrayList } from 'uint8arraylist';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import {
CollectiveReqRespTimeoutError,
IndividualReqRespTimeoutError,
InvalidResponseError,
} from '../../errors/reqresp.error.js';
import { SnappyTransform } from '../encoding.js';
import { type PeerManager } from '../peer_manager.js';
import { PeerErrorSeverity } from '../peer_scoring.js';
import { type P2PReqRespConfig } from './config.js';
Expand Down Expand Up @@ -49,13 +53,16 @@ export class ReqResp {

private rateLimiter: RequestResponseRateLimiter;

private snappyTransform: SnappyTransform;

constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) {
this.logger = createLogger('p2p:reqresp');

this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
this.individualRequestTimeoutMs = config.individualRequestTimeoutMs;

this.rateLimiter = new RequestResponseRateLimiter(peerManager);
this.snappyTransform = new SnappyTransform();
}

/**
Expand Down Expand Up @@ -143,8 +150,7 @@ export class ReqResp {
// The response validator handles peer punishment within
const isValid = await responseValidator(request, object, peer);
if (!isValid) {
this.logger.error(`Invalid response for ${subProtocol} from ${peer.toString()}`);
return undefined;
throw new InvalidResponseError();
}
return object;
}
Expand All @@ -159,7 +165,7 @@ export class ReqResp {
() => new CollectiveReqRespTimeoutError(),
);
} catch (e: any) {
this.logger.error(`${e.message} | subProtocol: ${subProtocol}`);
this.logger.debug(`${e.message} | subProtocol: ${subProtocol}`);
return undefined;
}
}
Expand Down Expand Up @@ -200,18 +206,14 @@ export class ReqResp {

// Open the stream with a timeout
const result = await executeTimeoutWithCustomError<Buffer>(
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage),
(): Promise<Buffer> => pipe([payload], stream!, this.readMessage.bind(this)),
this.individualRequestTimeoutMs,
() => new IndiviualReqRespTimeoutError(),
() => new IndividualReqRespTimeoutError(),
);

await stream.close();
this.logger.trace(`Stream closed with ${peerId.toString()} for ${subProtocol}`);

return result;
} catch (e: any) {
this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol });
this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError);
this.handleResponseError(e, peerId, subProtocol);
} finally {
if (stream) {
try {
Expand All @@ -224,7 +226,70 @@ export class ReqResp {
}
}
}
return undefined;
}

/**
* Handle a response error
*
* ReqResp errors are punished differently depending on the severity of the offense
*
* @param e - The error
* @param peerId - The peer id
* @param subProtocol - The sub protocol
* @returns If the error is non pubishable, then undefined is returned, otherwise the peer is penalized
*/
private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void {
const severity = this.categorizeError(e, peerId, subProtocol);
if (severity) {
this.peerManager.penalizePeer(peerId, severity);
}
}

/**
* Categorize the error and log it.
*/
private categorizeError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): PeerErrorSeverity | undefined {
// Non pubishable errors
// We do not punish a collective timeout, as the node triggers this interupt, independent of the peer's behaviour
const logTags = {
peerId: peerId.toString(),
subProtocol,
};
if (e instanceof CollectiveReqRespTimeoutError || e instanceof InvalidResponseError) {
this.logger.debug(
`Non-punishable error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`,
logTags,
);
return undefined;
}

// Pubishable errors
// Connection reset errors in the networking stack are punished with high severity
// it just signals an unreliable peer
// We assume that the requesting node has a functioning networking stack.
if (e?.code === 'ECONNRESET' || e?.code === 'EPIPE') {
this.logger.debug(`Connection reset: ${peerId.toString()}`, logTags);
return PeerErrorSeverity.HighToleranceError;
}

if (e?.code === 'ECONNREFUSED') {
this.logger.debug(`Connection refused: ${peerId.toString()}`, logTags);
return PeerErrorSeverity.HighToleranceError;
}

// Timeout errors are punished with high tolerance, they can be due to a geogrpahically far away peer or an
// overloaded peer
if (e instanceof IndividualReqRespTimeoutError) {
this.logger.debug(
`Timeout error: ${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`,
logTags,
);
return PeerErrorSeverity.HighToleranceError;
}

// Catch all error
this.logger.error(`Unexpected error sending request to peer`, e, logTags);
return PeerErrorSeverity.HighToleranceError;
}

/**
Expand All @@ -235,8 +300,8 @@ export class ReqResp {
for await (const chunk of source) {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return uncompressSync(Buffer.concat(messageData), { asBuffer: true }) as Buffer;
const messageData = Buffer.concat(chunks);
return this.snappyTransform.inboundTransformNoTopic(messageData);
}

/**
Expand Down Expand Up @@ -266,6 +331,7 @@ export class ReqResp {
}

const handler = this.subProtocolHandlers[protocol];
const transform = this.snappyTransform;

try {
await pipe(
Expand All @@ -274,7 +340,7 @@ export class ReqResp {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray());
const response = await handler(msg);
yield new Uint8Array(compressSync(response));
yield new Uint8Array(transform.outboundTransformNoTopic(response));
}
},
stream,
Expand Down

0 comments on commit 87f117d

Please sign in to comment.