Skip to content

Commit

Permalink
fix: handle reorgs in the p2p-client
Browse files Browse the repository at this point in the history
  • Loading branch information
alexghr committed Oct 25, 2024
1 parent 5ec4766 commit cbbce08
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 42 deletions.
4 changes: 2 additions & 2 deletions yarn-project/archiver/src/test/mock_l2_block_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ export class MockL2BlockSource implements L2BlockSource {
return Promise.resolve(this.l2Blocks.length);
}

public async getProvenBlockNumber(): Promise<number> {
return this.provenBlockNumber ?? (await this.getBlockNumber());
public getProvenBlockNumber(): Promise<number> {
return Promise.resolve(this.provenBlockNumber);
}

public getProvenL2EpochNumber(): Promise<number | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ export class L2BlockStream {
const localTips = await this.localData.getL2Tips();
this.log.debug(`Running L2 block stream`, {
sourceLatest: sourceTips.latest.number,
localLatest: localTips.latest,
localLatest: localTips.latest.number,
sourceFinalized: sourceTips.finalized.number,
localFinalized: localTips.finalized,
localFinalized: localTips.finalized.number,
sourceProven: sourceTips.proven.number,
localProven: localTips.proven,
localProven: localTips.proven.number,
sourceLatestHash: sourceTips.latest.hash,
localLatestHash: localTips.latest.hash,
sourceProvenHash: sourceTips.proven.hash,
localProvenHash: localTips.proven.hash,
sourceFinalizedHash: sourceTips.finalized.hash,
localFinalizedHash: localTips.finalized.hash,
});

// Check if there was a reorg and emit a chain-pruned event if so.
Expand All @@ -71,7 +74,7 @@ export class L2BlockStream {
while (latestBlockNumber < sourceTips.latest.number) {
const from = latestBlockNumber + 1;
const limit = Math.min(this.opts.batchSize ?? 20, sourceTips.latest.number - from + 1);
this.log.debug(`Requesting blocks from ${from} limit ${limit}`);
this.log.debug(`Requesting blocks from ${from} limit ${limit} proven=${this.opts.proven}`);
const blocks = await this.l2BlockSource.getBlocks(from, limit, this.opts.proven);
if (blocks.length === 0) {
break;
Expand Down
67 changes: 64 additions & 3 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { MockL2BlockSource } from '@aztec/archiver/test';
import { mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { L2Block, mockEpochProofQuote, mockTx } from '@aztec/circuit-types';
import { Fr } from '@aztec/circuits.js';
import { retryUntil } from '@aztec/foundation/retry';
import { sleep } from '@aztec/foundation/sleep';
import { type AztecKVStore } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';
import { type TelemetryClient } from '@aztec/telemetry-client';
Expand Down Expand Up @@ -103,8 +105,8 @@ describe('In-Memory P2P Client', () => {
await client.start();
expect(client.isReady()).toEqual(true);

await client.stop();
expect(client.isReady()).toEqual(false);
// await client.stop();
// expect(client.isReady()).toEqual(false);
});

it('adds txs to pool', async () => {
Expand Down Expand Up @@ -236,5 +238,64 @@ describe('In-Memory P2P Client', () => {
expect(epochProofQuotePool.deleteQuotesToEpoch).toBeCalledWith(3n);
});

describe('Chain prunes', () => {
it('moves the tips on a chain reorg', async () => {
blockSource.setProvenBlockNumber(0);
await client.start();

await advanceToProvenBlock(90);

await expect(client.getL2Tips()).resolves.toEqual({
latest: { number: 100, hash: expect.any(String) },
proven: { number: 90, hash: expect.any(String) },
finalized: { number: 90, hash: expect.any(String) },
});

blockSource.removeBlocks(10);

// give the client a chance to react to the reorg
await sleep(100);

await expect(client.getL2Tips()).resolves.toEqual({
latest: { number: 90, hash: expect.any(String) },
proven: { number: 90, hash: expect.any(String) },
finalized: { number: 90, hash: expect.any(String) },
});

blockSource.addBlocks([L2Block.random(91), L2Block.random(92)]);

// give the client a chance to react to the new blocks
await sleep(100);

await expect(client.getL2Tips()).resolves.toEqual({
latest: { number: 92, hash: expect.any(String) },
proven: { number: 90, hash: expect.any(String) },
finalized: { number: 90, hash: expect.any(String) },
});
});

it('deletes txs created from a pruned block', async () => {
client = new P2PClient(kvStore, blockSource, mempools, p2pService, 10, telemetryClient);
blockSource.setProvenBlockNumber(0);
await client.start();

// add two txs to the pool. One build against block 90, one against block 95
// then prune the chain back to block 90
// only one tx should be deleted
const goodTx = mockTx();
goodTx.data.constants.globalVariables.blockNumber = new Fr(90);

const badTx = mockTx();
badTx.data.constants.globalVariables.blockNumber = new Fr(95);

txPool.getAllTxs.mockReturnValue([goodTx, badTx]);

blockSource.removeBlocks(10);
await sleep(150);
expect(txPool.deleteTxs).toHaveBeenCalledWith([badTx.getTxHash()]);
await client.stop();
});
});

// TODO(https://github.com/AztecProtocol/aztec-packages/issues/7971): tests for attestation pool pruning
});
138 changes: 106 additions & 32 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ import {
type BlockProposal,
type EpochProofQuote,
type L2Block,
L2BlockDownloader,
type L2BlockId,
type L2BlockSource,
L2BlockStream,
type L2BlockStreamEvent,
type L2Tips,
type Tx,
type TxHash,
} from '@aztec/circuit-types';
import { INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js/constants';
import { createDebugLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store';
import { type AztecKVStore, type AztecMap, type AztecSingleton } from '@aztec/kv-store';
import { Attributes, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import { type ENR } from '@chainsafe/enr';
Expand Down Expand Up @@ -172,12 +174,6 @@ export interface P2P {
* The P2P client implementation.
*/
export class P2PClient extends WithTracer implements P2P {
/** L2 block download to stay in sync with latest blocks. */
private latestBlockDownloader: L2BlockDownloader;

/** L2 block download to stay in sync with proven blocks. */
private provenBlockDownloader: L2BlockDownloader;

/** Property that indicates whether the client is running. */
private stopping = false;

Expand All @@ -190,13 +186,16 @@ export class P2PClient extends WithTracer implements P2P {
private latestBlockNumberAtStart = -1;
private provenBlockNumberAtStart = -1;

private synchedBlockHashes: AztecMap<number, string>;
private synchedLatestBlockNumber: AztecSingleton<number>;
private synchedProvenBlockNumber: AztecSingleton<number>;

private txPool: TxPool;
private attestationPool: AttestationPool;
private epochProofQuotePool: EpochProofQuotePool;

private blockStream;

/**
* In-memory P2P client constructor.
* @param store - The client's instance of the KV store.
Expand All @@ -217,14 +216,14 @@ export class P2PClient extends WithTracer implements P2P {
) {
super(telemetryClient, 'P2PClient');

const { blockCheckIntervalMS: checkInterval, l2QueueSize: p2pL2QueueSize } = getP2PConfigEnvVars();
const l2DownloaderOpts = { maxQueueSize: p2pL2QueueSize, pollIntervalMS: checkInterval };
// TODO(palla/prover-node): This effectively downloads blocks twice from the archiver, which is an issue
// if the archiver is remote. We should refactor this so the downloader keeps a single queue and handles
// latest/proven metadata, as well as block reorgs.
this.latestBlockDownloader = new L2BlockDownloader(l2BlockSource, l2DownloaderOpts);
this.provenBlockDownloader = new L2BlockDownloader(l2BlockSource, { ...l2DownloaderOpts, proven: true });
const { blockCheckIntervalMS, l2QueueSize } = getP2PConfigEnvVars();

this.blockStream = new L2BlockStream(l2BlockSource, this, this, {
batchSize: l2QueueSize,
pollIntervalMS: blockCheckIntervalMS,
});

this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes');
this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block');
this.synchedProvenBlockNumber = store.openSingleton('p2p_pool_last_proven_l2_block');

Expand All @@ -233,6 +232,64 @@ export class P2PClient extends WithTracer implements P2P {
this.epochProofQuotePool = mempools.epochProofQuotePool;
}

public getL2BlockHash(number: number): Promise<string | undefined> {
return Promise.resolve(this.synchedBlockHashes.get(number));
}

public getL2Tips(): Promise<L2Tips> {
const latestBlockNumber = this.getSyncedLatestBlockNum();
let latestBlockHash: string | undefined;
const provenBlockNumber = this.getSyncedProvenBlockNum();
let provenBlockHash: string | undefined;

if (latestBlockNumber > 0) {
latestBlockHash = this.synchedBlockHashes.get(latestBlockNumber);
if (typeof latestBlockHash === 'undefined') {
this.log.warn(`Block hash for latest block ${latestBlockNumber} not found`);
throw new Error();
}
}

if (provenBlockNumber > 0) {
provenBlockHash = this.synchedBlockHashes.get(provenBlockNumber);
if (typeof provenBlockHash === 'undefined') {
this.log.warn(`Block hash for proven block ${provenBlockNumber} not found`);
throw new Error();
}
}

return Promise.resolve({
latest: { hash: latestBlockHash!, number: latestBlockNumber },
proven: { hash: provenBlockHash!, number: provenBlockNumber },
finalized: { hash: provenBlockHash!, number: provenBlockNumber },
});
}

public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
this.log.debug(`Handling block stream event ${event.type}`);
switch (event.type) {
case 'blocks-added':
await this.handleLatestL2Blocks(event.blocks);
break;
case 'chain-finalized':
// TODO (alexg): I think we can prune the block hashes map here
break;
case 'chain-proven': {
const from = this.getSyncedProvenBlockNum() + 1;
const limit = event.blockNumber - from + 1;
await this.handleProvenL2Blocks(await this.l2BlockSource.getBlocks(from, limit));
break;
}
case 'chain-pruned':
await this.handlePruneL2Blocks(event.blockNumber);
break;
default: {
const _: never = event;
break;
}
}
}

#assertIsReady() {
if (!this.isReady()) {
throw new Error('P2P client not ready');
Expand Down Expand Up @@ -286,21 +343,7 @@ export class P2PClient extends WithTracer implements P2P {
// publish any txs in TxPool after its doing initial sync
this.syncPromise = this.syncPromise.then(() => this.publishStoredTxs());

// start looking for further blocks
const processLatest = async () => {
while (!this.stopping) {
await this.latestBlockDownloader.getBlocks(1).then(this.handleLatestL2Blocks.bind(this));
}
};
const processProven = async () => {
while (!this.stopping) {
await this.provenBlockDownloader.getBlocks(1).then(this.handleProvenL2Blocks.bind(this));
}
};

this.runningPromise = Promise.all([processLatest(), processProven()]).then(() => {});
this.latestBlockDownloader.start(syncedLatestBlock);
this.provenBlockDownloader.start(syncedLatestBlock);
this.blockStream.start();
this.log.verbose(`Started block downloader from block ${syncedLatestBlock}`);

return this.syncPromise;
Expand All @@ -315,8 +358,7 @@ export class P2PClient extends WithTracer implements P2P {
this.stopping = true;
await this.p2pService.stop();
this.log.debug('Stopped p2p service');
await this.latestBlockDownloader.stop();
await this.provenBlockDownloader.stop();
await this.blockStream.stop();
this.log.debug('Stopped block downloader');
await this.runningPromise;
this.setCurrentState(P2PClientState.STOPPED);
Expand Down Expand Up @@ -519,8 +561,10 @@ export class P2PClient extends WithTracer implements P2P {
if (!blocks.length) {
return Promise.resolve();
}

await this.markTxsAsMinedFromBlocks(blocks);
const lastBlockNum = blocks[blocks.length - 1].number;
await Promise.all(blocks.map(block => this.synchedBlockHashes.set(block.number, block.hash().toString())));
await this.synchedLatestBlockNumber.set(lastBlockNum);
this.log.debug(`Synched to latest block ${lastBlockNum}`);
await this.startServiceIfSynched();
Expand Down Expand Up @@ -558,7 +602,37 @@ export class P2PClient extends WithTracer implements P2P {
await this.startServiceIfSynched();
}

/**
* Updates the tx pool after a chain prune.
* @param latestBlock - The block number the chain was pruned to.
*/
private async handlePruneL2Blocks(latestBlock: number): Promise<void> {
const txsToDelete: TxHash[] = [];
for (const tx of this.txPool.getAllTxs()) {
// every tx that's been generated against a block that has now been pruned is no longer valid
// NOTE (alexg): I think this check against block hash instead of block number?
if (tx.data.constants.globalVariables.blockNumber.toNumber() > latestBlock) {
txsToDelete.push(tx.getTxHash());
}
}

// TODO (alexg): Delete or re-add txs that were created against the proven block but mined in one of the pruned blocks
// e.g. I create a tx against proven block 42 but it the sequencer includes it in block 45. The chain gets pruned back to 42.
// That tx now lingers in the pool as 'mined' but it really is no longer mined. It's also not technically invalid.

this.log.info(
`Detected chain prune. Removing invalid txs count=${
txsToDelete.length
} newLatestBlock=${latestBlock} previousLatestBlock=${this.getSyncedLatestBlockNum()}`,
);
await this.txPool.deleteTxs(txsToDelete);
await this.synchedLatestBlockNumber.set(latestBlock);
await this.synchedProvenBlockNumber.set(latestBlock);
// no need to update block hashes, as they will be updated as new blocks are added
}

private async startServiceIfSynched() {
// TODO (alexg): I don't think this check works if there's a reorg
if (
this.currentState === P2PClientState.SYNCHING &&
this.getSyncedLatestBlockNum() >= this.latestBlockNumberAtStart &&
Expand Down
6 changes: 5 additions & 1 deletion yarn-project/sequencer-client/src/sequencer/sequencer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,11 @@ export class Sequencer {
this.metrics.recordNewBlock(newGlobalVariables.blockNumber.toNumber(), validTxs.length);
const workTimer = new Timer();
this.state = SequencerState.CREATING_BLOCK;
this.log.info(`Building block ${newGlobalVariables.blockNumber.toNumber()} with ${validTxs.length} transactions`);
this.log.info(
`Building blockNumber=${newGlobalVariables.blockNumber.toNumber()} txCount=${
validTxs.length
} slotNumber=${newGlobalVariables.slotNumber.toNumber()}`,
);

// Get l1 to l2 messages from the contract
this.log.debug('Requesting L1 to L2 messages from contract');
Expand Down

0 comments on commit cbbce08

Please sign in to comment.