Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): request response skeleton #8076

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function pingHandler(_msg: any) {
return Uint8Array.from(Buffer.from('pong'));
}

export function statusHandler(_msg: any) {
return Uint8Array.from(Buffer.from('ok'));
}
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/service/reqresp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Request Response protocol allows nodes to ask their peers for data
* that they missed via the traditional gossip protocol.
*/
13 changes: 13 additions & 0 deletions yarn-project/p2p/src/service/reqresp/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export enum ReqRespType {
Status = 'status',
Ping = 'ping',
/** Ask peers for specific transactions */
TxsByHash = 'txs_by_hash',
}

export const PING_PROTOCOL = '/aztec/ping/0.1.0';
export const STATUS_PROTOCOL = '/aztec/status/0.1.0';

export type SubProtocol = typeof PING_PROTOCOL | typeof STATUS_PROTOCOL;

export type SubProtocolHandler = (msg: string) => Uint8Array;
155 changes: 155 additions & 0 deletions yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { sleep } from '@aztec/foundation/sleep';

import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { PING_PROTOCOL } from './interface.js';
import { ReqResp } from './reqresp.js';

/**
* Creates a libp2p node, pre configured.
* @param boostrapAddrs - an optional list of bootstrap addresses
* @returns Lip2p node
*/
async function createLibp2pNode(boostrapAddrs: string[] = []): Promise<Libp2p> {
const options: Libp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
transports: [tcp()],
};

if (boostrapAddrs.length > 0) {
options.peerDiscovery = [
bootstrap({
list: boostrapAddrs,
}),
];
}

return await createLibp2p(options);
}

/**
* A p2p / req resp node pairing the req node will always contain the p2p node.
* they are provided as a pair to allow access the p2p node directly
*/
type ReqRespNode = {
p2p: Libp2p;
req: ReqResp;
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
};

const startNodes = async (nodes: ReqRespNode[]) => {
for (const node of nodes) {
await node.req.start();
}
};

const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
await node.req.stop();
await node.p2p.stop();
}
};

// Create a req resp node, exposing the underlying p2p node
const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
return {
p2p,
req,
};
};

// Given a node list; hand shake all of the nodes with each other
const connectToPeers = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
for (const otherNode of nodes) {
if (node === otherNode) {
continue;
}
const addr = otherNode.p2p.getMultiaddrs()[0];
await node.p2p.dial(addr);
}
}
};

// The Req Resp protocol should allow nodes to dial specific peers
// and ask for specific data that they missed via the traditional gossip protocol.
describe('ReqResp', () => {
it('Should perform a ping request', async () => {
// Create two nodes
// They need to discover each other
const nodes = await createNodes(2);
const { req: pinger } = nodes[0];

await startNodes(nodes);

// connect the nodes
await connectToPeers(nodes);

await sleep(500);

const res = await pinger.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

await sleep(500);
expect(res?.toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});

it('Should handle gracefully if a peer connected peer is offline', async () => {
const nodes = await createNodes(2);

const { req: pinger } = nodes[0];
const { req: ponger } = nodes[1];
await startNodes(nodes);

// connect the nodes
await connectToPeers(nodes);
await sleep(500);

void ponger.stop();

// It should return undefined if it cannot dial the peer
const res = await pinger.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

expect(res).toBeUndefined();

await stopNodes(nodes);
});

it('Should request from a later peer if other peers are offline', async () => {
const nodes = await createNodes(4);

await startNodes(nodes);
await sleep(500);
await connectToPeers(nodes);
await sleep(500);

// Stop the second middle two nodes
void nodes[1].req.stop();
void nodes[2].req.stop();

// send from the first node
const res = await nodes[0].req.sendRequest(PING_PROTOCOL, Buffer.from('ping'));

expect(res?.toString('utf-8')).toEqual('pong');

await stopNodes(nodes);
});
});
130 changes: 130 additions & 0 deletions yarn-project/p2p/src/service/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// @attribution: lodestar impl for inspiration
import { type Logger, createDebugLogger } from '@aztec/foundation/log';

import { type IncomingStreamData, type PeerId } from '@libp2p/interface';
import { pipe } from 'it-pipe';
import { type Libp2p } from 'libp2p';
import { type Uint8ArrayList } from 'uint8arraylist';

import { pingHandler, statusHandler } from './handlers.js';
import { PING_PROTOCOL, STATUS_PROTOCOL, SubProtocol, SubProtocolHandler } from './interface.js';

/**
* A mapping from a protocol to a handler function
*/
const SUB_PROTOCOL_HANDLERS: Record<SubProtocol, SubProtocolHandler> = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
};

export class ReqResp {
protected readonly logger: Logger;

private abortController: AbortController = new AbortController();

constructor(protected readonly libp2p: Libp2p) {
this.logger = createDebugLogger('aztec:p2p:reqresp');
}

/**
* Start the reqresp service
*/
async start() {
// Register all protocol handlers
for (const subProtocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.handle(subProtocol, this.streamHandler.bind(this, subProtocol as SubProtocol));
}
}

/**
* Stop the reqresp service
*/
async stop() {
// Unregister all handlers
for (const protocol of Object.keys(SUB_PROTOCOL_HANDLERS)) {
await this.libp2p.unhandle(protocol);
}
await this.libp2p.stop();
this.abortController.abort();
}

/**
* Send a request to peers, returns the first response
*
* @param subProtocol - The protocol being requested
* @param payload - The payload to send
* @returns - The response from the peer, otherwise undefined
*/
async sendRequest(subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
// Get active peers
const peers = this.libp2p.getPeers();

// Attempt to ask all of our peers
for (const peer of peers) {
const response = await this.sendRequestToPeer(peer, subProtocol, payload);

// If we get a response, return it, otherwise we iterate onto the next peer
if (response) {
return response;
}
}
return undefined;
}

/**
* Sends a request to a specific peer
*
* @param peerId - The peer to send the request to
* @param subProtocol - The protocol to use to request
* @param payload - The payload to send
* @returns If the request is successful, the response is returned, otherwise undefined
*/
async sendRequestToPeer(peerId: PeerId, subProtocol: SubProtocol, payload: Buffer): Promise<Buffer | undefined> {
try {
const stream = await this.libp2p.dialProtocol(peerId, subProtocol);

const result = await pipe([payload], stream, this.readMessage);
return result;
} catch (e) {
this.logger.warn(`Failed to send request to peer ${peerId.publicKey}`);
return undefined;
}
}

/**
* Read a message returned from a stream into a single buffer
*/
private async readMessage(source: AsyncIterable<Uint8ArrayList>): Promise<Buffer> {
const chunks: Uint8Array[] = [];
for await (const chunk of source) {
chunks.push(chunk.subarray());
}
const messageData = chunks.concat();
return Buffer.concat(messageData);
}

/**
* Stream Handler
* Reads the incoming stream, determines the protocol, then triggers the appropriate handler
*
* @param param0 - The incoming stream data
*/
private async streamHandler(protocol: SubProtocol, { stream }: IncomingStreamData) {
try {
await pipe(
stream,
async function* (source) {
for await (const chunkList of source) {
const msg = Buffer.from(chunkList.subarray()).toString();
yield SUB_PROTOCOL_HANDLERS[protocol](msg);
}
},
stream,
);
} catch (e: any) {
this.logger.warn(e);
} finally {
await stream.close();
}
}
}
Loading