From ba63b43c6e5c09ecda0ed94bdd3b875546400d27 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Fri, 11 Oct 2024 13:31:20 +0100 Subject: [PATCH] feat: World state synchronizer reorgs (#9091) Handles reorgs on the world state synchronizer. --- .../archiver/src/archiver/archiver.ts | 28 + yarn-project/archiver/src/test/index.ts | 2 + .../archiver/src/test/mock_archiver.ts | 55 ++ .../src/test/mock_l1_to_l2_message_source.ts | 31 ++ .../archiver/src/test/mock_l2_block_source.ts | 59 ++- .../aztec/src/cli/aztec_start_options.ts | 7 - .../src/interfaces/world_state.ts | 8 - .../src/l2_block_downloader/index.ts | 1 + .../l2_block_stream.test.ts | 174 +++++++ .../l2_block_downloader/l2_block_stream.ts | 155 ++++++ .../circuit-types/src/l2_block_source.ts | 29 ++ .../composed/integration_l1_publisher.test.ts | 4 +- .../end-to-end/src/e2e_synching.test.ts | 14 + .../end-to-end/src/fixtures/setup_p2p_test.ts | 1 - yarn-project/foundation/src/config/env_var.ts | 2 +- yarn-project/foundation/src/error/index.ts | 12 +- .../src/promise/running-promise.test.ts | 44 ++ .../foundation/src/promise/running-promise.ts | 33 +- .../foundation/src/queue/base_memory_queue.ts | 8 + .../p2p/src/client/p2p_client.test.ts | 7 +- .../reqresp/p2p_client.integration.test.ts | 8 +- .../prover-node/src/prover-node.test.ts | 10 +- yarn-project/prover-node/src/prover-node.ts | 7 +- yarn-project/world-state/package.json | 1 + .../world-state/src/native/message.ts | 3 + .../src/native/native_world_state.test.ts | 2 +- .../src/native/native_world_state_cmp.test.ts | 2 +- .../src/native/native_world_state_instance.ts | 4 + .../world-state/src/synchronizer/config.ts | 17 +- .../world-state/src/synchronizer/factory.ts | 15 +- .../server_world_state_synchronizer.test.ts | 485 +++++------------- .../server_world_state_synchronizer.ts | 276 +++++----- .../world-state/src/test/integration.test.ts | 241 +++++++++ .../{native/test_util.ts => test/utils.ts} | 18 + .../src/world-state-db/merkle_tree_db.ts | 27 + .../src/world-state-db/merkle_trees.ts | 21 +- yarn-project/world-state/tsconfig.json | 3 + yarn-project/yarn.lock | 1 + 38 files changed, 1248 insertions(+), 567 deletions(-) create mode 100644 yarn-project/archiver/src/test/mock_archiver.ts create mode 100644 yarn-project/archiver/src/test/mock_l1_to_l2_message_source.ts create mode 100644 yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.test.ts create mode 100644 yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts create mode 100644 yarn-project/foundation/src/promise/running-promise.test.ts create mode 100644 yarn-project/world-state/src/test/integration.test.ts rename yarn-project/world-state/src/{native/test_util.ts => test/utils.ts} (85%) diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 9956fc6549a..f1b9de2b278 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -4,9 +4,11 @@ import { type InboxLeaf, type L1ToL2MessageSource, type L2Block, + type L2BlockId, type L2BlockL2Logs, type L2BlockSource, type L2LogsSource, + type L2Tips, type LogFilter, type LogType, type TxEffect, @@ -654,6 +656,32 @@ export class Archiver implements ArchiveSource { getContractArtifact(address: AztecAddress): Promise { return this.store.getContractArtifact(address); } + + async getL2Tips(): Promise { + const [latestBlockNumber, provenBlockNumber] = await Promise.all([ + this.getBlockNumber(), + this.getProvenBlockNumber(), + ] as const); + + const [latestBlockHeader, provenBlockHeader] = await Promise.all([ + latestBlockNumber > 0 ? this.getBlockHeader(latestBlockNumber) : undefined, + provenBlockNumber > 0 ? this.getBlockHeader(provenBlockNumber) : undefined, + ] as const); + + if (latestBlockNumber > 0 && !latestBlockHeader) { + throw new Error('Failed to retrieve latest block header'); + } + + if (provenBlockNumber > 0 && !provenBlockHeader) { + throw new Error('Failed to retrieve proven block header'); + } + + return { + latest: { number: latestBlockNumber, hash: latestBlockHeader?.hash().toString() } as L2BlockId, + proven: { number: provenBlockNumber, hash: provenBlockHeader?.hash().toString() } as L2BlockId, + finalized: { number: provenBlockNumber, hash: provenBlockHeader?.hash().toString() } as L2BlockId, + }; + } } enum Operation { diff --git a/yarn-project/archiver/src/test/index.ts b/yarn-project/archiver/src/test/index.ts index 5ff05e2acb4..3e22042e59e 100644 --- a/yarn-project/archiver/src/test/index.ts +++ b/yarn-project/archiver/src/test/index.ts @@ -1 +1,3 @@ export * from './mock_l2_block_source.js'; +export * from './mock_l1_to_l2_message_source.js'; +export * from './mock_archiver.js'; diff --git a/yarn-project/archiver/src/test/mock_archiver.ts b/yarn-project/archiver/src/test/mock_archiver.ts new file mode 100644 index 00000000000..5cfb7424551 --- /dev/null +++ b/yarn-project/archiver/src/test/mock_archiver.ts @@ -0,0 +1,55 @@ +import { type L1ToL2MessageSource, type L2Block, type L2BlockSource } from '@aztec/circuit-types'; +import { type Fr } from '@aztec/circuits.js'; + +import { MockL1ToL2MessageSource } from './mock_l1_to_l2_message_source.js'; +import { MockL2BlockSource } from './mock_l2_block_source.js'; + +/** + * A mocked implementation of the archiver that implements L2BlockSource and L1ToL2MessageSource. + */ +export class MockArchiver extends MockL2BlockSource implements L2BlockSource, L1ToL2MessageSource { + private messageSource = new MockL1ToL2MessageSource(0); + + public setL1ToL2Messages(blockNumber: number, msgs: Fr[]) { + this.messageSource.setL1ToL2Messages(blockNumber, msgs); + } + + getL1ToL2Messages(blockNumber: bigint): Promise { + return this.messageSource.getL1ToL2Messages(blockNumber); + } + + getL1ToL2MessageIndex(_l1ToL2Message: Fr, _startIndex: bigint): Promise { + return this.messageSource.getL1ToL2MessageIndex(_l1ToL2Message, _startIndex); + } +} + +/** + * A mocked implementation of the archiver with a set of precomputed blocks and messages. + */ +export class MockPrefilledArchiver extends MockArchiver { + private precomputed: L2Block[]; + + constructor(precomputed: L2Block[], messages: Fr[][]) { + super(); + this.precomputed = precomputed.slice(); + messages.forEach((msgs, i) => this.setL1ToL2Messages(i + 1, msgs)); + } + + public setPrefilledBlocks(blocks: L2Block[], messages: Fr[][]) { + for (const block of blocks) { + this.precomputed[block.number - 1] = block; + } + messages.forEach((msgs, i) => this.setL1ToL2Messages(blocks[i].number, msgs)); + } + + public override createBlocks(numBlocks: number) { + if (this.l2Blocks.length + numBlocks > this.precomputed.length) { + throw new Error( + `Not enough precomputed blocks to create ${numBlocks} more blocks (already at ${this.l2Blocks.length})`, + ); + } + + const fromBlock = this.l2Blocks.length; + this.addBlocks(this.precomputed.slice(fromBlock, fromBlock + numBlocks)); + } +} diff --git a/yarn-project/archiver/src/test/mock_l1_to_l2_message_source.ts b/yarn-project/archiver/src/test/mock_l1_to_l2_message_source.ts new file mode 100644 index 00000000000..cac4157d6ae --- /dev/null +++ b/yarn-project/archiver/src/test/mock_l1_to_l2_message_source.ts @@ -0,0 +1,31 @@ +import { type L1ToL2MessageSource } from '@aztec/circuit-types'; +import { type Fr } from '@aztec/circuits.js'; + +/** + * A mocked implementation of L1ToL2MessageSource to be used in tests. + */ +export class MockL1ToL2MessageSource implements L1ToL2MessageSource { + private messagesPerBlock = new Map(); + + constructor(private blockNumber: number) {} + + public setL1ToL2Messages(blockNumber: number, msgs: Fr[]) { + this.messagesPerBlock.set(blockNumber, msgs); + } + + public setBlockNumber(blockNumber: number) { + this.blockNumber = blockNumber; + } + + getL1ToL2Messages(blockNumber: bigint): Promise { + return Promise.resolve(this.messagesPerBlock.get(Number(blockNumber)) ?? []); + } + + getL1ToL2MessageIndex(_l1ToL2Message: Fr, _startIndex: bigint): Promise { + throw new Error('Method not implemented.'); + } + + getBlockNumber(): Promise { + return Promise.resolve(this.blockNumber); + } +} diff --git a/yarn-project/archiver/src/test/mock_l2_block_source.ts b/yarn-project/archiver/src/test/mock_l2_block_source.ts index 4206f29c4fe..5921831cb2d 100644 --- a/yarn-project/archiver/src/test/mock_l2_block_source.ts +++ b/yarn-project/archiver/src/test/mock_l2_block_source.ts @@ -1,27 +1,38 @@ -import { L2Block, type L2BlockSource, type TxEffect, type TxHash, TxReceipt, TxStatus } from '@aztec/circuit-types'; +import { L2Block, type L2BlockSource, type L2Tips, type TxHash, TxReceipt, TxStatus } from '@aztec/circuit-types'; import { EthAddress, type Header } from '@aztec/circuits.js'; +import { createDebugLogger } from '@aztec/foundation/log'; import { getSlotRangeForEpoch } from '../archiver/epoch_helpers.js'; /** - * A mocked implementation of L2BlockSource to be used in p2p tests. + * A mocked implementation of L2BlockSource to be used in tests. */ -export class MockBlockSource implements L2BlockSource { - private l2Blocks: L2Block[] = []; - private txEffects: TxEffect[] = []; +export class MockL2BlockSource implements L2BlockSource { + protected l2Blocks: L2Block[] = []; + private provenEpochNumber: number = 0; + private provenBlockNumber: number = 0; - constructor(numBlocks = 100, private provenBlockNumber?: number) { - this.addBlocks(numBlocks); - } + private log = createDebugLogger('aztec:archiver:mock_l2_block_source'); - public addBlocks(numBlocks: number) { + public createBlocks(numBlocks: number) { for (let i = 0; i < numBlocks; i++) { - const blockNum = this.l2Blocks.length; - const block = L2Block.random(blockNum, blockNum); + const blockNum = this.l2Blocks.length + 1; + const block = L2Block.random(blockNum); this.l2Blocks.push(block); - this.txEffects.push(...block.body.txEffects); } + + this.log.verbose(`Created ${numBlocks} blocks in the mock L2 block source`); + } + + public addBlocks(blocks: L2Block[]) { + this.l2Blocks.push(...blocks); + this.log.verbose(`Added ${blocks.length} blocks to the mock L2 block source`); + } + + public removeBlocks(numBlocks: number) { + this.l2Blocks = this.l2Blocks.slice(0, -numBlocks); + this.log.verbose(`Removed ${numBlocks} blocks from the mock L2 block source`); } public setProvenBlockNumber(provenBlockNumber: number) { @@ -53,7 +64,7 @@ export class MockBlockSource implements L2BlockSource { * @returns In this mock instance, returns the number of L2 blocks that we've mocked. */ public getBlockNumber() { - return Promise.resolve(this.l2Blocks.length - 1); + return Promise.resolve(this.l2Blocks.length); } public async getProvenBlockNumber(): Promise { @@ -70,7 +81,7 @@ export class MockBlockSource implements L2BlockSource { * @returns The requested L2 block. */ public getBlock(number: number) { - return Promise.resolve(this.l2Blocks[number]); + return Promise.resolve(this.l2Blocks[number - 1]); } /** @@ -82,13 +93,13 @@ export class MockBlockSource implements L2BlockSource { public getBlocks(from: number, limit: number, proven?: boolean) { return Promise.resolve( this.l2Blocks - .slice(from, from + limit) + .slice(from - 1, from - 1 + limit) .filter(b => !proven || this.provenBlockNumber === undefined || b.number <= this.provenBlockNumber), ); } getBlockHeader(number: number | 'latest'): Promise
{ - return Promise.resolve(this.l2Blocks.at(typeof number === 'number' ? number : -1)?.header); + return Promise.resolve(this.l2Blocks.at(typeof number === 'number' ? number - 1 : -1)?.header); } getBlocksForEpoch(epochNumber: bigint): Promise { @@ -106,7 +117,7 @@ export class MockBlockSource implements L2BlockSource { * @returns The requested tx effect. */ public getTxEffect(txHash: TxHash) { - const txEffect = this.txEffects.find(tx => tx.txHash.equals(txHash)); + const txEffect = this.l2Blocks.flatMap(b => b.body.txEffects).find(tx => tx.txHash.equals(txHash)); return Promise.resolve(txEffect); } @@ -135,6 +146,20 @@ export class MockBlockSource implements L2BlockSource { return Promise.resolve(undefined); } + async getL2Tips(): Promise { + const [latest, proven, finalized] = [ + await this.getBlockNumber(), + await this.getProvenBlockNumber(), + await this.getProvenBlockNumber(), + ] as const; + + return { + latest: { number: latest, hash: this.l2Blocks[latest - 1]?.hash().toString() }, + proven: { number: proven, hash: this.l2Blocks[proven - 1]?.hash().toString() }, + finalized: { number: finalized, hash: this.l2Blocks[finalized - 1]?.hash().toString() }, + }; + } + getL2EpochNumber(): Promise { throw new Error('Method not implemented.'); } diff --git a/yarn-project/aztec/src/cli/aztec_start_options.ts b/yarn-project/aztec/src/cli/aztec_start_options.ts index 99aa5c54218..9d9c00a7d98 100644 --- a/yarn-project/aztec/src/cli/aztec_start_options.ts +++ b/yarn-project/aztec/src/cli/aztec_start_options.ts @@ -193,13 +193,6 @@ export const aztecStartOptions: { [key: string]: AztecStartOption[] } = { defaultValue: undefined, envVar: 'L1_PRIVATE_KEY', }, - { - flag: '--node.l2QueueSize ', - description: 'Size of queue of L2 blocks to store in world state', - defaultValue: 1000, - envVar: 'L2_QUEUE_SIZE', - parseVal: val => parseInt(val, 10), - }, { flag: '--node.worldStateBlockCheckIntervalMS ', description: 'Frequency in which to check for blocks in ms', diff --git a/yarn-project/circuit-types/src/interfaces/world_state.ts b/yarn-project/circuit-types/src/interfaces/world_state.ts index 4c11bfe8f97..d3d2697fa34 100644 --- a/yarn-project/circuit-types/src/interfaces/world_state.ts +++ b/yarn-project/circuit-types/src/interfaces/world_state.ts @@ -52,14 +52,6 @@ export interface WorldStateSynchronizer { */ syncImmediate(minBlockNumber?: number): Promise; - /** - * Pauses the synchronizer, syncs to the target block number, forks world state, and resumes. - * @param targetBlockNumber - The block number to sync to. - * @param forkIncludeUncommitted - Whether to include uncommitted data in the fork. - * @returns The db forked at the requested target block number. - */ - syncImmediateAndFork(targetBlockNumber: number): Promise; - /** * Forks the current in-memory state based off the current committed state, and returns an instance that cannot modify the underlying data store. */ diff --git a/yarn-project/circuit-types/src/l2_block_downloader/index.ts b/yarn-project/circuit-types/src/l2_block_downloader/index.ts index 9084cc7d81a..830bdc93d16 100644 --- a/yarn-project/circuit-types/src/l2_block_downloader/index.ts +++ b/yarn-project/circuit-types/src/l2_block_downloader/index.ts @@ -1 +1,2 @@ export * from './l2_block_downloader.js'; +export * from './l2_block_stream.js'; diff --git a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.test.ts b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.test.ts new file mode 100644 index 00000000000..b49f0c60734 --- /dev/null +++ b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.test.ts @@ -0,0 +1,174 @@ +import { Fr, type Header } from '@aztec/circuits.js'; +import { compactArray } from '@aztec/foundation/collection'; + +import { type MockProxy, mock } from 'jest-mock-extended'; +import times from 'lodash.times'; + +import { type L2Block } from '../l2_block.js'; +import { type L2BlockSource, type L2BlockTag } from '../l2_block_source.js'; +import { + L2BlockStream, + type L2BlockStreamEvent, + type L2BlockStreamEventHandler, + type L2BlockStreamLocalDataProvider, +} from './l2_block_stream.js'; + +describe('L2BlockStream', () => { + let blockStream: TestL2BlockStream; + + let blockSource: MockProxy; + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + + let latest: number = 0; + + beforeEach(() => { + blockSource = mock(); + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + + // Archiver returns headers with hashes equal to the block number for simplicity + blockSource.getBlockHeader.mockImplementation(number => + Promise.resolve(makeHeader(number === 'latest' ? 1 : number)), + ); + + // And returns blocks up until what was reported as the latest block + blockSource.getBlocks.mockImplementation((from, limit) => + Promise.resolve(compactArray(times(limit, i => (from + i > latest ? undefined : makeBlock(from + i))))), + ); + + blockStream = new TestL2BlockStream(blockSource, localData, handler, { batchSize: 10 }); + }); + + const makeBlock = (number: number) => ({ number } as L2Block); + + const makeHeader = (number: number) => mock
({ hash: () => new Fr(number) }); + + const setRemoteTips = (latest_: number, proven?: number, finalized?: number) => { + proven = proven ?? 0; + finalized = finalized ?? 0; + latest = latest_; + + blockSource.getL2Tips.mockResolvedValue({ + latest: { number: latest, hash: latest.toString() }, + proven: { number: proven, hash: proven.toString() }, + finalized: { number: finalized, hash: finalized.toString() }, + }); + }; + + describe('work', () => { + it('pulls new blocks from start', async () => { + setRemoteTips(5); + + await blockStream.work(); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 1)) }]); + }); + + it('pulls new blocks from offset', async () => { + setRemoteTips(15); + localData.latest = 10; + + await blockStream.work(); + expect(blockSource.getBlocks).toHaveBeenCalledWith(11, 5, undefined); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 11)) }]); + }); + + it('pulls new blocks in multiple batches', async () => { + setRemoteTips(45); + + await blockStream.work(); + expect(blockSource.getBlocks).toHaveBeenCalledTimes(5); + expect(handler.events).toEqual([ + { type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) }, + { type: 'blocks-added', blocks: times(10, i => makeBlock(i + 11)) }, + { type: 'blocks-added', blocks: times(10, i => makeBlock(i + 21)) }, + { type: 'blocks-added', blocks: times(10, i => makeBlock(i + 31)) }, + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) }, + ]); + }); + + it('halts pulling blocks if stopped', async () => { + setRemoteTips(45); + blockStream.running = false; + + await blockStream.work(); + expect(blockSource.getBlocks).toHaveBeenCalledTimes(1); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) }]); + }); + + it('handles a reorg and requests blocks from new tip', async () => { + setRemoteTips(45); + localData.latest = 40; + + for (const i of [37, 38, 39, 40]) { + // Mess up the block hashes for a bunch of blocks + localData.blockHashes[i] = `0xaa${i.toString()}`; + } + + await blockStream.work(); + expect(handler.events).toEqual([ + { type: 'chain-pruned', blockNumber: 36 }, + { type: 'blocks-added', blocks: times(9, i => makeBlock(i + 37)) }, + ]); + }); + + it('emits events for chain proven and finalized', async () => { + setRemoteTips(45, 40, 35); + localData.latest = 40; + localData.proven = 10; + localData.finalized = 10; + + await blockStream.work(); + expect(handler.events).toEqual([ + { type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) }, + { type: 'chain-proven', blockNumber: 40 }, + { type: 'chain-finalized', blockNumber: 35 }, + ]); + }); + + it('does not emit events for chain proven or finalized if local data ignores them', async () => { + setRemoteTips(45, 40, 35); + localData.latest = 40; + + await blockStream.work(); + expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(5, i => makeBlock(i + 41)) }]); + }); + }); +}); + +class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler { + public readonly events: L2BlockStreamEvent[] = []; + + handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + this.events.push(event); + return Promise.resolve(); + } +} + +class TestL2BlockStreamLocalDataProvider implements L2BlockStreamLocalDataProvider { + public readonly blockHashes: Record = {}; + + public latest = 0; + public proven: number | undefined = undefined; + public finalized: number | undefined = undefined; + + public getL2BlockHash(number: number): Promise { + return Promise.resolve(number > this.latest ? undefined : this.blockHashes[number] ?? new Fr(number).toString()); + } + + public getL2Tips(): Promise<{ latest: number } & Partial>> { + return Promise.resolve(this); + } +} + +class TestL2BlockStream extends L2BlockStream { + public running = true; + + public override work() { + return super.work(); + } + + public override isRunning(): boolean { + return this.running; + } +} diff --git a/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts new file mode 100644 index 00000000000..0039c860127 --- /dev/null +++ b/yarn-project/circuit-types/src/l2_block_downloader/l2_block_stream.ts @@ -0,0 +1,155 @@ +import { AbortError } from '@aztec/foundation/error'; +import { createDebugLogger } from '@aztec/foundation/log'; +import { RunningPromise } from '@aztec/foundation/running-promise'; + +import { type L2Block } from '../l2_block.js'; +import { type L2BlockId, type L2BlockSource, type L2BlockTag } from '../l2_block_source.js'; + +/** Creates a stream of events for new blocks, chain tips updates, and reorgs, out of polling an archiver. */ +export class L2BlockStream { + private readonly runningPromise: RunningPromise; + + private readonly log = createDebugLogger('aztec:l2_block_stream'); + + constructor( + private l2BlockSource: L2BlockSource, + private localData: L2BlockStreamLocalDataProvider, + private handler: L2BlockStreamEventHandler, + private opts: { + proven?: boolean; + pollIntervalMS?: number; + batchSize?: number; + }, + ) { + this.runningPromise = new RunningPromise(() => this.work(), this.opts.pollIntervalMS ?? 1000); + } + + public start() { + this.log.verbose(`Starting L2 block stream`, this.opts); + this.runningPromise.start(); + } + + public async stop() { + await this.runningPromise.stop(); + } + + public isRunning() { + return this.runningPromise.isRunning(); + } + + public sync() { + return this.runningPromise.trigger(); + } + + protected async work() { + try { + const sourceTips = await this.l2BlockSource.getL2Tips(); + const localTips = await this.localData.getL2Tips(); + this.log.debug(`Running L2 block stream`, { + sourceLatest: sourceTips.latest.number, + localLatest: localTips.latest, + sourceFinalized: sourceTips.finalized.number, + localFinalized: localTips.finalized, + sourceProven: sourceTips.proven.number, + localProven: localTips.proven, + sourceLatestHash: sourceTips.latest.hash, + sourceProvenHash: sourceTips.proven.hash, + sourceFinalizedHash: sourceTips.finalized.hash, + }); + + // Check if there was a reorg and emit a chain-pruned event if so. + let latestBlockNumber = localTips.latest; + while (!(await this.areBlockHashesEqual(latestBlockNumber, sourceTips.latest))) { + latestBlockNumber--; + } + if (latestBlockNumber < localTips.latest) { + this.log.verbose(`Reorg detected. Pruning blocks from ${latestBlockNumber + 1} to ${localTips.latest}.`); + await this.emitEvent({ type: 'chain-pruned', blockNumber: latestBlockNumber }); + } + + // Request new blocks from the source. + while (latestBlockNumber < sourceTips.latest.number) { + const from = latestBlockNumber + 1; + const limit = Math.min(this.opts.batchSize ?? 20, sourceTips.latest.number - from + 1); + this.log.debug(`Requesting blocks from ${from} limit ${limit}`); + const blocks = await this.l2BlockSource.getBlocks(from, limit, this.opts.proven); + if (blocks.length === 0) { + break; + } + await this.emitEvent({ type: 'blocks-added', blocks }); + latestBlockNumber = blocks.at(-1)!.number; + } + + // Update the proven and finalized tips. + // TODO(palla/reorg): Should we emit this before passing the new blocks? This would allow world-state to skip + // building the data structures for the pending chain if it's unneeded. + if (localTips.proven !== undefined && sourceTips.proven.number !== localTips.proven) { + await this.emitEvent({ type: 'chain-proven', blockNumber: sourceTips.proven.number }); + } + if (localTips.finalized !== undefined && sourceTips.finalized.number !== localTips.finalized) { + await this.emitEvent({ type: 'chain-finalized', blockNumber: sourceTips.finalized.number }); + } + } catch (err: any) { + if (err.name === 'AbortError') { + return; + } + this.log.error(`Error processing block stream`, err); + } + } + + private async areBlockHashesEqual(blockNumber: number, sourceLatest: L2BlockId) { + if (blockNumber === 0) { + return true; + } + const localBlockHash = await this.localData.getL2BlockHash(blockNumber); + const sourceBlockHash = + sourceLatest.number === blockNumber && sourceLatest.hash + ? sourceLatest.hash + : await this.l2BlockSource.getBlockHeader(blockNumber).then(h => h?.hash().toString()); + this.log.debug(`Comparing block hashes for block ${blockNumber}`, { localBlockHash, sourceBlockHash }); + return localBlockHash === sourceBlockHash; + } + + private async emitEvent(event: L2BlockStreamEvent) { + this.log.debug( + `Emitting ${event.type} (${event.type === 'blocks-added' ? event.blocks.length : event.blockNumber})`, + ); + await this.handler.handleBlockStreamEvent(event); + if (!this.isRunning()) { + throw new AbortError(); + } + } +} + +/** Interface to the local view of the chain. Implemented by world-state. */ +export interface L2BlockStreamLocalDataProvider { + getL2BlockHash(number: number): Promise; + getL2Tips(): Promise<{ latest: number } & Partial>>; +} + +/** Interface to a handler of events emitted. */ +export interface L2BlockStreamEventHandler { + handleBlockStreamEvent(event: L2BlockStreamEvent): Promise; +} + +export type L2BlockStreamEvent = + | { + type: 'blocks-added'; + /** New blocks added to the chain. */ + blocks: L2Block[]; + } + | { + type: 'chain-pruned'; + /** Last correct block number (new tip of the unproven chain). */ + blockNumber: number; + } + | { + type: 'chain-proven'; + /** New proven block number */ + blockNumber: number; + } + | { + type: 'chain-finalized'; + /** New finalized block number */ + blockNumber: number; + }; diff --git a/yarn-project/circuit-types/src/l2_block_source.ts b/yarn-project/circuit-types/src/l2_block_source.ts index 0d9fc597839..6cc0af03059 100644 --- a/yarn-project/circuit-types/src/l2_block_source.ts +++ b/yarn-project/circuit-types/src/l2_block_source.ts @@ -99,6 +99,11 @@ export interface L2BlockSource { */ isEpochComplete(epochNumber: bigint): Promise; + /** + * Returns the tips of the L2 chain. + */ + getL2Tips(): Promise; + /** * Starts the L2 block source. * @param blockUntilSynced - If true, blocks until the data source has fully synced. @@ -112,3 +117,27 @@ export interface L2BlockSource { */ stop(): Promise; } + +/** + * Identifier for L2 block tags. + * - latest: Latest block pushed to L1. + * - proven: Proven block on L1. + * - finalized: Proven block on a finalized L1 block (not implemented, set to proven for now). + */ +export type L2BlockTag = 'latest' | 'proven' | 'finalized'; + +/** Tips of the L2 chain. */ +export type L2Tips = Record; + +/** Identifies a block by number and hash. */ +export type L2BlockId = + | { + number: 0; + hash: undefined; + } + | { + /** L2 block number. */ + number: number; + /** L2 block hash. */ + hash: string; + }; diff --git a/yarn-project/end-to-end/src/composed/integration_l1_publisher.test.ts b/yarn-project/end-to-end/src/composed/integration_l1_publisher.test.ts index 48d58ea0d16..2d06056b408 100644 --- a/yarn-project/end-to-end/src/composed/integration_l1_publisher.test.ts +++ b/yarn-project/end-to-end/src/composed/integration_l1_publisher.test.ts @@ -153,16 +153,14 @@ describe('L1Publisher integration', () => { client: publicClient, }); - const tmpStore = openTmpStore(); builderDb = await NativeWorldStateService.tmp(EthAddress.fromString(rollupAddress)); blockSource = mock(); blockSource.getBlocks.mockResolvedValue([]); const worldStateConfig: WorldStateConfig = { worldStateBlockCheckIntervalMS: 10000, - l2QueueSize: 10, worldStateProvenBlocksOnly: false, }; - worldStateSynchronizer = new ServerWorldStateSynchronizer(tmpStore, builderDb, blockSource, worldStateConfig); + worldStateSynchronizer = new ServerWorldStateSynchronizer(builderDb, blockSource, worldStateConfig); await worldStateSynchronizer.start(); fork = await worldStateSynchronizer.fork(); builder = new LightweightBlockBuilder(fork, new NoopTelemetryClient()); diff --git a/yarn-project/end-to-end/src/e2e_synching.test.ts b/yarn-project/end-to-end/src/e2e_synching.test.ts index dde10ef5583..1a2952b30bb 100644 --- a/yarn-project/end-to-end/src/e2e_synching.test.ts +++ b/yarn-project/end-to-end/src/e2e_synching.test.ts @@ -57,6 +57,7 @@ import { SchnorrHardcodedAccountContract, SpamContract, TokenContract } from '@a import { type PXEService } from '@aztec/pxe'; import { L1Publisher } from '@aztec/sequencer-client'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; +import { createWorldStateSynchronizer } from '@aztec/world-state'; import * as fs from 'fs'; import { getContract } from 'viem'; @@ -540,6 +541,10 @@ describe('e2e_synching', () => { const archiver = await createArchiver(opts.config!); const pendingBlockNumber = await rollup.read.getPendingBlockNumber(); + const worldState = await createWorldStateSynchronizer(opts.config!, archiver, new NoopTelemetryClient()); + await worldState.start(); + expect(await worldState.getLatestBlockNumber()).toEqual(Number(pendingBlockNumber)); + // We prune the last token and schnorr contract const assumeProvenThrough = pendingBlockNumber - 2n; await rollup.write.setAssumeProvenThroughBlockNumber([assumeProvenThrough]); @@ -589,7 +594,16 @@ describe('e2e_synching', () => { expect(await archiver.getLogs(blockTip.number, 1, t)).toEqual([]); }); + // Check world state reverted as well + expect(await worldState.getLatestBlockNumber()).toEqual(Number(assumeProvenThrough)); + const worldStateLatestBlockHash = await worldState.getL2BlockHash(Number(assumeProvenThrough)); + const archiverLatestBlockHash = await archiver + .getBlockHeader(Number(assumeProvenThrough)) + .then(b => b?.hash()); + expect(worldStateLatestBlockHash).toEqual(archiverLatestBlockHash?.toString()); + await archiver.stop(); + await worldState.stop(); }, ASSUME_PROVEN_THROUGH, ); diff --git a/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts b/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts index 659443a1445..13152cf557d 100644 --- a/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts +++ b/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts @@ -114,7 +114,6 @@ export async function createValidatorConfig( maxTxsPerBlock: config.maxTxsPerBlock, p2pEnabled: true, blockCheckIntervalMS: 1000, - l2QueueSize: 1, transactionProtocol: '', dataDirectory, bootstrapNodes: bootstrapNodeEnr ? [bootstrapNodeEnr] : [], diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 2e001882f2e..e66dc17d510 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -134,8 +134,8 @@ export type EnvVar = | 'VALIDATOR_PRIVATE_KEY' | 'VERSION' | 'WS_BLOCK_CHECK_INTERVAL_MS' - | 'WS_L2_BLOCK_QUEUE_SIZE' | 'WS_PROVEN_BLOCKS_ONLY' + | 'WS_BLOCK_REQUEST_BATCH_SIZE' | 'VERIFIER_VIEM_POLLING_INTERVAL_MS' | 'L1_READER_VIEM_POLLING_INTERVAL_MS' | 'PROVER_VIEM_POLLING_INTERVAL_MS' diff --git a/yarn-project/foundation/src/error/index.ts b/yarn-project/foundation/src/error/index.ts index 825e2bc9378..36d0783d188 100644 --- a/yarn-project/foundation/src/error/index.ts +++ b/yarn-project/foundation/src/error/index.ts @@ -3,14 +3,20 @@ * This custom error class extends the built-in Error class in JavaScript and * can be used to handle cases where a process or task is terminated before completion. */ -export class InterruptError extends Error {} +export class InterruptError extends Error { + public override readonly name = 'InterruptError'; +} /** * An error thrown when an action times out. */ -export class TimeoutError extends Error {} +export class TimeoutError extends Error { + public override readonly name = 'TimeoutError'; +} /** * Represents an error thrown when an operation is aborted. */ -export class AbortError extends Error {} +export class AbortError extends Error { + public override readonly name = 'AbortError'; +} diff --git a/yarn-project/foundation/src/promise/running-promise.test.ts b/yarn-project/foundation/src/promise/running-promise.test.ts new file mode 100644 index 00000000000..f1ac79297b7 --- /dev/null +++ b/yarn-project/foundation/src/promise/running-promise.test.ts @@ -0,0 +1,44 @@ +import { sleep } from '../sleep/index.js'; +import { RunningPromise } from './running-promise.js'; + +describe('RunningPromise', () => { + let runningPromise: RunningPromise; + let counter: number; + let fn: () => Promise; + + beforeEach(() => { + counter = 0; + fn = async () => { + counter++; + await sleep(100); + }; + runningPromise = new RunningPromise(fn, 50); + }); + + afterEach(async () => { + await runningPromise.stop(); + }); + + describe('trigger', () => { + it('immediately runs the function when not running and awaits for completion', async () => { + await runningPromise.trigger(); + expect(counter).toEqual(1); + }); + + it('immediately runs the function if sleeping', async () => { + runningPromise.start(); + await sleep(110); + expect(counter).toEqual(1); + await runningPromise.trigger(); + expect(counter).toEqual(2); + }); + + it('waits for current run to finish before triggering', async () => { + runningPromise.start(); + await sleep(10); + expect(counter).toEqual(1); + await runningPromise.trigger(); + expect(counter).toEqual(2); + }); + }); +}); diff --git a/yarn-project/foundation/src/promise/running-promise.ts b/yarn-project/foundation/src/promise/running-promise.ts index 5a2ef071f27..47f7d1e643d 100644 --- a/yarn-project/foundation/src/promise/running-promise.ts +++ b/yarn-project/foundation/src/promise/running-promise.ts @@ -1,4 +1,5 @@ import { InterruptibleSleep } from '../sleep/index.js'; +import { type PromiseWithResolvers, promiseWithResolvers } from './utils.js'; /** * RunningPromise is a utility class that helps manage the execution of an asynchronous function @@ -9,6 +10,7 @@ export class RunningPromise { private running = false; private runningPromise = Promise.resolve(); private interruptibleSleep = new InterruptibleSleep(); + private requested: PromiseWithResolvers | undefined = undefined; constructor(private fn: () => void | Promise, private pollingIntervalMS = 10000) {} @@ -20,8 +22,19 @@ export class RunningPromise { const poll = async () => { while (this.running) { + const hasRequested = this.requested !== undefined; await this.fn(); - await this.interruptibleSleep.sleep(this.pollingIntervalMS); + + // If an immediate run had been requested *before* the function started running, resolve the request. + if (hasRequested) { + this.requested!.resolve(); + this.requested = undefined; + } + + // If no immediate run was requested, sleep for the polling interval. + if (this.requested === undefined) { + await this.interruptibleSleep.sleep(this.pollingIntervalMS); + } } }; this.runningPromise = poll(); @@ -45,6 +58,24 @@ export class RunningPromise { return this.running; } + /** + * Triggers an immediate run of the function, bypassing the polling interval. + * If the function is currently running, it will be allowed to continue and then called again immediately. + */ + public async trigger() { + if (!this.running) { + return this.fn(); + } + + let requested = this.requested; + if (!requested) { + requested = promiseWithResolvers(); + this.requested = requested; + this.interruptibleSleep.interrupt(); + } + await requested!.promise; + } + /** * Updates the polling interval. The new interval will take effect after the next poll. * @param pollingIntervalMS The polling interval in milliseconds. diff --git a/yarn-project/foundation/src/queue/base_memory_queue.ts b/yarn-project/foundation/src/queue/base_memory_queue.ts index 4677aa8e0b3..cd92dbcb85d 100644 --- a/yarn-project/foundation/src/queue/base_memory_queue.ts +++ b/yarn-project/foundation/src/queue/base_memory_queue.ts @@ -24,6 +24,14 @@ export abstract class BaseMemoryQueue { return this.items.length; } + /** + * Returns next item within the queue, or undefined if the queue is empty. Does not block. + * @returns The next item in the queue. + */ + public getImmediate(): T | undefined { + return this.items.get(); + } + /** * Returns next item within the queue, or blocks until an item has been put into the queue. * diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index dae29f25505..ae0adba6a2e 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -1,4 +1,4 @@ -import { MockBlockSource } from '@aztec/archiver/test'; +import { MockL2BlockSource } from '@aztec/archiver/test'; import { mockEpochProofQuote, mockTx } from '@aztec/circuit-types'; import { retryUntil } from '@aztec/foundation/retry'; import { type AztecKVStore } from '@aztec/kv-store'; @@ -26,7 +26,7 @@ describe('In-Memory P2P Client', () => { let attestationPool: Mockify; let epochProofQuotePool: Mockify; let mempools: MemPools; - let blockSource: MockBlockSource; + let blockSource: MockL2BlockSource; let p2pService: Mockify; let kvStore: AztecKVStore; let client: P2PClient; @@ -67,7 +67,8 @@ describe('In-Memory P2P Client', () => { deleteQuotesToEpoch: jest.fn(), }; - blockSource = new MockBlockSource(); + blockSource = new MockL2BlockSource(); + blockSource.createBlocks(100); mempools = { txPool, diff --git a/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts b/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts index eccf288e134..b91aa6aa703 100644 --- a/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/p2p_client.integration.test.ts @@ -1,5 +1,5 @@ // An integration test for the p2p client to test req resp protocols -import { MockBlockSource } from '@aztec/archiver/test'; +import { MockL2BlockSource } from '@aztec/archiver/test'; import { type ClientProtocolCircuitVerifier, type WorldStateSynchronizer, mockTx } from '@aztec/circuit-types'; import { EthAddress } from '@aztec/circuits.js'; import { createDebugLogger } from '@aztec/foundation/log'; @@ -49,7 +49,7 @@ describe('Req Resp p2p client integration', () => { let txPool: Mockify; let attestationPool: Mockify; let epochProofQuotePool: Mockify; - let blockSource: MockBlockSource; + let blockSource: MockL2BlockSource; let kvStore: AztecKVStore; let worldStateSynchronizer: WorldStateSynchronizer; let proofVerifier: ClientProtocolCircuitVerifier; @@ -142,7 +142,9 @@ describe('Req Resp p2p client integration', () => { deleteQuotesToEpoch: jest.fn(), }; - blockSource = new MockBlockSource(); + blockSource = new MockL2BlockSource(); + blockSource.createBlocks(100); + proofVerifier = alwaysTrueVerifier ? new AlwaysTrueCircuitVerifier() : new AlwaysFalseCircuitVerifier(); kvStore = openTmpStore(); const deps = { diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 4d19edf9213..a32668f3877 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -115,7 +115,7 @@ describe('prover-node', () => { config = { maxPendingJobs: 3, pollingIntervalMs: 10 }; // World state returns a new mock db every time it is asked to fork - worldState.syncImmediateAndFork.mockImplementation(() => Promise.resolve(mock())); + worldState.fork.mockImplementation(() => Promise.resolve(mock())); worldState.status.mockResolvedValue({ syncedToL2Block: 1, state: WorldStateRunningState.RUNNING }); // Publisher returns its sender address @@ -178,14 +178,6 @@ describe('prover-node', () => { expect(jobs[0].epochNumber).toEqual(10n); }); - it('fails to start proving if world state is synced past the first block in the epoch', async () => { - // This test will probably be no longer necessary once we have the proper world state - worldState.status.mockResolvedValue({ syncedToL2Block: 21, state: WorldStateRunningState.RUNNING }); - await proverNode.handleClaim(claim); - - expect(jobs.length).toEqual(0); - }); - it('does not prove the same epoch twice', async () => { await proverNode.handleClaim(claim); await proverNode.handleClaim(claim); diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index be86b31e398..02513db5219 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -228,13 +228,10 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler { const fromBlock = blocks[0].number; const toBlock = blocks.at(-1)!.number; - if ((await this.worldState.status()).syncedToL2Block >= fromBlock) { - throw new Error(`Cannot create proving job for block ${fromBlock} as it is behind the current world state`); - } - // Fast forward world state to right before the target block and get a fork this.log.verbose(`Creating proving job for epoch ${epochNumber} for block range ${fromBlock} to ${toBlock}`); - const db = await this.worldState.syncImmediateAndFork(fromBlock - 1); + await this.worldState.syncImmediate(fromBlock - 1); + const db = await this.worldState.fork(fromBlock - 1); // Create a processor using the forked world state const publicProcessorFactory = new PublicProcessorFactory( diff --git a/yarn-project/world-state/package.json b/yarn-project/world-state/package.json index 246dbe0a359..ef8a0d61a86 100644 --- a/yarn-project/world-state/package.json +++ b/yarn-project/world-state/package.json @@ -73,6 +73,7 @@ "tslib": "^2.4.0" }, "devDependencies": { + "@aztec/archiver": "workspace:^", "@jest/globals": "^29.5.0", "@types/bindings": "^1.5.5", "@types/jest": "^29.5.0", diff --git a/yarn-project/world-state/src/native/message.ts b/yarn-project/world-state/src/native/message.ts index 0fed942a3ad..64d0e0b3faa 100644 --- a/yarn-project/world-state/src/native/message.ts +++ b/yarn-project/world-state/src/native/message.ts @@ -85,8 +85,11 @@ interface WithTreeId { } export interface WorldStateStatus { + /** Last block number that can still be unwound. */ unfinalisedBlockNumber: bigint; + /** Last block number that is finalised and cannot be unwound. */ finalisedBlockNumber: bigint; + /** Oldest block still available for historical queries and forks. */ oldestHistoricalBlock: bigint; } diff --git a/yarn-project/world-state/src/native/native_world_state.test.ts b/yarn-project/world-state/src/native/native_world_state.test.ts index 308a274a83d..3892d3f1968 100644 --- a/yarn-project/world-state/src/native/native_world_state.test.ts +++ b/yarn-project/world-state/src/native/native_world_state.test.ts @@ -6,8 +6,8 @@ import { mkdtemp, rm } from 'fs/promises'; import { tmpdir } from 'os'; import { join } from 'path'; +import { assertSameState, compareChains, mockBlock } from '../test/utils.js'; import { NativeWorldStateService } from './native_world_state.js'; -import { assertSameState, compareChains, mockBlock } from './test_util.js'; describe('NativeWorldState', () => { let dataDir: string; diff --git a/yarn-project/world-state/src/native/native_world_state_cmp.test.ts b/yarn-project/world-state/src/native/native_world_state_cmp.test.ts index 836dc246c8e..803ea9f28e9 100644 --- a/yarn-project/world-state/src/native/native_world_state_cmp.test.ts +++ b/yarn-project/world-state/src/native/native_world_state_cmp.test.ts @@ -16,9 +16,9 @@ import { mkdtemp, rm } from 'fs/promises'; import { tmpdir } from 'os'; import { join } from 'path'; +import { mockBlock } from '../test/utils.js'; import { MerkleTrees } from '../world-state-db/merkle_trees.js'; import { NativeWorldStateService } from './native_world_state.js'; -import { mockBlock } from './test_util.js'; jest.setTimeout(60_000); diff --git a/yarn-project/world-state/src/native/native_world_state_instance.ts b/yarn-project/world-state/src/native/native_world_state_instance.ts index f9b8c675ed5..e1dd6c88d6a 100644 --- a/yarn-project/world-state/src/native/native_world_state_instance.ts +++ b/yarn-project/world-state/src/native/native_world_state_instance.ts @@ -155,6 +155,10 @@ export class NativeWorldState implements NativeWorldStateInstance { data['blockNumber'] = body.blockNumber; } + if ('toBlockNumber' in body) { + data['toBlockNumber'] = body.toBlockNumber; + } + if ('leafIndex' in body) { data['leafIndex'] = body.leafIndex; } diff --git a/yarn-project/world-state/src/synchronizer/config.ts b/yarn-project/world-state/src/synchronizer/config.ts index bfe762b23fd..92f126bbc9b 100644 --- a/yarn-project/world-state/src/synchronizer/config.ts +++ b/yarn-project/world-state/src/synchronizer/config.ts @@ -5,11 +5,11 @@ export interface WorldStateConfig { /** The frequency in which to check. */ worldStateBlockCheckIntervalMS: number; - /** Size of queue of L2 blocks to store. */ - l2QueueSize: number; - /** Whether to follow only the proven chain. */ worldStateProvenBlocksOnly: boolean; + + /** Size of the batch for each get-blocks request from the synchronizer to the archiver. */ + worldStateBlockRequestBatchSize?: number; } export const worldStateConfigMappings: ConfigMappingsType = { @@ -19,17 +19,16 @@ export const worldStateConfigMappings: ConfigMappingsType = { defaultValue: 100, description: 'The frequency in which to check.', }, - l2QueueSize: { - env: 'WS_L2_BLOCK_QUEUE_SIZE', - parseEnv: (val: string) => +val, - defaultValue: 1000, - description: 'Size of queue of L2 blocks to store.', - }, worldStateProvenBlocksOnly: { env: 'WS_PROVEN_BLOCKS_ONLY', description: 'Whether to follow only the proven chain.', ...booleanConfigHelper(), }, + worldStateBlockRequestBatchSize: { + env: 'WS_BLOCK_REQUEST_BATCH_SIZE', + parseEnv: (val: string | undefined) => (val ? +val : undefined), + description: 'Size of the batch for each get-blocks request from the synchronizer to the archiver.', + }, }; /** diff --git a/yarn-project/world-state/src/synchronizer/factory.ts b/yarn-project/world-state/src/synchronizer/factory.ts index 48689d126a8..0e15ac07fd5 100644 --- a/yarn-project/world-state/src/synchronizer/factory.ts +++ b/yarn-project/world-state/src/synchronizer/factory.ts @@ -2,6 +2,7 @@ import { type L1ToL2MessageSource, type L2BlockSource } from '@aztec/circuit-typ import { createDebugLogger } from '@aztec/foundation/log'; import { type DataStoreConfig, createStore } from '@aztec/kv-store/utils'; import { type TelemetryClient } from '@aztec/telemetry-client'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { NativeWorldStateService } from '../native/native_world_state.js'; import { MerkleTrees } from '../world-state-db/merkle_trees.js'; @@ -13,10 +14,16 @@ export async function createWorldStateSynchronizer( l2BlockSource: L2BlockSource & L1ToL2MessageSource, client: TelemetryClient, ) { - const store = await createStore('world-state', config, createDebugLogger('aztec:world-state:lmdb')); + const merkleTrees = await createWorldState(config, client); + return new ServerWorldStateSynchronizer(merkleTrees, l2BlockSource, config); +} - const merkleTrees = process.env.USE_LEGACY_WORLD_STATE - ? await MerkleTrees.new(store, client) +export async function createWorldState(config: DataStoreConfig, client: TelemetryClient = new NoopTelemetryClient()) { + const merkleTrees = ['true', '1'].includes(process.env.USE_LEGACY_WORLD_STATE ?? '') + ? await MerkleTrees.new( + await createStore('world-state', config, createDebugLogger('aztec:world-state:lmdb')), + client, + ) : config.dataDirectory ? await NativeWorldStateService.new(config.l1Contracts.rollupAddress, config.dataDirectory) : await NativeWorldStateService.tmp( @@ -24,5 +31,5 @@ export async function createWorldStateSynchronizer( !['true', '1'].includes(process.env.DEBUG_WORLD_STATE!), ); - return new ServerWorldStateSynchronizer(store, merkleTrees, l2BlockSource, config); + return merkleTrees; } diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.test.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.test.ts index ac999ecdd65..53875ca6e8f 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.test.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.test.ts @@ -1,423 +1,220 @@ -import { type L1ToL2MessageSource, L2Block, type L2BlockSource, WorldStateRunningState } from '@aztec/circuit-types'; -import { Fr } from '@aztec/circuits.js'; +import { + type L1ToL2MessageSource, + L2Block, + type L2BlockSource, + type L2BlockStream, + type MerkleTreeReadOperations, + WorldStateRunningState, +} from '@aztec/circuit-types'; +import { Fr, MerkleTreeCalculator } from '@aztec/circuits.js'; import { L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js/constants'; +import { times } from '@aztec/foundation/collection'; import { randomInt } from '@aztec/foundation/crypto'; -import { createDebugLogger } from '@aztec/foundation/log'; -import { sleep } from '@aztec/foundation/sleep'; -import { type AztecKVStore } from '@aztec/kv-store'; -import { openTmpStore } from '@aztec/kv-store/utils'; -import { SHA256Trunc, StandardTree } from '@aztec/merkle-tree'; +import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; +import { SHA256Trunc } from '@aztec/merkle-tree'; import { jest } from '@jest/globals'; -import { mock } from 'jest-mock-extended'; +import { type MockProxy, mock } from 'jest-mock-extended'; -import { type MerkleTreeAdminDatabase, type MerkleTrees, type WorldStateConfig } from '../index.js'; +import { type MerkleTreeAdminDatabase, type WorldStateConfig } from '../index.js'; import { ServerWorldStateSynchronizer } from './server_world_state_synchronizer.js'; -const LATEST_BLOCK_NUMBER = 5; -const getLatestBlockNumber = () => Promise.resolve(LATEST_BLOCK_NUMBER); -let nextBlocks: L2Block[] = []; -const consumeNextBlocks = () => { - const blocks = nextBlocks; - nextBlocks = []; - return Promise.resolve(blocks); -}; - -const log = createDebugLogger('aztec:server_world_state_synchronizer_test'); - -describe('server_world_state_synchronizer', () => { +describe('ServerWorldStateSynchronizer', () => { jest.setTimeout(30_000); - let db: AztecKVStore; + let log: DebugLogger; + let l1ToL2Messages: Fr[]; let inHash: Buffer; - const blockAndMessagesSource = mock({ - getBlockNumber: jest.fn(getLatestBlockNumber), - getBlocks: jest.fn(consumeNextBlocks), - getL1ToL2Messages: jest.fn(() => Promise.resolve(l1ToL2Messages)), - }); - - const merkleTreeDb = mock({ - handleL2BlockAndMessages: jest.fn(() => - Promise.resolve({ unfinalisedBlockNumber: 0n, finalisedBlockNumber: 0n, oldestHistoricalBlock: 0n }), - ), - }); + let blockAndMessagesSource: MockProxy; + let merkleTreeDb: MockProxy; + let merkleTreeRead: MockProxy; + let l2BlockStream: MockProxy; - const performInitialSync = async (server: ServerWorldStateSynchronizer) => { - // test initial state - let status = await server.status(); - expect(status.syncedToL2Block).toEqual(0); - expect(status.state).toEqual(WorldStateRunningState.IDLE); + let server: TestWorldStateSynchronizer; + let latestHandledBlockNumber: number; - // create the initial blocks - nextBlocks = Array(LATEST_BLOCK_NUMBER) - .fill(0) - .map((_, index: number) => getRandomBlock(index + 1)); + const LATEST_BLOCK_NUMBER = 5; - // start the sync process and await it - await server.start().catch(err => log.error('Sync not completed: ', err)); + beforeAll(() => { + log = createDebugLogger('aztec:world-state:test:server_world_state_synchronizer'); - status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); - }; + // Seed l1 to l2 msgs + l1ToL2Messages = times(randomInt(2 ** L1_TO_L2_MSG_SUBTREE_HEIGHT), Fr.random); - const performSubsequentSync = async (server: ServerWorldStateSynchronizer, count: number) => { - // test initial state - let status = await server.status(); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); - expect(status.state).toEqual(WorldStateRunningState.IDLE); + // Compute inHash for verification + inHash = new MerkleTreeCalculator( + L1_TO_L2_MSG_SUBTREE_HEIGHT, + Buffer.alloc(32), + new SHA256Trunc().hash, + ).computeTreeRoot(l1ToL2Messages.map(msg => msg.toBuffer())); + }); - // create the initial blocks - nextBlocks = Array(count) - .fill(0) - .map((_, index: number) => getRandomBlock(LATEST_BLOCK_NUMBER + index + 1)); + beforeEach(() => { + blockAndMessagesSource = mock(); + blockAndMessagesSource.getBlockNumber.mockResolvedValue(LATEST_BLOCK_NUMBER); + blockAndMessagesSource.getL1ToL2Messages.mockResolvedValue(l1ToL2Messages); - blockAndMessagesSource.getBlockNumber.mockResolvedValueOnce(LATEST_BLOCK_NUMBER + count); + merkleTreeRead = mock(); - // start the sync process and await it - await server.start().catch(err => log.error('Sync not completed: ', err)); + merkleTreeDb = mock(); + merkleTreeDb.getCommitted.mockReturnValue(merkleTreeRead); + merkleTreeDb.handleL2BlockAndMessages.mockImplementation((l2Block: L2Block) => { + latestHandledBlockNumber = l2Block.number; + return Promise.resolve({ unfinalisedBlockNumber: 0n, finalisedBlockNumber: 0n, oldestHistoricalBlock: 0n }); + }); + latestHandledBlockNumber = 0; - status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + count); - }; + l2BlockStream = mock(); - const createSynchronizer = (blockCheckInterval = 100) => { - const worldStateConfig: WorldStateConfig = { - worldStateBlockCheckIntervalMS: blockCheckInterval, - l2QueueSize: 1000, + // Note that worldStateProvenBlocksOnly is the only config value that is used by the synchronizer itself + // Others are relayed to the blockstream, which is mocked in this test suite + const config: WorldStateConfig = { + worldStateBlockCheckIntervalMS: 100, worldStateProvenBlocksOnly: false, }; - return new ServerWorldStateSynchronizer( - db, - merkleTreeDb as any as MerkleTrees, - blockAndMessagesSource, - worldStateConfig, - ); - }; - - const getRandomBlock = (blockNumber: number) => { - return L2Block.random(blockNumber, 4, 2, 3, 2, 1, inHash); - }; - - beforeAll(async () => { - const numMessages = randomInt(2 ** L1_TO_L2_MSG_SUBTREE_HEIGHT); - l1ToL2Messages = Array(numMessages) - .fill(0) - .map(() => Fr.random()); - const tree = new StandardTree( - openTmpStore(true), - new SHA256Trunc(), - 'empty_subtree_in_hash', - L1_TO_L2_MSG_SUBTREE_HEIGHT, - 0n, - Fr, - ); - await tree.appendLeaves(l1ToL2Messages); - inHash = tree.getRoot(true); + server = new TestWorldStateSynchronizer(merkleTreeDb, blockAndMessagesSource, config, l2BlockStream); }); - beforeEach(() => { - db = openTmpStore(); + afterEach(async () => { + await server.stop(); }); - it('can be constructed', () => { - expect(createSynchronizer()).toBeTruthy(); - }); + const pushBlocks = async (from: number, to: number) => { + await server.handleBlockStreamEvent({ + type: 'blocks-added', + blocks: times(to - from + 1, i => L2Block.random(i + from, 4, 2, 3, 2, 1, inHash)), + }); + server.latest = to; + }; - it('updates sync progress', async () => { - const server = createSynchronizer(); + const expectServerStatus = async (state: WorldStateRunningState, blockNumber: number) => { + await expect(server.status()).resolves.toEqual(expect.objectContaining({ state })); + expect(latestHandledBlockNumber).toEqual(blockNumber); + }; + it('updates sync progress', async () => { // test initial state - let status = await server.status(); - expect(status.syncedToL2Block).toEqual(0); - expect(status.state).toEqual(WorldStateRunningState.IDLE); - - // create an initial block - let currentBlockNumber = 0; - nextBlocks = [getRandomBlock(currentBlockNumber + 1)]; + await expectServerStatus(WorldStateRunningState.IDLE, 0); // start the sync process but don't await - server.start().catch(err => log.error('Sync not completed: ', err)); - - // now setup a loop to monitor the sync progress and push new blocks in - while (currentBlockNumber <= LATEST_BLOCK_NUMBER) { - status = await server.status(); - expect( - status.syncedToL2Block >= currentBlockNumber || status.syncedToL2Block <= currentBlockNumber + 1, - ).toBeTruthy(); - if (status.syncedToL2Block === LATEST_BLOCK_NUMBER) { - break; - } - expect( - status.state >= WorldStateRunningState.IDLE || status.state <= WorldStateRunningState.SYNCHING, - ).toBeTruthy(); - if (status.syncedToL2Block === currentBlockNumber) { - await sleep(100); - continue; - } - currentBlockNumber++; - nextBlocks = [getRandomBlock(currentBlockNumber + 1)]; - } - - // check the status again, should be fully synced - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.RUNNING); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); + let syncFinished = false; + server + .start() + .then(() => (syncFinished = true)) + .catch(err => log.error('Sync not completed: ', err)); + + // push a bunch of blocks + await pushBlocks(1, 3); + await expectServerStatus(WorldStateRunningState.SYNCHING, 3); + expect(syncFinished).toBeFalsy(); + + // and push the remaining ones + await pushBlocks(4, 5); + await expectServerStatus(WorldStateRunningState.RUNNING, 5); + expect(syncFinished).toBeTruthy(); // stop the synchronizer await server.stop(); - // check the final status - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); - }); - - it('enables blocking until synced', async () => { - const server = createSynchronizer(); - let currentBlockNumber = 0; - - const newBlocks = async () => { - while (currentBlockNumber <= LATEST_BLOCK_NUMBER) { - await sleep(100); - nextBlocks = [...nextBlocks, getRandomBlock(++currentBlockNumber)]; - } - }; - - // kick off the background queueing of blocks - const newBlockPromise = newBlocks(); - - // kick off the synching - const syncPromise = server.start(); - - // await the synching - await syncPromise; - - await newBlockPromise; - - let status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.RUNNING); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); - await server.stop(); - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); + // and check the final status + await expectServerStatus(WorldStateRunningState.STOPPED, 5); + expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(5); }); it('handles multiple calls to start', async () => { - const server = createSynchronizer(); - let currentBlockNumber = 0; - - const newBlocks = async () => { - while (currentBlockNumber < LATEST_BLOCK_NUMBER) { - await sleep(100); - const newBlock = getRandomBlock(++currentBlockNumber); - nextBlocks = [...nextBlocks, newBlock]; - } - }; + // start the sync and push 5 blocks + void server.start(); + await pushBlocks(1, 5); - // kick off the background queueing of blocks - const newBlockPromise = newBlocks(); - - // kick off the synching + // starting the sync again should not trigger more operations await server.start(); - - // call start again, should get back the same promise await server.start(); - // wait until the block production has finished - await newBlockPromise; - - await server.stop(); + await expectServerStatus(WorldStateRunningState.RUNNING, 5); + expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(5); }); it('immediately syncs if no new blocks', async () => { - const server = createSynchronizer(); - blockAndMessagesSource.getBlockNumber.mockImplementationOnce(() => { - return Promise.resolve(0); - }); - - // kick off the synching - const syncPromise = server.start(); + blockAndMessagesSource.getBlockNumber.mockResolvedValue(0); - // it should already be synced, no need to push new blocks - await syncPromise; - - const status = await server.status(); - expect(status.state).toBe(WorldStateRunningState.RUNNING); - expect(status.syncedToL2Block).toBe(0); - await server.stop(); + await server.start(); + await expectServerStatus(WorldStateRunningState.RUNNING, 0); }); - it("can't be started if already stopped", async () => { - const server = createSynchronizer(); - blockAndMessagesSource.getBlockNumber.mockImplementationOnce(() => { - return Promise.resolve(0); - }); + it('cannot be started if already stopped', async () => { + blockAndMessagesSource.getBlockNumber.mockResolvedValue(0); - // kick off the synching - const syncPromise = server.start(); - await syncPromise; + await server.start(); await server.stop(); await expect(server.start()).rejects.toThrow(); }); - it('adds the received L2 blocks and messages', async () => { - merkleTreeDb.handleL2BlockAndMessages.mockClear(); - const server = createSynchronizer(); - const totalBlocks = LATEST_BLOCK_NUMBER + 1; - nextBlocks = Array(totalBlocks) - .fill(0) - .map((_, index) => getRandomBlock(index)); - // sync the server - await server.start(); - - expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(totalBlocks); - await server.stop(); - }); - it('can immediately sync to latest', async () => { - const server = createSynchronizer(10000); - - await performInitialSync(server); - - // the server should now be asleep for a long time - // we will add a new block and force an immediate sync - nextBlocks = [getRandomBlock(LATEST_BLOCK_NUMBER + 1)]; - await server.syncImmediate(); - - let status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 1); + void server.start(); + await pushBlocks(1, 5); - nextBlocks = [getRandomBlock(LATEST_BLOCK_NUMBER + 2), getRandomBlock(LATEST_BLOCK_NUMBER + 3)]; + l2BlockStream.sync.mockImplementation(() => pushBlocks(6, 7)); await server.syncImmediate(); - status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 3); - - // stop the synchronizer - await server.stop(); - - // check the final status - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 3); + await expectServerStatus(WorldStateRunningState.RUNNING, 7); + expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(7); }); it('can immediately sync to a minimum block number', async () => { - const server = createSynchronizer(10000); - - await performInitialSync(server); - - // the server should now be asleep for a long time - // we will add 20 blocks and force a sync to at least LATEST + 5 - nextBlocks = Array(20) - .fill(0) - .map((_, index: number) => getRandomBlock(index + 1 + LATEST_BLOCK_NUMBER)); - await server.syncImmediate(LATEST_BLOCK_NUMBER + 5); + void server.start(); + await pushBlocks(1, 5); - // we should have synced all of the blocks - let status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 20); + l2BlockStream.sync.mockImplementation(() => pushBlocks(6, 8)); + await server.syncImmediate(7); - // stop the synchronizer - await server.stop(); - - // check the final status - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 20); + await expectServerStatus(WorldStateRunningState.RUNNING, 8); + expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(8); }); - it('can immediately sync to a minimum block in the past', async () => { - const server = createSynchronizer(10000); - - await performInitialSync(server); - // syncing to a block in the past should succeed - await server.syncImmediate(LATEST_BLOCK_NUMBER - 1); - // syncing to the current block should succeed - await server.syncImmediate(LATEST_BLOCK_NUMBER); - - // we should have synced all of the blocks - let status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER); + it('sync returns immediately if block was already synced', async () => { + void server.start(); + await pushBlocks(1, 5); - // stop the synchronizer - await server.stop(); + await server.syncImmediate(4); + expect(l2BlockStream.sync).not.toHaveBeenCalled(); - // check the final status - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER); + await expectServerStatus(WorldStateRunningState.RUNNING, 5); + expect(merkleTreeDb.handleL2BlockAndMessages).toHaveBeenCalledTimes(5); }); it('throws if you try to sync to an unavailable block', async () => { - const server = createSynchronizer(); - - await performInitialSync(server); - - // the server should now be asleep for a long time - // we will add 2 blocks and force a sync to at least LATEST + 5 - nextBlocks = Array(2) - .fill(0) - .map((_, index: number) => getRandomBlock(index + 1 + LATEST_BLOCK_NUMBER)); - await expect(server.syncImmediate(LATEST_BLOCK_NUMBER + 5)).rejects.toThrow( - `Unable to sync to block number ${LATEST_BLOCK_NUMBER + 5}, currently synced to block ${LATEST_BLOCK_NUMBER + 2}`, - ); - - let status = await server.status(); - expect(status.syncedToL2Block).toBe(LATEST_BLOCK_NUMBER + 2); - - // stop the synchronizer - await server.stop(); + void server.start(); + await pushBlocks(1, 5); - // check the final status - status = await server.status(); - expect(status.state).toEqual(WorldStateRunningState.STOPPED); - expect(status.syncedToL2Block).toEqual(LATEST_BLOCK_NUMBER + 2); + await expect(server.syncImmediate(8)).rejects.toThrow(/unable to sync/i); }); it('throws if you try to immediate sync when not running', async () => { - const server = createSynchronizer(10000); - - // test initial state - const status = await server.status(); - expect(status.syncedToL2Block).toEqual(0); - expect(status.state).toEqual(WorldStateRunningState.IDLE); - - // create an initial block - nextBlocks = Array(LATEST_BLOCK_NUMBER) - .fill(0) - .map((_, index: number) => getRandomBlock(index + 1)); - - await expect(server.syncImmediate()).rejects.toThrow(`World State is not running, unable to perform sync`); - }); - - it('restores the last synced block', async () => { - const initialServer = createSynchronizer(10000); - - await performInitialSync(initialServer); - await initialServer.stop(); - - const server = createSynchronizer(10000); - const status = await server.status(); - expect(status).toEqual({ - state: WorldStateRunningState.IDLE, - syncedToL2Block: LATEST_BLOCK_NUMBER, - }); - }); - - it('starts syncing from the last block', async () => { - const initialServer = createSynchronizer(10000); - - await performInitialSync(initialServer); - await initialServer.stop(); - - const server = createSynchronizer(10000); - await performSubsequentSync(server, 2); - await server.stop(); + await expect(server.syncImmediate(3)).rejects.toThrow(/is not running/i); }); }); + +class TestWorldStateSynchronizer extends ServerWorldStateSynchronizer { + public latest = 0; + + constructor( + merkleTrees: MerkleTreeAdminDatabase, + blockAndMessagesSource: L2BlockSource & L1ToL2MessageSource, + worldStateConfig: WorldStateConfig, + private mockBlockStream: L2BlockStream, + ) { + super(merkleTrees, blockAndMessagesSource, worldStateConfig); + } + + protected override createBlockStream(): L2BlockStream { + return this.mockBlockStream; + } + + public override getL2Tips() { + return Promise.resolve({ latest: this.latest, proven: undefined, finalized: undefined }); + } +} diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index 70e1d2137cc..44956ba4da6 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -1,8 +1,13 @@ import { type L1ToL2MessageSource, type L2Block, - L2BlockDownloader, type L2BlockSource, + L2BlockStream, + type L2BlockStreamEvent, + type L2BlockStreamEventHandler, + type L2BlockStreamLocalDataProvider, + type L2BlockTag, + MerkleTreeId, type MerkleTreeReadOperations, type MerkleTreeWriteOperations, WorldStateRunningState, @@ -15,9 +20,7 @@ import { L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js/constants'; import { type Fr } from '@aztec/foundation/fields'; import { createDebugLogger } from '@aztec/foundation/log'; import { promiseWithResolvers } from '@aztec/foundation/promise'; -import { SerialQueue } from '@aztec/foundation/queue'; import { elapsed } from '@aztec/foundation/timer'; -import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store'; import { SHA256Trunc } from '@aztec/merkle-tree'; import { type WorldStateStatus } from '../native/message.js'; @@ -25,39 +28,28 @@ import { type MerkleTreeAdminDatabase } from '../world-state-db/merkle_tree_db.j import { type WorldStateConfig } from './config.js'; /** - * Synchronizes the world state with the L2 blocks from a L2BlockSource. - * The synchronizer will download the L2 blocks from the L2BlockSource and insert the new note hashes into the merkle - * tree. + * Synchronizes the world state with the L2 blocks from a L2BlockSource via a block stream. + * The synchronizer will download the L2 blocks from the L2BlockSource and update the merkle trees. + * Handles chain reorgs via the L2BlockStream. */ -export class ServerWorldStateSynchronizer implements WorldStateSynchronizer { - private latestBlockNumberAtStart = 0; +export class ServerWorldStateSynchronizer + implements WorldStateSynchronizer, L2BlockStreamLocalDataProvider, L2BlockStreamEventHandler +{ + private readonly merkleTreeCommitted: MerkleTreeReadOperations; - // TODO(palla/prover-node): JobQueue, stopping, runningPromise, pausedPromise, pausedResolve - // should all be hidden under a single abstraction. Also, check if we actually need the jobqueue. - private l2BlockDownloader: L2BlockDownloader; - private syncPromise: Promise = Promise.resolve(); - private syncResolve?: () => void = undefined; - private jobQueue = new SerialQueue(); - private stopping = false; - private runningPromise: Promise = Promise.resolve(); - private pausedPromise?: Promise = undefined; - private pausedResolve?: () => void = undefined; + private latestBlockNumberAtStart = 0; private currentState: WorldStateRunningState = WorldStateRunningState.IDLE; - private blockNumber: AztecSingleton; + + private syncPromise = promiseWithResolvers(); + protected blockStream: L2BlockStream | undefined; constructor( - store: AztecKVStore, - private merkleTreeDb: MerkleTreeAdminDatabase, - private l2BlockSource: L2BlockSource & L1ToL2MessageSource, - private config: WorldStateConfig, - private log = createDebugLogger('aztec:world_state'), + private readonly merkleTreeDb: MerkleTreeAdminDatabase, + private readonly l2BlockSource: L2BlockSource & L1ToL2MessageSource, + private readonly config: WorldStateConfig, + private readonly log = createDebugLogger('aztec:world_state'), ) { - this.blockNumber = store.openSingleton('world_state_synch_last_block_number'); - this.l2BlockDownloader = new L2BlockDownloader(l2BlockSource, { - maxQueueSize: config.l2QueueSize, - pollIntervalMS: config.worldStateBlockCheckIntervalMS, - proven: config.worldStateProvenBlocksOnly, - }); + this.merkleTreeCommitted = this.merkleTreeDb.getCommitted(); } public getCommitted(): MerkleTreeReadOperations { @@ -80,86 +72,56 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer { return this.syncPromise; } - // get the current latest block number + // Get the current latest block number this.latestBlockNumberAtStart = await (this.config.worldStateProvenBlocksOnly ? this.l2BlockSource.getProvenBlockNumber() : this.l2BlockSource.getBlockNumber()); - const blockToDownloadFrom = this.currentL2BlockNum + 1; + const blockToDownloadFrom = (await this.getLatestBlockNumber()) + 1; - // if there are blocks to be retrieved, go to a synching state if (blockToDownloadFrom <= this.latestBlockNumberAtStart) { + // If there are blocks to be retrieved, go to a synching state this.setCurrentState(WorldStateRunningState.SYNCHING); - this.syncPromise = new Promise(resolve => { - this.syncResolve = resolve; - }); - this.log.info(`Starting sync from ${blockToDownloadFrom}, latest block ${this.latestBlockNumberAtStart}`); + this.log.verbose(`Starting sync from ${blockToDownloadFrom} to latest block ${this.latestBlockNumberAtStart}`); } else { - // if no blocks to be retrieved, go straight to running + // If no blocks to be retrieved, go straight to running this.setCurrentState(WorldStateRunningState.RUNNING); - this.syncPromise = Promise.resolve(); - this.log.debug( - `Next block ${blockToDownloadFrom} already beyond latest block at ${this.latestBlockNumberAtStart}`, - ); + this.syncPromise.resolve(); + this.log.debug(`Next block ${blockToDownloadFrom} already beyond latest block ${this.latestBlockNumberAtStart}`); } - // start looking for further blocks - const blockProcess = async () => { - while (!this.stopping) { - await this.jobQueue.put(() => this.collectAndProcessBlocks()); - if (this.pausedPromise) { - await this.pausedPromise; - } - } - }; - this.jobQueue.start(); - this.runningPromise = blockProcess(); - this.l2BlockDownloader.start(blockToDownloadFrom); - this.log.info(`Started block downloader from block ${blockToDownloadFrom}`); - return this.syncPromise; + this.blockStream = this.createBlockStream(); + this.blockStream.start(); + this.log.info(`Started world state synchronizer from block ${blockToDownloadFrom}`); + return this.syncPromise.promise; + } + + protected createBlockStream() { + return new L2BlockStream(this.l2BlockSource, this, this, { + proven: this.config.worldStateProvenBlocksOnly, + pollIntervalMS: this.config.worldStateBlockCheckIntervalMS, + batchSize: this.config.worldStateBlockRequestBatchSize, + }); } public async stop() { - this.log.debug('Stopping world state...'); - this.stopping = true; - await this.l2BlockDownloader.stop(); - this.log.debug('Cancelling job queue...'); - await this.jobQueue.cancel(); - this.log.debug('Stopping Merkle trees'); + this.log.debug('Stopping block stream...'); + await this.blockStream?.stop(); + this.log.debug('Stopping merkle trees...'); await this.merkleTreeDb.close(); - this.log.debug('Awaiting promise'); - await this.runningPromise; this.setCurrentState(WorldStateRunningState.STOPPED); - this.log.info(`Stopped`); - } - - private get currentL2BlockNum(): number { - return this.blockNumber.get() ?? 0; + this.log.info(`Stopped world state synchronizer`); } - private async pause() { - this.log.debug('Pausing world state synchronizer'); - ({ promise: this.pausedPromise, resolve: this.pausedResolve } = promiseWithResolvers()); - await this.jobQueue.syncPoint(); - this.log.debug('Paused world state synchronizer'); - } - - private resume() { - if (this.pausedResolve) { - this.log.debug('Resuming world state synchronizer'); - this.pausedResolve(); - this.pausedResolve = undefined; - this.pausedPromise = undefined; - this.log.debug('Resumed world state synchronizer'); - } + public async status(): Promise { + return { + syncedToL2Block: (await this.getL2Tips()).latest, + state: this.currentState, + }; } - public status(): Promise { - const status = { - syncedToL2Block: this.currentL2BlockNum, - state: this.currentState, - } as WorldStateSynchronizerStatus; - return Promise.resolve(status); + public async getLatestBlockNumber() { + return (await this.getL2Tips()).latest; } /** @@ -168,71 +130,80 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer { * @returns A promise that resolves with the block number the world state was synced to */ public async syncImmediate(targetBlockNumber?: number): Promise { - if (this.currentState !== WorldStateRunningState.RUNNING) { - throw new Error(`World State is not running, unable to perform sync`); + if (this.currentState !== WorldStateRunningState.RUNNING || this.blockStream === undefined) { + throw new Error(`World State is not running. Unable to perform sync.`); } - // If we have been given a block number to sync to and we have reached that number then return. - if (targetBlockNumber !== undefined && targetBlockNumber <= this.currentL2BlockNum) { - return this.currentL2BlockNum; + + // If we have been given a block number to sync to and we have reached that number then return + const currentBlockNumber = await this.getLatestBlockNumber(); + if (targetBlockNumber !== undefined && targetBlockNumber <= currentBlockNumber) { + return currentBlockNumber; } - this.log.debug(`World State at ${this.currentL2BlockNum} told to sync to ${targetBlockNumber ?? 'latest'}`); - // ensure any outstanding block updates are completed first - await this.jobQueue.syncPoint(); - - while (true) { - // Check the block number again - if (targetBlockNumber !== undefined && targetBlockNumber <= this.currentL2BlockNum) { - return this.currentL2BlockNum; - } - // Poll for more blocks, requesting even unproven blocks. - const numBlocks = await this.l2BlockDownloader.pollImmediate(targetBlockNumber, false); - this.log.debug(`Block download immediate poll yielded ${numBlocks} blocks`); - if (numBlocks) { - // More blocks were received, process them and go round again - await this.jobQueue.put(() => this.collectAndProcessBlocks()); - continue; - } - // No blocks are available, if we have been given a block number then we can't achieve it - if (targetBlockNumber !== undefined) { - throw new Error( - `Unable to sync to block number ${targetBlockNumber}, currently synced to block ${this.currentL2BlockNum}`, - ); - } - return this.currentL2BlockNum; + this.log.debug(`World State at ${currentBlockNumber} told to sync to ${targetBlockNumber ?? 'latest'}`); + + // Force the block stream to sync against the archiver now + await this.blockStream.sync(); + + // If we have been given a block number to sync to and we have not reached that number then fail + const updatedBlockNumber = await this.getLatestBlockNumber(); + if (targetBlockNumber !== undefined && targetBlockNumber > updatedBlockNumber) { + throw new Error(`Unable to sync to block number ${targetBlockNumber} (last synced is ${updatedBlockNumber})`); } + + return updatedBlockNumber; } - public async syncImmediateAndFork(targetBlockNumber: number): Promise { - try { - await this.pause(); - await this.syncImmediate(targetBlockNumber); - return await this.merkleTreeDb.fork(targetBlockNumber); - } finally { - this.resume(); - } + /** Returns the L2 block hash for a given number. Used by the L2BlockStream for detecting reorgs. */ + public getL2BlockHash(number: number): Promise { + return number === 0 + ? Promise.resolve(this.merkleTreeCommitted.getInitialHeader().hash().toString()) + : this.merkleTreeCommitted.getLeafValue(MerkleTreeId.ARCHIVE, BigInt(number)).then(leaf => leaf?.toString()); } - /** - * Checks for the availability of new blocks and processes them. - */ - private async collectAndProcessBlocks() { - // This request for blocks will timeout after 1 second if no blocks are received - const blocks = await this.l2BlockDownloader.getBlocks(1); - const messagePromises = blocks.map(block => this.l2BlockSource.getL1ToL2Messages(BigInt(block.number))); - const l1ToL2Messages: Fr[][] = await Promise.all(messagePromises); + /** Returns the latest L2 block number for each tip of the chain (latest, proven, finalized). */ + public async getL2Tips(): Promise<{ latest: number } & Partial>> { + const status = await this.merkleTreeDb.getStatus(); + return { + latest: Number(status.unfinalisedBlockNumber), + finalized: Number(status.finalisedBlockNumber), + proven: Number(status.finalisedBlockNumber), // TODO(palla/reorg): Using finalised as proven for now + }; + } - await this.handleL2BlocksAndMessages(blocks, l1ToL2Messages); + /** Handles an event emitted by the block stream. */ + public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + try { + switch (event.type) { + case 'blocks-added': + await this.handleL2Blocks(event.blocks); + break; + case 'chain-pruned': + await this.handleChainPruned(event.blockNumber); + break; + case 'chain-proven': + await this.handleChainProven(event.blockNumber); + break; + case 'chain-finalized': + await this.handleChainFinalized(event.blockNumber); + break; + } + } catch (err) { + this.log.error('Error processing block stream', err); + } } /** * Handles a list of L2 blocks (i.e. Inserts the new note hashes into the merkle tree). * @param l2Blocks - The L2 blocks to handle. - * @param l1ToL2Messages - The L1 to L2 messages for each block. * @returns Whether the block handled was produced by this same node. */ - private async handleL2BlocksAndMessages(l2Blocks: L2Block[], l1ToL2Messages: Fr[][]) { + private async handleL2Blocks(l2Blocks: L2Block[]) { + this.log.verbose(`Handling new L2 blocks from ${l2Blocks[0].number} to ${l2Blocks[l2Blocks.length - 1].number}`); + const messagePromises = l2Blocks.map(block => this.l2BlockSource.getL1ToL2Messages(BigInt(block.number))); + const l1ToL2Messages: Fr[][] = await Promise.all(messagePromises); + for (let i = 0; i < l2Blocks.length; i++) { - const [duration, result] = await elapsed(() => this.handleL2BlockAndMessages(l2Blocks[i], l1ToL2Messages[i])); + const [duration, result] = await elapsed(() => this.handleL2Block(l2Blocks[i], l1ToL2Messages[i])); this.log.verbose(`Handled new L2 block`, { eventName: 'l2-block-handled', duration, @@ -250,26 +221,39 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer { * @param l1ToL2Messages - The L1 to L2 messages for the block. * @returns Whether the block handled was produced by this same node. */ - private async handleL2BlockAndMessages(l2Block: L2Block, l1ToL2Messages: Fr[]): Promise { + private async handleL2Block(l2Block: L2Block, l1ToL2Messages: Fr[]): Promise { // First we check that the L1 to L2 messages hash to the block inHash. // Note that we cannot optimize this check by checking the root of the subtree after inserting the messages // to the real L1_TO_L2_MESSAGE_TREE (like we do in merkleTreeDb.handleL2BlockAndMessages(...)) because that // tree uses pedersen and we don't have access to the converted root. - this.#verifyMessagesHashToInHash(l1ToL2Messages, l2Block.header.contentCommitment.inHash); + this.verifyMessagesHashToInHash(l1ToL2Messages, l2Block.header.contentCommitment.inHash); // If the above check succeeds, we can proceed to handle the block. const result = await this.merkleTreeDb.handleL2BlockAndMessages(l2Block, l1ToL2Messages); - await this.blockNumber.set(l2Block.number); if (this.currentState === WorldStateRunningState.SYNCHING && l2Block.number >= this.latestBlockNumberAtStart) { this.setCurrentState(WorldStateRunningState.RUNNING); - if (this.syncResolve !== undefined) { - this.syncResolve(); - } + this.syncPromise.resolve(); } + return result; } + private async handleChainFinalized(blockNumber: number) { + this.log.verbose(`Chain finalized at block ${blockNumber}`); + await this.merkleTreeDb.setFinalised(BigInt(blockNumber)); + } + + private handleChainProven(blockNumber: number) { + this.log.verbose(`Chain proven at block ${blockNumber}`); + return Promise.resolve(); + } + + private async handleChainPruned(blockNumber: number) { + this.log.info(`Chain pruned to block ${blockNumber}`); + await this.merkleTreeDb.unwindBlocks(BigInt(blockNumber)); + } + /** * Method to set the value of the current state. * @param newState - New state value. @@ -285,7 +269,7 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer { * @param inHash - The inHash of the block. * @throws If the L1 to L2 messages do not hash to the block inHash. */ - #verifyMessagesHashToInHash(l1ToL2Messages: Fr[], inHash: Buffer) { + protected verifyMessagesHashToInHash(l1ToL2Messages: Fr[], inHash: Buffer) { const treeCalculator = new MerkleTreeCalculator( L1_TO_L2_MSG_SUBTREE_HEIGHT, Buffer.alloc(32), diff --git a/yarn-project/world-state/src/test/integration.test.ts b/yarn-project/world-state/src/test/integration.test.ts new file mode 100644 index 00000000000..5b06e4e1c12 --- /dev/null +++ b/yarn-project/world-state/src/test/integration.test.ts @@ -0,0 +1,241 @@ +import { MockPrefilledArchiver } from '@aztec/archiver/test'; +import { type L2Block, MerkleTreeId } from '@aztec/circuit-types'; +import { EthAddress, type Fr } from '@aztec/circuits.js'; +import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; +import { sleep } from '@aztec/foundation/sleep'; +import { type DataStoreConfig } from '@aztec/kv-store/utils'; + +import { jest } from '@jest/globals'; + +import { NativeWorldStateService } from '../native/native_world_state.js'; +import { type WorldStateConfig } from '../synchronizer/config.js'; +import { createWorldState } from '../synchronizer/factory.js'; +import { ServerWorldStateSynchronizer } from '../synchronizer/server_world_state_synchronizer.js'; +import { mockBlocks } from './utils.js'; + +describe('world-state integration', () => { + let rollupAddress: EthAddress; + let archiver: MockPrefilledArchiver; + let db: NativeWorldStateService; + let synchronizer: TestWorldStateSynchronizer; + let config: WorldStateConfig & DataStoreConfig; + let log: DebugLogger; + + let blocks: L2Block[]; + let messages: Fr[][]; + + const MAX_BLOCK_COUNT = 20; + + beforeAll(async () => { + log = createDebugLogger('aztec:world-state:test:integration'); + rollupAddress = EthAddress.random(); + const db = await NativeWorldStateService.tmp(rollupAddress); + log.info(`Generating ${MAX_BLOCK_COUNT} mock blocks`); + ({ blocks, messages } = await mockBlocks(1, MAX_BLOCK_COUNT, 1, db)); + log.info(`Generated ${blocks.length} mock blocks`); + }); + + beforeEach(async () => { + config = { + dataDirectory: undefined, + l1Contracts: { rollupAddress }, + worldStateBlockCheckIntervalMS: 20, + worldStateProvenBlocksOnly: false, + worldStateBlockRequestBatchSize: 5, + }; + + archiver = new MockPrefilledArchiver(blocks, messages); + + db = (await createWorldState(config)) as NativeWorldStateService; + synchronizer = new TestWorldStateSynchronizer(db, archiver, config); + log.info(`Created synchronizer`); + }); + + afterEach(async () => { + await synchronizer.stop(); + await db.close(); + }); + + const awaitSync = () => sleep(200); + + const expectSynchedBlockHashMatches = async (number: number) => { + const syncedBlockHash = await db.getCommitted().getLeafValue(MerkleTreeId.ARCHIVE, BigInt(number)); + const archiverBlockHash = await archiver.getBlockHeader(number).then(h => h?.hash()); + expect(syncedBlockHash).toEqual(archiverBlockHash); + }; + + const expectSynchedToBlock = async (latest: number, finalized?: number) => { + const tips = await synchronizer.getL2Tips(); + expect(tips.latest).toEqual(latest); + await expectSynchedBlockHashMatches(latest); + + if (finalized !== undefined) { + expect(tips.finalized).toEqual(finalized); + await expectSynchedBlockHashMatches(finalized); + } + }; + + describe('block syncing', () => { + it('performs initial sync from the archiver from genesis', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + }); + + it('syncs new blocks from the archiver from genesis', async () => { + await synchronizer.start(); + archiver.createBlocks(5); + await awaitSync(); + await expectSynchedToBlock(5); + }); + + it('syncs new blocks as they are added to archiver', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + + archiver.createBlocks(3); + await awaitSync(); + await expectSynchedToBlock(8); + }); + + it('syncs new blocks via multiple batches', async () => { + archiver.createBlocks(10); + await synchronizer.start(); + await expectSynchedToBlock(10); + + archiver.createBlocks(10); + await awaitSync(); + await expectSynchedToBlock(20); + }); + + it('syncs from latest block when restarting', async () => { + const getBlocksSpy = jest.spyOn(archiver, 'getBlocks'); + + await synchronizer.start(); + archiver.createBlocks(5); + await awaitSync(); + await expectSynchedToBlock(5); + await synchronizer.stopBlockStream(); + + synchronizer = new TestWorldStateSynchronizer(db, archiver, config); + + archiver.createBlocks(3); + await synchronizer.start(); + await expectSynchedToBlock(8); + + archiver.createBlocks(4); + await awaitSync(); + await expectSynchedToBlock(12); + + expect(getBlocksSpy).toHaveBeenCalledTimes(3); + expect(getBlocksSpy).toHaveBeenCalledWith(1, 5, false); + expect(getBlocksSpy).toHaveBeenCalledWith(6, 3, false); + expect(getBlocksSpy).toHaveBeenCalledWith(9, 4, false); + }); + + it('syncs only proven blocks when instructed', async () => { + synchronizer = new TestWorldStateSynchronizer(db, archiver, { ...config, worldStateProvenBlocksOnly: true }); + + archiver.createBlocks(5); + archiver.setProvenBlockNumber(3); + await synchronizer.start(); + await expectSynchedToBlock(3); + + archiver.setProvenBlockNumber(4); + await awaitSync(); + await expectSynchedToBlock(4); + }); + }); + + describe('reorgs', () => { + it('prunes blocks upon a reorg and resyncs', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + + // Create blocks for an alternate chain forking off block 2 + const { blocks, messages } = await mockBlocks(3, 5, 1, db); + archiver.setPrefilledBlocks(blocks, messages); + + archiver.removeBlocks(3); + archiver.createBlocks(2); + await awaitSync(); + await expectSynchedToBlock(4); + }); + }); + + describe('immediate sync', () => { + beforeEach(() => { + // Set up a synchronizer with a longer block check interval to avoid interference with immediate sync + synchronizer = new TestWorldStateSynchronizer(db, archiver, { ...config, worldStateBlockCheckIntervalMS: 1000 }); + }); + + it('syncs immediately to the latest block', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + + archiver.createBlocks(2); + await expectSynchedToBlock(5); + await synchronizer.syncImmediate(); + await expectSynchedToBlock(7); + }); + + it('syncs immediately to at least the target block', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + + archiver.createBlocks(2); + await expectSynchedToBlock(5); + await synchronizer.syncImmediate(6); + await expectSynchedToBlock(7); + }); + + it('syncs immediately to a past block', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + + archiver.createBlocks(2); + await expectSynchedToBlock(5); + await synchronizer.syncImmediate(4); + await expectSynchedToBlock(5); + }); + + it('fails to sync to unreachable block', async () => { + archiver.createBlocks(5); + await synchronizer.start(); + await expectSynchedToBlock(5); + + archiver.createBlocks(2); + await expectSynchedToBlock(5); + await expect(() => synchronizer.syncImmediate(9)).rejects.toThrow(/unable to sync/i); + }); + }); + + describe('finalized chain', () => { + it('syncs finalized chain tip', async () => { + archiver.createBlocks(5); + archiver.setProvenBlockNumber(3); + + await synchronizer.start(); + await awaitSync(); + await expectSynchedToBlock(5, 3); + + archiver.setProvenBlockNumber(4); + await awaitSync(); + await expectSynchedToBlock(5, 4); + }); + }); +}); + +class TestWorldStateSynchronizer extends ServerWorldStateSynchronizer { + // Skip validation for the sake of this test + protected override verifyMessagesHashToInHash(_l1ToL2Messages: Fr[], _inHash: Buffer): void {} + + // Stops the block stream but not the db so we can reuse it for another synchronizer + public async stopBlockStream() { + await this.blockStream?.stop(); + } +} diff --git a/yarn-project/world-state/src/native/test_util.ts b/yarn-project/world-state/src/test/utils.ts similarity index 85% rename from yarn-project/world-state/src/native/test_util.ts rename to yarn-project/world-state/src/test/utils.ts index 7c7a39c6989..f8b74bcef0a 100644 --- a/yarn-project/world-state/src/native/test_util.ts +++ b/yarn-project/world-state/src/test/utils.ts @@ -19,6 +19,8 @@ import { } from '@aztec/circuits.js'; import { padArrayEnd } from '@aztec/foundation/collection'; +import { type NativeWorldStateService } from '../native/native_world_state.js'; + export async function mockBlock(blockNum: number, size: number, fork: MerkleTreeWriteOperations) { const l2Block = L2Block.random(blockNum, size); const l1ToL2Messages = Array(16).fill(0).map(Fr.random); @@ -80,6 +82,22 @@ export async function mockBlock(blockNum: number, size: number, fork: MerkleTree }; } +export async function mockBlocks(from: number, count: number, numTxs: number, worldState: NativeWorldStateService) { + const tempFork = await worldState.fork(from - 1); + + const blocks = []; + const messagesArray = []; + for (let blockNumber = from; blockNumber < from + count; blockNumber++) { + const { block, messages } = await mockBlock(blockNumber, numTxs, tempFork); + blocks.push(block); + messagesArray.push(messages); + } + + await tempFork.close(); + + return { blocks, messages: messagesArray }; +} + export async function assertSameState(forkA: MerkleTreeReadOperations, forkB: MerkleTreeReadOperations) { const nativeStateRef = await forkA.getStateReference(); const nativeArchive = await forkA.getTreeInfo(MerkleTreeId.ARCHIVE); diff --git a/yarn-project/world-state/src/world-state-db/merkle_tree_db.ts b/yarn-project/world-state/src/world-state-db/merkle_tree_db.ts index 2f3dd68ae8a..9ca9e586f9d 100644 --- a/yarn-project/world-state/src/world-state-db/merkle_tree_db.ts +++ b/yarn-project/world-state/src/world-state-db/merkle_tree_db.ts @@ -57,6 +57,33 @@ export interface MerkleTreeAdminDatabase { */ fork(blockNumber?: number): Promise; + /** + * Removes all historical snapshots up to but not including the given block number + * @param toBlockNumber The block number of the new oldest historical block + * @returns The new WorldStateStatus + */ + removeHistoricalBlocks(toBlockNumber: bigint): Promise; + + /** + * Removes all pending blocks down to but not including the given block number + * @param toBlockNumber The block number of the new tip of the pending chain, + * @returns The new WorldStateStatus + */ + unwindBlocks(toBlockNumber: bigint): Promise; + + /** + * Advances the finalised block number to be the number provided + * @param toBlockNumber The block number that is now the tip of the finalised chain + * @returns The new WorldStateStatus + */ + setFinalised(toBlockNumber: bigint): Promise; + + /** + * Gets the current status of the database. + * @returns The current WorldStateStatus. + */ + getStatus(): Promise; + /** Stops the database */ close(): Promise; } diff --git a/yarn-project/world-state/src/world-state-db/merkle_trees.ts b/yarn-project/world-state/src/world-state-db/merkle_trees.ts index 07349604cac..23923699f27 100644 --- a/yarn-project/world-state/src/world-state-db/merkle_trees.ts +++ b/yarn-project/world-state/src/world-state-db/merkle_trees.ts @@ -197,7 +197,26 @@ export class MerkleTrees implements MerkleTreeAdminDatabase { } } - public async fork(): Promise { + public removeHistoricalBlocks(_toBlockNumber: bigint): Promise { + throw new Error('Method not implemented.'); + } + + public unwindBlocks(_toBlockNumber: bigint): Promise { + throw new Error('Method not implemented.'); + } + + public setFinalised(_toBlockNumber: bigint): Promise { + throw new Error('Method not implemented.'); + } + + public getStatus(): Promise { + throw new Error('Method not implemented.'); + } + + public async fork(blockNumber?: number): Promise { + if (blockNumber) { + throw new Error('Block number forking is not supported in js world state'); + } const [ms, db] = await elapsed(async () => { const forked = await this.store.fork(); return MerkleTrees.new(forked, this.telemetryClient, this.log); diff --git a/yarn-project/world-state/tsconfig.json b/yarn-project/world-state/tsconfig.json index 51c22f31bc5..3a835f4686d 100644 --- a/yarn-project/world-state/tsconfig.json +++ b/yarn-project/world-state/tsconfig.json @@ -26,6 +26,9 @@ }, { "path": "../types" + }, + { + "path": "../archiver" } ], "include": ["src"] diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 63efcfd8210..8fc1be6284e 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -1251,6 +1251,7 @@ __metadata: version: 0.0.0-use.local resolution: "@aztec/world-state@workspace:world-state" dependencies: + "@aztec/archiver": "workspace:^" "@aztec/circuit-types": "workspace:^" "@aztec/circuits.js": "workspace:^" "@aztec/foundation": "workspace:^"