Skip to content

Commit

Permalink
feat(p2p): more comprehensive peer management, dial retries, persiste…
Browse files Browse the repository at this point in the history
…nce fix (#6953)
  • Loading branch information
spypsy authored Jun 20, 2024
1 parent fa70876 commit cdd1cbd
Show file tree
Hide file tree
Showing 14 changed files with 454 additions and 272 deletions.
8 changes: 5 additions & 3 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ export class AztecNodeService implements AztecNode {
* @param config - The configuration to be used by the aztec node.
* @returns - A fully synced Aztec Node for use in development/testing.
*/
public static async createAndSync(config: AztecNodeConfig) {
public static async createAndSync(
config: AztecNodeConfig,
log = createDebugLogger('aztec:node'),
storeLog = createDebugLogger('aztec:node:lmdb'),
) {
const ethereumChain = createEthereumChain(config.rpcUrl, config.apiKey);
//validate that the actual chain id matches that specified in configuration
if (config.chainId !== ethereumChain.chainInfo.id) {
Expand All @@ -131,8 +135,6 @@ export class AztecNodeService implements AztecNode {
);
}

const log = createDebugLogger('aztec:node');
const storeLog = createDebugLogger('aztec:node:lmdb');
const store = await initStoreForRollup(
AztecLmdbStore.open(config.dataDirectory, false, storeLog),
config.l1Contracts.rollupAddress,
Expand Down
3 changes: 3 additions & 0 deletions yarn-project/end-to-end/Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ E2E_TEST:
# Run our docker compose, ending whenever sandbox ends, filtering out noisy eth_getLogs
RUN docker run -e HARDWARE_CONCURRENCY=$hardware_concurrency --rm aztecprotocol/end-to-end:$AZTEC_DOCKER_TAG $test || $allow_fail

e2e-p2p:
DO +E2E_TEST --test=./src/e2e_p2p_network.test.ts

e2e-2-pxes:
DO +E2E_TEST --test=./src/e2e_2_pxes.test.ts

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import {
GrumpkinScalar,
type SentTx,
TxStatus,
createDebugLogger,
sleep,
} from '@aztec/aztec.js';
import { type BootNodeConfig, BootstrapNode, createLibP2PPeerId } from '@aztec/p2p';
import { type PXEService, createPXEService, getPXEServiceConfig as getRpcConfig } from '@aztec/pxe';

import fs from 'fs';
import { mnemonicToAccount } from 'viem/accounts';

import { MNEMONIC } from './fixtures/fixtures.js';
Expand All @@ -30,21 +33,36 @@ interface NodeContext {
account: AztecAddress;
}

const PEER_ID_PRIVATE_KEYS = [
'0802122002f651fd8653925529e3baccb8489b3af4d7d9db440cbf5df4a63ff04ea69683',
'08021220c3bd886df5fe5b33376096ad0dab3d2dc86ed2a361d5fde70f24d979dc73da41',
'080212206b6567ac759db5434e79495ec7458e5e93fe479a5b80713446e0bce5439a5655',
'08021220366453668099bdacdf08fab476ee1fced6bf00ddc1223d6c2ee626e7236fb526',
];

describe('e2e_p2p_network', () => {
let config: AztecNodeConfig;
let logger: DebugLogger;
let teardown: () => Promise<void>;
let bootstrapNode: BootstrapNode;
let bootstrapNodeEnr: string;

beforeEach(async () => {
({ teardown, config, logger } = await setup(1));
({ teardown, config, logger } = await setup(0));
bootstrapNode = await createBootstrapNode();
bootstrapNodeEnr = bootstrapNode.getENR().encodeTxt();
});

afterEach(() => teardown());

afterAll(() => {
for (let i = 0; i < NUM_NODES; i++) {
fs.rmSync(`./data-${i}`, { recursive: true, force: true });
}
});

it('should rollup txs from all peers', async () => {
// create the bootstrap node for the network
const bootstrapNode = await createBootstrapNode();
const bootstrapNodeEnr = bootstrapNode.getENR();
if (!bootstrapNodeEnr) {
throw new Error('Bootstrap node ENR is not available');
}
Expand All @@ -53,14 +71,29 @@ describe('e2e_p2p_network', () => {
// should be set so that the only way for rollups to be built
// is if the txs are successfully gossiped around the nodes.
const contexts: NodeContext[] = [];
const nodes: AztecNodeService[] = [];
for (let i = 0; i < NUM_NODES; i++) {
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr?.encodeTxt(), i);
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i);
nodes.push(node);
}

// wait a bit for peers to discover each other
await sleep(2000);

for (const node of nodes) {
const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
contexts.push(context);
}

// now ensure that all txs were successfully mined
await Promise.all(contexts.flatMap(context => context.txs.map(tx => tx.wait())));
await Promise.all(
contexts.flatMap((context, i) =>
context.txs.map(async (tx, j) => {
logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
),
);

// shutdown all nodes.
for (const context of contexts) {
Expand All @@ -70,6 +103,61 @@ describe('e2e_p2p_network', () => {
await bootstrapNode.stop();
});

it('should re-discover stored peers without bootstrap node', async () => {
const contexts: NodeContext[] = [];
const nodes: AztecNodeService[] = [];
for (let i = 0; i < NUM_NODES; i++) {
const node = await createNode(i + 1 + BOOT_NODE_UDP_PORT, bootstrapNodeEnr, i, `./data-${i}`);
nodes.push(node);
}
// wait a bit for peers to discover each other
await sleep(3000);

// stop bootstrap node
await bootstrapNode.stop();

// create new nodes from datadir
const newNodes: AztecNodeService[] = [];

// stop all nodes
for (let i = 0; i < NUM_NODES; i++) {
const node = nodes[i];
await node.stop();
logger.info(`Node ${i} stopped`);
await sleep(1200);
const newNode = await createNode(i + 1 + BOOT_NODE_UDP_PORT, undefined, i, `./data-${i}`);
logger.info(`Node ${i} restarted`);
newNodes.push(newNode);
// const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
// contexts.push(context);
}

// wait a bit for peers to discover each other
await sleep(2000);

for (const node of newNodes) {
const context = await createPXEServiceAndSubmitTransactions(node, NUM_TXS_PER_NODE);
contexts.push(context);
}

// now ensure that all txs were successfully mined
await Promise.all(
contexts.flatMap((context, i) =>
context.txs.map(async (tx, j) => {
logger.info(`Waiting for tx ${i}-${j}: ${await tx.getTxHash()} to be mined`);
return tx.wait();
}),
),
);

// shutdown all nodes.
// for (const context of contexts) {
for (const context of contexts) {
await context.node.stop();
await context.pxeService.stop();
}
});

const createBootstrapNode = async () => {
const peerId = await createLibP2PPeerId();
const bootstrapNode = new BootstrapNode();
Expand All @@ -87,7 +175,12 @@ describe('e2e_p2p_network', () => {
};

// creates a P2P enabled instance of Aztec Node Service
const createNode = async (tcpListenPort: number, bootstrapNode: string, publisherAddressIndex: number) => {
const createNode = async (
tcpListenPort: number,
bootstrapNode: string | undefined,
publisherAddressIndex: number,
dataDirectory?: string,
) => {
// We use different L1 publisher accounts in order to avoid duplicate tx nonces. We start from
// publisherAddressIndex + 1 because index 0 was already used during test environment setup.
const hdAccount = mnemonicToAccount(MNEMONIC, { addressIndex: publisherAddressIndex + 1 });
Expand All @@ -96,38 +189,21 @@ describe('e2e_p2p_network', () => {

const newConfig: AztecNodeConfig = {
...config,
peerIdPrivateKey: PEER_ID_PRIVATE_KEYS[publisherAddressIndex],
udpListenAddress: `0.0.0.0:${tcpListenPort}`,
tcpListenAddress: `0.0.0.0:${tcpListenPort}`,
tcpAnnounceAddress: `127.0.0.1:${tcpListenPort}`,
udpAnnounceAddress: `127.0.0.1:${tcpListenPort}`,
bootstrapNodes: [bootstrapNode],
minTxsPerBlock: NUM_TXS_PER_BLOCK,
maxTxsPerBlock: NUM_TXS_PER_BLOCK,
p2pEnabled: true,
p2pBlockCheckIntervalMS: 1000,
p2pL2QueueSize: 1,
transactionProtocol: '',
dataDirectory,
bootstrapNodes: bootstrapNode ? [bootstrapNode] : [],
};
return await AztecNodeService.createAndSync(newConfig);
};

// submits a set of transactions to the provided Private eXecution Environment (PXE)
const submitTxsTo = async (pxe: PXEService, account: AztecAddress, numTxs: number) => {
const txs: SentTx[] = [];
for (let i = 0; i < numTxs; i++) {
const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy();
logger.info(`Tx sent with hash ${await tx.getTxHash()}`);
const receipt = await tx.getReceipt();
expect(receipt).toEqual(
expect.objectContaining({
status: TxStatus.PENDING,
error: '',
}),
);
logger.info(`Receipt received for ${await tx.getTxHash()}`);
txs.push(tx);
}
return txs;
return await AztecNodeService.createAndSync(newConfig, createDebugLogger(`aztec:node-${tcpListenPort}`));
};

// creates an instance of the PXE and submit a given number of transactions to it.
Expand All @@ -142,12 +218,44 @@ describe('e2e_p2p_network', () => {
const completeAddress = CompleteAddress.fromSecretKeyAndPartialAddress(secretKey, Fr.random());
await pxeService.registerAccount(secretKey, completeAddress.partialAddress);

const txs = await submitTxsTo(pxeService, completeAddress.address, numTxs);
const txs = await submitTxsTo(pxeService, numTxs);
return {
txs,
account: completeAddress.address,
pxeService,
node,
};
};

// submits a set of transactions to the provided Private eXecution Environment (PXE)
const submitTxsTo = async (pxe: PXEService, numTxs: number) => {
const txs: SentTx[] = [];
for (let i = 0; i < numTxs; i++) {
// const tx = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random()).deploy();
const accountManager = getSchnorrAccount(pxe, Fr.random(), GrumpkinScalar.random(), Fr.random());
const deployMethod = await accountManager.getDeployMethod();
await deployMethod.create({
contractAddressSalt: accountManager.salt,
skipClassRegistration: true,
skipPublicDeployment: true,
universalDeploy: true,
});
await deployMethod.prove({});
const tx = deployMethod.send();

const txHash = await tx.getTxHash();

logger.info(`Tx sent with hash ${txHash}`);
const receipt = await tx.getReceipt();
expect(receipt).toEqual(
expect.objectContaining({
status: TxStatus.PENDING,
error: '',
}),
);
logger.info(`Receipt received for ${txHash}`);
txs.push(tx);
}
return txs;
};
});
10 changes: 4 additions & 6 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { type AztecKVStore } from '@aztec/kv-store';
import { P2PClient } from '../client/p2p_client.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from '../service/discV5_service.js';
import { DummyP2PService, DummyPeerDiscoveryService } from '../service/dummy_service.js';
import { DummyP2PService } from '../service/dummy_service.js';
import { LibP2PService, createLibP2PPeerId } from '../service/index.js';
import { type TxPool } from '../tx_pool/index.js';
import { getPublicIp, splitAddressPort } from '../util.js';
Expand All @@ -17,7 +17,6 @@ export const createP2PClient = async (
txPool: TxPool,
l2BlockSource: L2BlockSource,
) => {
let discv5Service;
let p2pService;

if (config.p2pEnabled) {
Expand All @@ -40,7 +39,7 @@ export const createP2PClient = async (
config.tcpAnnounceAddress = tcpAnnounceAddress;
} else {
throw new Error(
`Invalid announceTcpAddress provided: ${splitTcpAnnounceAddress}. Expected format: <addr>:<port>`,
`Invalid announceTcpAddress provided: ${configTcpAnnounceAddress}. Expected format: <addr>:<port>`,
);
}
}
Expand All @@ -59,11 +58,10 @@ export const createP2PClient = async (

// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
discv5Service = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discv5Service, peerId, txPool);
const discoveryService = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, store);
} else {
p2pService = new DummyP2PService();
discv5Service = new DummyPeerDiscoveryService();
}
return new P2PClient(store, l2BlockSource, txPool, p2pService);
};
1 change: 0 additions & 1 deletion yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ describe('In-Memory P2P Client', () => {
start: jest.fn(),
stop: jest.fn(),
propagateTx: jest.fn(),
settledTxs: jest.fn(),
};

blockSource = new MockBlockSource();
Expand Down
3 changes: 1 addition & 2 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class P2PClient implements P2P {
this.log.debug('Stopped block downloader');
await this.runningPromise;
this.setCurrentState(P2PClientState.STOPPED);
this.log.info('P2P client stopped...');
this.log.info('P2P client stopped.');
}

/**
Expand Down Expand Up @@ -278,7 +278,6 @@ export class P2PClient implements P2P {
for (const block of blocks) {
const txHashes = block.body.txEffects.map(txEffect => txEffect.txHash);
await this.txPool.deleteTxs(txHashes);
this.p2pService.settledTxs(txHashes);
}
}

Expand Down
Loading

0 comments on commit cdd1cbd

Please sign in to comment.