diff --git a/yarn-project/archiver/package.json b/yarn-project/archiver/package.json index 3366d4b3f1b..22579ff402e 100644 --- a/yarn-project/archiver/package.json +++ b/yarn-project/archiver/package.json @@ -22,8 +22,6 @@ "formatting": "run -T prettier --check ./src && run -T eslint ./src", "formatting:fix": "run -T eslint --fix ./src && run -T prettier -w ./src", "test": "NODE_NO_WARNINGS=1 node --experimental-vm-modules ../node_modules/.bin/jest --passWithNoTests", - "start": "node ./dest", - "start:dev": "tsc-watch -p tsconfig.json --onSuccess 'yarn start'", "test:integration": "concurrently -k -s first -c reset,dim -n test,anvil \"yarn test:integration:run\" \"anvil\"", "test:integration:run": "NODE_NO_WARNINGS=1 node --experimental-vm-modules $(yarn bin jest) --no-cache --config jest.integration.config.json" }, diff --git a/yarn-project/archiver/src/archiver/archiver.test.ts b/yarn-project/archiver/src/archiver/archiver.test.ts index bf8621b6e5d..87173270dc5 100644 --- a/yarn-project/archiver/src/archiver/archiver.test.ts +++ b/yarn-project/archiver/src/archiver/archiver.test.ts @@ -56,14 +56,32 @@ describe('Archiver', () => { let archiver: Archiver; let blocks: L2Block[]; + let l2BlockProposedLogs: Log[]; + let l2MessageSentLogs: Log[]; + const GENESIS_ROOT = new Fr(GENESIS_ARCHIVE_ROOT).toString(); beforeEach(() => { now = +new Date(); publicClient = mock>({ + // Return a block with a reasonable timestamp getBlock: ((args: any) => ({ timestamp: args.blockNumber * BigInt(DefaultL1ContractsConfig.ethereumSlotDuration) + BigInt(now), })) as any, + // Return the logs mocked whenever the public client is queried + getLogs: ((args: any) => { + let logs = undefined; + if (args!.event!.name === 'MessageSent') { + logs = l2MessageSentLogs; + } else if (args!.event!.name === 'L2BlockProposed') { + logs = l2BlockProposedLogs; + } else { + throw new Error(`Unknown event: ${args!.event!.name}`); + } + return Promise.resolve( + logs.filter(log => log.blockNumber >= args.fromBlock && log.blockNumber <= args.toBlock), + ); + }) as any, }); instrumentation = mock({ isEnabled: () => true }); @@ -71,12 +89,17 @@ describe('Archiver', () => { archiver = new Archiver( publicClient, - rollupAddress, - inboxAddress, - registryAddress, + { rollupAddress, inboxAddress, registryAddress }, archiverStore, - 1000, + { pollingIntervalMs: 1000, batchSize: 1000 }, instrumentation, + { + l1GenesisTime: BigInt(now), + l1StartBlock: 0n, + epochDuration: 4, + slotDuration: 24, + ethereumSlotDuration: 12, + }, ); blocks = blockNumbers.map(x => L2Block.random(x, txsPerBlock, x + 1, 2)); @@ -97,6 +120,9 @@ describe('Archiver', () => { inboxRead = mock(); ((archiver as any).inbox as any).read = inboxRead; + + l2MessageSentLogs = []; + l2BlockProposedLogs = []; }); afterEach(async () => { @@ -127,27 +153,16 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(2n).mockResolvedValueOnce(6n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(98n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(99n, 1n, 1n), - ], - L2BlockProposed: [makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString())], - }); - - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(2504n, 2n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(2505n, 2n, 1n), - makeMessageSentEventWithIndexInL2BlockSubtree(2505n, 2n, 2n), - makeMessageSentEventWithIndexInL2BlockSubtree(2506n, 3n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(2510n, 2n, blocks[1].archive.root.toString()), - makeL2BlockProposedEvent(2520n, 3n, blocks[2].archive.root.toString()), - ], - }); + makeMessageSentEvent(98n, 1n, 0n); + makeMessageSentEvent(99n, 1n, 1n); + makeL2BlockProposedEvent(101n, 1n, blocks[0].archive.root.toString()); + makeMessageSentEvent(2504n, 2n, 0n); + makeMessageSentEvent(2505n, 2n, 1n); + makeMessageSentEvent(2505n, 2n, 2n); + makeMessageSentEvent(2506n, 3n, 1n); + makeL2BlockProposedEvent(2510n, 2n, blocks[1].archive.root.toString()); + makeL2BlockProposedEvent(2520n, 3n, blocks[2].archive.root.toString()); publicClient.getTransaction.mockResolvedValueOnce(rollupTxs[0]); rollupTxs.slice(1).forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -228,17 +243,11 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(2n).mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - makeL2BlockProposedEvent(90n, 3n, badArchive), - ], - }); + makeMessageSentEvent(66n, 1n, 0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); + makeL2BlockProposedEvent(90n, 3n, badArchive); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -250,8 +259,10 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - const errorMessage = `Archive mismatch matching, ignoring block ${3} with archive: ${badArchive}, expected ${blocks[2].archive.root.toString()}`; - expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + expect(loggerSpy).toHaveBeenCalledWith(expect.stringMatching(/archive root mismatch/i), { + actual: badArchive, + expected: blocks[2].archive.root.toString(), + }); }, 10_000); it('skip event search if no changes found', async () => { @@ -271,16 +282,10 @@ describe('Archiver', () => { inboxRead.totalMessagesInserted.mockResolvedValueOnce(0n).mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - ], - }); + makeMessageSentEvent(66n, 1n, 0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -292,10 +297,7 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - - // For some reason, this is 1-indexed. - expect(loggerSpy).toHaveBeenNthCalledWith(1, `Retrieved no new L1 to L2 messages between L1 blocks 1 and 50.`); - expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from 1 to 50`); + expect(loggerSpy).toHaveBeenCalledWith(`No blocks to retrieve from 1 to 50`); }, 10_000); it('handles L2 reorg', async () => { @@ -328,16 +330,10 @@ describe('Archiver', () => { .mockResolvedValueOnce(2n) .mockResolvedValueOnce(2n); - mockGetLogs({ - messageSent: [ - makeMessageSentEventWithIndexInL2BlockSubtree(66n, 1n, 0n), - makeMessageSentEventWithIndexInL2BlockSubtree(68n, 1n, 1n), - ], - L2BlockProposed: [ - makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()), - makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()), - ], - }); + makeMessageSentEvent(66n, 1n, 0n); + makeMessageSentEvent(68n, 1n, 1n); + makeL2BlockProposedEvent(70n, 1n, blocks[0].archive.root.toString()); + makeL2BlockProposedEvent(80n, 2n, blocks[1].archive.root.toString()); rollupTxs.forEach(tx => publicClient.getTransaction.mockResolvedValueOnce(tx)); @@ -350,14 +346,12 @@ describe('Archiver', () => { latestBlockNum = await archiver.getBlockNumber(); expect(latestBlockNum).toEqual(numL2BlocksInTest); - // For some reason, this is 1-indexed. - expect(loggerSpy).toHaveBeenNthCalledWith(1, `Retrieved no new L1 to L2 messages between L1 blocks 1 and 50.`); - expect(loggerSpy).toHaveBeenNthCalledWith(2, `No blocks to retrieve from 1 to 50`); + expect(loggerSpy).toHaveBeenCalledWith(`No blocks to retrieve from 1 to 50`); // Lets take a look to see if we can find re-org stuff! await sleep(1000); - expect(loggerSpy).toHaveBeenNthCalledWith(9, `L2 prune has been detected.`); + expect(loggerSpy).toHaveBeenCalledWith(`L2 prune has been detected.`); // Should also see the block number be reduced latestBlockNum = await archiver.getBlockNumber(); @@ -375,57 +369,40 @@ describe('Archiver', () => { // TODO(palla/reorg): Add a unit test for the archiver handleEpochPrune xit('handles an upcoming L2 prune', () => {}); - // logs should be created in order of how archiver syncs. - const mockGetLogs = (logs: { - messageSent?: ReturnType[]; - L2BlockProposed?: ReturnType[]; - }) => { - if (logs.messageSent) { - publicClient.getLogs.mockResolvedValueOnce(logs.messageSent); - } - if (logs.L2BlockProposed) { - publicClient.getLogs.mockResolvedValueOnce(logs.L2BlockProposed); - } + /** + * Makes a fake L2BlockProposed event for testing purposes and registers it to be returned by the public client. + * @param l1BlockNum - L1 block number. + * @param l2BlockNum - L2 Block number. + */ + const makeL2BlockProposedEvent = (l1BlockNum: bigint, l2BlockNum: bigint, archive: `0x${string}`) => { + const log = { + blockNumber: l1BlockNum, + args: { blockNumber: l2BlockNum, archive }, + transactionHash: `0x${l2BlockNum}`, + } as Log; + l2BlockProposedLogs.push(log); }; -}); - -/** - * Makes a fake L2BlockProposed event for testing purposes. - * @param l1BlockNum - L1 block number. - * @param l2BlockNum - L2 Block number. - * @returns An L2BlockProposed event log. - */ -function makeL2BlockProposedEvent(l1BlockNum: bigint, l2BlockNum: bigint, archive: `0x${string}`) { - return { - blockNumber: l1BlockNum, - args: { blockNumber: l2BlockNum, archive }, - transactionHash: `0x${l2BlockNum}`, - } as Log; -} -/** - * Makes fake L1ToL2 MessageSent events for testing purposes. - * @param l1BlockNum - L1 block number. - * @param l2BlockNumber - The L2 block number for which the message was included. - * @param indexInSubtree - the index in the l2Block's subtree in the L1 to L2 Messages Tree. - * @returns MessageSent event logs. - */ -function makeMessageSentEventWithIndexInL2BlockSubtree( - l1BlockNum: bigint, - l2BlockNumber: bigint, - indexInSubtree: bigint, -) { - const index = indexInSubtree + InboxLeaf.smallestIndexFromL2Block(l2BlockNumber); - return { - blockNumber: l1BlockNum, - args: { - l2BlockNumber, - index, - hash: Fr.random().toString(), - }, - transactionHash: `0x${l1BlockNum}`, - } as Log; -} + /** + * Makes fake L1ToL2 MessageSent events for testing purposes and registers it to be returned by the public client. + * @param l1BlockNum - L1 block number. + * @param l2BlockNumber - The L2 block number for which the message was included. + * @param indexInSubtree - the index in the l2Block's subtree in the L1 to L2 Messages Tree. + */ + const makeMessageSentEvent = (l1BlockNum: bigint, l2BlockNumber: bigint, indexInSubtree: bigint) => { + const index = indexInSubtree + InboxLeaf.smallestIndexFromL2Block(l2BlockNumber); + const log = { + blockNumber: l1BlockNum, + args: { + l2BlockNumber, + index, + hash: Fr.random().toString(), + }, + transactionHash: `0x${l1BlockNum}`, + } as Log; + l2MessageSentLogs.push(log); + }; +}); /** * Makes a fake rollup tx for testing purposes. diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 831295da0be..83510020b72 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -38,7 +38,7 @@ import { Fr } from '@aztec/foundation/fields'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { count } from '@aztec/foundation/string'; -import { Timer } from '@aztec/foundation/timer'; +import { elapsed } from '@aztec/foundation/timer'; import { InboxAbi, RollupAbi } from '@aztec/l1-artifacts'; import { ContractClassRegisteredEvent, @@ -61,7 +61,7 @@ import { import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js'; import { type ArchiverConfig } from './config.js'; -import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js'; +import { retrieveBlocksFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js'; import { getEpochNumberAtTimestamp, getSlotAtTimestamp, @@ -112,25 +112,23 @@ export class Archiver implements ArchiveSource { */ constructor( private readonly publicClient: PublicClient, - private readonly rollupAddress: EthAddress, - readonly inboxAddress: EthAddress, - private readonly registryAddress: EthAddress, + private readonly l1Addresses: { rollupAddress: EthAddress; inboxAddress: EthAddress; registryAddress: EthAddress }, readonly dataStore: ArchiverDataStore, - private readonly pollingIntervalMs: number, + private readonly config: { pollingIntervalMs: number; batchSize: number }, private readonly instrumentation: ArchiverInstrumentation, - private readonly l1constants: L1RollupConstants = EmptyL1RollupConstants, + private readonly l1constants: L1RollupConstants, private readonly log: DebugLogger = createDebugLogger('aztec:archiver'), ) { this.store = new ArchiverStoreHelper(dataStore); this.rollup = getContract({ - address: rollupAddress.toString(), + address: l1Addresses.rollupAddress.toString(), abi: RollupAbi, client: publicClient, }); this.inbox = getContract({ - address: inboxAddress.toString(), + address: l1Addresses.inboxAddress.toString(), abi: InboxAbi, client: publicClient, }); @@ -171,11 +169,12 @@ export class Archiver implements ArchiveSource { const archiver = new Archiver( publicClient, - config.l1Contracts.rollupAddress, - config.l1Contracts.inboxAddress, - config.l1Contracts.registryAddress, + config.l1Contracts, archiverStore, - config.archiverPollingIntervalMS ?? 10_000, + { + pollingIntervalMs: config.archiverPollingIntervalMS ?? 10_000, + batchSize: config.archiverBatchSize ?? 100, + }, new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()), { l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration }, ); @@ -196,7 +195,7 @@ export class Archiver implements ArchiveSource { await this.sync(blockUntilSynced); } - this.runningPromise = new RunningPromise(() => this.safeSync(), this.pollingIntervalMs); + this.runningPromise = new RunningPromise(() => this.safeSync(), this.config.pollingIntervalMs); this.runningPromise.start(); } @@ -213,9 +212,8 @@ export class Archiver implements ArchiveSource { /** * Fetches logs from L1 contracts and processes them. - * @param blockUntilSynced - If true, blocks until the archiver has fully synced. */ - private async sync(blockUntilSynced: boolean) { + private async sync(initialRun: boolean) { /** * We keep track of three "pointers" to L1 blocks: * 1. the last L1 block that published an L2 block @@ -232,9 +230,9 @@ export class Archiver implements ArchiveSource { const { blocksSynchedTo = l1StartBlock, messagesSynchedTo = l1StartBlock } = await this.store.getSynchPoint(); const currentL1BlockNumber = await this.publicClient.getBlockNumber(); - if (blockUntilSynced) { + if (initialRun) { this.log.info( - `Starting archiver sync to rollup contract ${this.rollupAddress.toString()} from L1 block ${Math.min( + `Starting archiver sync to rollup contract ${this.l1Addresses.rollupAddress.toString()} from L1 block ${Math.min( Number(blocksSynchedTo), Number(messagesSynchedTo), )} to current L1 block ${currentL1BlockNumber}`, @@ -261,7 +259,7 @@ export class Archiver implements ArchiveSource { */ // ********** Events that are processed per L1 block ********** - await this.handleL1ToL2Messages(blockUntilSynced, messagesSynchedTo, currentL1BlockNumber); + await this.handleL1ToL2Messages(messagesSynchedTo, currentL1BlockNumber); // Store latest l1 block number and timestamp seen. Used for epoch and slots calculations. if (!this.l1BlockNumber || this.l1BlockNumber < currentL1BlockNumber) { @@ -272,7 +270,7 @@ export class Archiver implements ArchiveSource { // ********** Events that are processed per L2 block ********** if (currentL1BlockNumber > blocksSynchedTo) { // First we retrieve new L2 blocks - const { provenBlockNumber } = await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber); + const { provenBlockNumber } = await this.handleL2blocks(blocksSynchedTo, currentL1BlockNumber); // And then we prune the current epoch if it'd reorg on next submission. // Note that we don't do this before retrieving L2 blocks because we may need to retrieve // blocks from more than 2 epochs ago, so we want to make sure we have the latest view of @@ -281,7 +279,7 @@ export class Archiver implements ArchiveSource { await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber); } - if (blockUntilSynced) { + if (initialRun) { this.log.info(`Initial archiver sync to L1 block ${currentL1BlockNumber} complete.`); } } @@ -311,11 +309,18 @@ export class Archiver implements ArchiveSource { } } - private async handleL1ToL2Messages( - blockUntilSynced: boolean, - messagesSynchedTo: bigint, - currentL1BlockNumber: bigint, - ) { + private nextRange(end: bigint, limit: bigint): [bigint, bigint] { + const batchSize = (this.config.batchSize * this.l1constants.slotDuration) / this.l1constants.ethereumSlotDuration; + const nextStart = end + 1n; + const nextEnd = nextStart + BigInt(batchSize); + if (nextEnd > limit) { + return [nextStart, limit]; + } + return [nextStart, nextEnd]; + } + + private async handleL1ToL2Messages(messagesSynchedTo: bigint, currentL1BlockNumber: bigint) { + this.log.trace(`Handling L1 to L2 messages from ${messagesSynchedTo} to ${currentL1BlockNumber}.`); if (currentL1BlockNumber <= messagesSynchedTo) { return; } @@ -325,30 +330,30 @@ export class Archiver implements ArchiveSource { if (localTotalMessageCount === destinationTotalMessageCount) { await this.store.setMessageSynchedL1BlockNumber(currentL1BlockNumber); - this.log.debug( + this.log.trace( `Retrieved no new L1 to L2 messages between L1 blocks ${messagesSynchedTo + 1n} and ${currentL1BlockNumber}.`, ); return; } - const retrievedL1ToL2Messages = await retrieveL1ToL2Messages( - this.inbox, - blockUntilSynced, - messagesSynchedTo + 1n, - currentL1BlockNumber, - ); - - await this.store.addL1ToL2Messages(retrievedL1ToL2Messages); - - this.log.verbose( - `Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${ - messagesSynchedTo + 1n - } and ${currentL1BlockNumber}.`, - ); + // Retrieve messages in batches. Each batch is estimated to acommodate up to L2 'blockBatchSize' blocks, + let searchStartBlock: bigint = messagesSynchedTo; + let searchEndBlock: bigint = messagesSynchedTo; + do { + [searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber); + this.log.trace(`Retrieving L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`); + const retrievedL1ToL2Messages = await retrieveL1ToL2Messages(this.inbox, searchStartBlock, searchEndBlock); + this.log.verbose( + `Retrieved ${retrievedL1ToL2Messages.retrievedData.length} new L1 to L2 messages between L1 blocks ${searchStartBlock} and ${searchEndBlock}.`, + ); + await this.store.addL1ToL2Messages(retrievedL1ToL2Messages); + for (const msg of retrievedL1ToL2Messages.retrievedData) { + this.log.debug(`Downloaded L1 to L2 message`, { leaf: msg.leaf.toString(), index: msg.index }); + } + } while (searchEndBlock < currentL1BlockNumber); } private async handleL2blocks( - blockUntilSynced: boolean, blocksSynchedTo: bigint, currentL1BlockNumber: bigint, ): Promise<{ provenBlockNumber: bigint }> { @@ -371,7 +376,10 @@ export class Archiver implements ArchiveSource { await this.store.setProvenL2BlockNumber(Number(provenBlockNumber)); // if we are here then we must have a valid proven epoch number await this.store.setProvenL2EpochNumber(Number(provenEpochNumber)); - this.log.info(`Updated proven chain`, { provenBlockNumber, provenEpochNumber }); + this.log.info(`Updated proven chain to block ${provenBlockNumber} (epoch ${provenEpochNumber})`, { + provenBlockNumber, + provenEpochNumber, + }); } this.instrumentation.updateLastProvenBlock(Number(provenBlockNumber)); }; @@ -436,56 +444,60 @@ export class Archiver implements ArchiveSource { } } - // TODO(palla/log) Downgrade to trace - this.log.debug(`Retrieving L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`); - const retrievedBlocks = await retrieveBlockFromRollup( - this.rollup, - this.publicClient, - blockUntilSynced, - blocksSynchedTo + 1n, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier - currentL1BlockNumber, - this.log, - ); + // Retrieve L2 blocks in batches. Each batch is estimated to acommodate up to L2 'blockBatchSize' blocks, + // computed using the L2 block time vs the L1 block time. + let searchStartBlock: bigint = blocksSynchedTo; + let searchEndBlock: bigint = blocksSynchedTo; + + do { + [searchStartBlock, searchEndBlock] = this.nextRange(searchEndBlock, currentL1BlockNumber); + + this.log.trace(`Retrieving L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`); + const retrievedBlocks = await retrieveBlocksFromRollup( + this.rollup, + this.publicClient, + searchStartBlock, // TODO(palla/reorg): If the L2 reorg was due to an L1 reorg, we need to start search earlier + searchEndBlock, + this.log, + ); - if (retrievedBlocks.length === 0) { - // We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura. - // See further details in earlier comments. - // TODO(palla/log) Downgrade to trace - this.log.debug(`Retrieved no new L2 blocks from L1 block ${blocksSynchedTo + 1n} to ${currentL1BlockNumber}`); - return { provenBlockNumber }; - } + if (retrievedBlocks.length === 0) { + // We are not calling `setBlockSynchedL1BlockNumber` because it may cause sync issues if based off infura. + // See further details in earlier comments. + this.log.trace(`Retrieved no new L2 blocks from L1 block ${searchStartBlock} to ${searchEndBlock}`); + continue; + } - const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber; - this.log.debug( - `Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${ - blocksSynchedTo + 1n - } and ${currentL1BlockNumber} with last processed L1 block ${lastProcessedL1BlockNumber}.`, - ); + const lastProcessedL1BlockNumber = retrievedBlocks[retrievedBlocks.length - 1].l1.blockNumber; + this.log.debug( + `Retrieved ${retrievedBlocks.length} new L2 blocks between L1 blocks ${searchStartBlock} and ${searchEndBlock} with last processed L1 block ${lastProcessedL1BlockNumber}.`, + ); - for (const block of retrievedBlocks) { - this.log.debug(`Ingesting new L2 block ${block.data.number}`, { - ...block.data.header.globalVariables.toInspect(), - blockHash: block.data.hash, - l1BlockNumber: block.l1.blockNumber, - }); - } + for (const block of retrievedBlocks) { + this.log.debug(`Ingesting new L2 block ${block.data.number} with ${block.data.body.txEffects.length} txs`, { + blockHash: block.data.hash(), + l1BlockNumber: block.l1.blockNumber, + ...block.data.header.globalVariables.toInspect(), + ...block.data.getStats(), + }); + } - const timer = new Timer(); - await this.store.addBlocks(retrievedBlocks); + const [processDuration] = await elapsed(() => this.store.addBlocks(retrievedBlocks)); + this.instrumentation.processNewBlocks( + processDuration / retrievedBlocks.length, + retrievedBlocks.map(b => b.data), + ); - for (const block of retrievedBlocks) { - this.log.info(`Downloaded L2 block ${block.data.number}`, { - blockHash: block.data.hash(), - blockNumber: block.data.number, - }); - } + for (const block of retrievedBlocks) { + this.log.info(`Downloaded L2 block ${block.data.number}`, { + blockHash: block.data.hash(), + blockNumber: block.data.number, + }); + } + } while (searchEndBlock < currentL1BlockNumber); // Important that we update AFTER inserting the blocks. await updateProvenBlock(); - this.instrumentation.processNewBlocks( - timer.ms() / retrievedBlocks.length, - retrievedBlocks.map(b => b.data), - ); return { provenBlockNumber }; } @@ -503,11 +515,11 @@ export class Archiver implements ArchiveSource { } public getRollupAddress(): Promise { - return Promise.resolve(this.rollupAddress); + return Promise.resolve(this.l1Addresses.rollupAddress); } public getRegistryAddress(): Promise { - return Promise.resolve(this.registryAddress); + return Promise.resolve(this.l1Addresses.registryAddress); } public getL1BlockNumber(): bigint { @@ -1096,11 +1108,3 @@ type L1RollupConstants = { epochDuration: number; ethereumSlotDuration: number; }; - -const EmptyL1RollupConstants: L1RollupConstants = { - l1StartBlock: 0n, - l1GenesisTime: 0n, - epochDuration: 0, - slotDuration: 0, - ethereumSlotDuration: 0, -}; diff --git a/yarn-project/archiver/src/archiver/config.ts b/yarn-project/archiver/src/archiver/config.ts index 6aa953cd087..d739314d468 100644 --- a/yarn-project/archiver/src/archiver/config.ts +++ b/yarn-project/archiver/src/archiver/config.ts @@ -18,24 +18,19 @@ import { type ConfigMappingsType, getConfigFromMappings, numberConfigHelper } fr * The archiver configuration. */ export type ArchiverConfig = { - /** - * URL for an archiver service. If set, will return an archiver client as opposed to starting a new one. - */ + /** URL for an archiver service. If set, will return an archiver client as opposed to starting a new one. */ archiverUrl?: string; - /** - * The polling interval in ms for retrieving new L2 blocks and encrypted logs. - */ + /** The polling interval in ms for retrieving new L2 blocks and encrypted logs. */ archiverPollingIntervalMS?: number; - /** - * The polling interval viem uses in ms - */ + /** The number of L2 blocks the archiver will attempt to download at a time. */ + archiverBatchSize?: number; + + /** The polling interval viem uses in ms */ viemPollingIntervalMS?: number; - /** - * The deployed L1 contract addresses - */ + /** The deployed L1 contract addresses */ l1Contracts: L1ContractAddresses; /** The max number of logs that can be obtained in 1 "getUnencryptedLogs" call. */ @@ -54,6 +49,11 @@ export const archiverConfigMappings: ConfigMappingsType = { description: 'The polling interval in ms for retrieving new L2 blocks and encrypted logs.', ...numberConfigHelper(1_000), }, + archiverBatchSize: { + env: 'ARCHIVER_BATCH_SIZE', + description: 'The number of L2 blocks the archiver will attempt to download at a time.', + ...numberConfigHelper(100), + }, maxLogs: { env: 'ARCHIVER_MAX_LOGS', description: 'The max number of logs that can be obtained in 1 "getUnencryptedLogs" call.', diff --git a/yarn-project/archiver/src/archiver/data_retrieval.ts b/yarn-project/archiver/src/archiver/data_retrieval.ts index 3249a5fc541..de09a98ac23 100644 --- a/yarn-project/archiver/src/archiver/data_retrieval.ts +++ b/yarn-project/archiver/src/archiver/data_retrieval.ts @@ -1,5 +1,6 @@ import { Body, InboxLeaf, L2Block } from '@aztec/circuit-types'; import { AppendOnlyTreeSnapshot, Fr, Header, Proof } from '@aztec/circuits.js'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { type EthAddress } from '@aztec/foundation/eth-address'; import { type ViemSignature } from '@aztec/foundation/eth-signature'; import { type DebugLogger, createDebugLogger } from '@aztec/foundation/log'; @@ -25,16 +26,14 @@ import { type L1Published, type L1PublishedData } from './structs/published.js'; * Fetches new L2 blocks. * @param publicClient - The viem public client to use for transaction retrieval. * @param rollupAddress - The address of the rollup contract. - * @param blockUntilSynced - If true, blocks until the archiver has fully synced. * @param searchStartBlock - The block number to use for starting the search. * @param searchEndBlock - The highest block number that we should search up to. * @param expectedNextL2BlockNum - The next L2 block number that we expect to find. * @returns An array of block; as well as the next eth block to search from. */ -export async function retrieveBlockFromRollup( +export async function retrieveBlocksFromRollup( rollup: GetContractReturnType>, publicClient: PublicClient, - blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, logger: DebugLogger = createDebugLogger('aztec:archiver'), @@ -58,13 +57,13 @@ export async function retrieveBlockFromRollup( const lastLog = l2BlockProposedLogs[l2BlockProposedLogs.length - 1]; logger.debug( - `Got L2 block processed logs for ${l2BlockProposedLogs[0].blockNumber}-${lastLog.blockNumber} between ${searchStartBlock}-${searchEndBlock} L1 blocks`, + `Got ${l2BlockProposedLogs.length} L2 block processed logs for L2 blocks ${l2BlockProposedLogs[0].args.blockNumber}-${lastLog.args.blockNumber} between L1 blocks ${searchStartBlock}-${searchEndBlock}`, ); const newBlocks = await processL2BlockProposedLogs(rollup, publicClient, l2BlockProposedLogs, logger); retrievedBlocks.push(...newBlocks); searchStartBlock = lastLog.blockNumber! + 1n; - } while (blockUntilSynced && searchStartBlock <= searchEndBlock); + } while (searchStartBlock <= searchEndBlock); return retrievedBlocks; } @@ -82,14 +81,13 @@ export async function processL2BlockProposedLogs( logger: DebugLogger, ): Promise[]> { const retrievedBlocks: L1Published[] = []; - for (const log of logs) { + await asyncPool(10, logs, async log => { const l2BlockNumber = log.args.blockNumber!; const archive = log.args.archive!; const archiveFromChain = await rollup.read.archiveAt([l2BlockNumber]); // The value from the event and contract will match only if the block is in the chain. if (archive === archiveFromChain) { - // TODO: Fetch blocks from calldata in parallel const block = await getBlockFromRollupTx(publicClient, log.transactionHash!, l2BlockNumber); const l1: L1PublishedData = { @@ -100,11 +98,12 @@ export async function processL2BlockProposedLogs( retrievedBlocks.push({ data: block, l1 }); } else { - logger.warn( - `Archive mismatch matching, ignoring block ${l2BlockNumber} with archive: ${archive}, expected ${archiveFromChain}`, - ); + logger.warn(`Ignoring L2 block ${l2BlockNumber} due to archive root mismatch`, { + actual: archive, + expected: archiveFromChain, + }); } - } + }); return retrievedBlocks; } @@ -129,10 +128,7 @@ async function getBlockFromRollupTx( l2BlockNum: bigint, ): Promise { const { input: data } = await publicClient.getTransaction({ hash: txHash }); - const { functionName, args } = decodeFunctionData({ - abi: RollupAbi, - data, - }); + const { functionName, args } = decodeFunctionData({ abi: RollupAbi, data }); const allowedMethods = ['propose', 'proposeAndClaim']; @@ -184,7 +180,6 @@ async function getBlockFromRollupTx( */ export async function retrieveL1ToL2Messages( inbox: GetContractReturnType>, - blockUntilSynced: boolean, searchStartBlock: bigint, searchEndBlock: bigint, ): Promise> { @@ -213,7 +208,7 @@ export async function retrieveL1ToL2Messages( // handles the case when there are no new messages: searchStartBlock = (messageSentLogs.findLast(msgLog => !!msgLog)?.blockNumber || searchStartBlock) + 1n; - } while (blockUntilSynced && searchStartBlock <= searchEndBlock); + } while (searchStartBlock <= searchEndBlock); return { lastProcessedL1BlockNumber: searchStartBlock - 1n, retrievedData: retrievedL1ToL2Messages }; } diff --git a/yarn-project/archiver/src/index.ts b/yarn-project/archiver/src/index.ts index 24112863fc1..4aa32e6d591 100644 --- a/yarn-project/archiver/src/index.ts +++ b/yarn-project/archiver/src/index.ts @@ -1,62 +1,8 @@ -import { jsonStringify } from '@aztec/foundation/json-rpc'; -import { createDebugLogger } from '@aztec/foundation/log'; -import { fileURLToPath } from '@aztec/foundation/url'; -import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; - -import { createPublicClient, http } from 'viem'; -import { localhost } from 'viem/chains'; - -import { Archiver, getArchiverConfigFromEnv } from './archiver/index.js'; -import { ArchiverInstrumentation } from './archiver/instrumentation.js'; -import { MemoryArchiverStore } from './archiver/memory_archiver_store/memory_archiver_store.js'; - export * from './archiver/index.js'; -export * from './rpc/index.js'; export * from './factory.js'; +export * from './rpc/index.js'; -export { retrieveL2ProofVerifiedEvents, retrieveBlockFromRollup } from './archiver/data_retrieval.js'; - -const log = createDebugLogger('aztec:archiver'); - -/** - * A function which instantiates and starts Archiver. - */ -// eslint-disable-next-line require-await -async function main() { - const config = getArchiverConfigFromEnv(); - const { l1RpcUrl: rpcUrl, l1Contracts } = config; - - log.info(`Starting archiver in main(): ${jsonStringify(config)}`); - const publicClient = createPublicClient({ - chain: localhost, - transport: http(rpcUrl), - }); - - const archiverStore = new MemoryArchiverStore(1000); - - const archiver = new Archiver( - publicClient, - l1Contracts.rollupAddress, - l1Contracts.inboxAddress, - l1Contracts.registryAddress, - archiverStore, - 1000, - new ArchiverInstrumentation(new NoopTelemetryClient()), - ); - - const shutdown = async () => { - await archiver.stop(); - process.exit(0); - }; - process.once('SIGINT', shutdown); - process.once('SIGTERM', shutdown); -} - -// See https://twitter.com/Rich_Harris/status/1355289863130673153 -if (process.argv[1] === fileURLToPath(import.meta.url).replace(/\/index\.js$/, '')) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - main().catch(err => { - log.error(err); - process.exit(1); - }); -} +export { + retrieveBlocksFromRollup as retrieveBlockFromRollup, + retrieveL2ProofVerifiedEvents, +} from './archiver/data_retrieval.js'; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index b12e7c3ea78..0d36c4cd300 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -8,6 +8,7 @@ export type EnvVar = | 'ARCHIVER_POLLING_INTERVAL_MS' | 'ARCHIVER_URL' | 'ARCHIVER_VIEM_POLLING_INTERVAL_MS' + | 'ARCHIVER_BATCH_SIZE' | 'ASSUME_PROVEN_THROUGH_BLOCK_NUMBER' | 'AZTEC_NODE_URL' | 'AZTEC_PORT'