diff --git a/yarn-project/aztec-rpc/src/aztec_rpc_server/aztec_rpc_server.ts b/yarn-project/aztec-rpc/src/aztec_rpc_server/aztec_rpc_server.ts index 4ca63682994..a12ec1654fa 100644 --- a/yarn-project/aztec-rpc/src/aztec_rpc_server/aztec_rpc_server.ts +++ b/yarn-project/aztec-rpc/src/aztec_rpc_server/aztec_rpc_server.ts @@ -22,6 +22,7 @@ import { ContractPublicData, DeployedContract, FunctionCall, + INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockL2Logs, LogType, @@ -69,7 +70,7 @@ export class AztecRPCServer implements AztecRPC { * @returns A promise that resolves when the server has started successfully. */ public async start() { - await this.synchroniser.start(1, 1, this.config.l2BlockPollingIntervalMS); + await this.synchroniser.start(INITIAL_L2_BLOCK_NUM, 1, this.config.l2BlockPollingIntervalMS); const info = await this.getNodeInfo(); this.log.info(`Started RPC server connected to chain ${info.chainId} version ${info.version}`); } diff --git a/yarn-project/aztec-rpc/src/synchroniser/synchroniser.test.ts b/yarn-project/aztec-rpc/src/synchroniser/synchroniser.test.ts index eab46862b80..249ae57b0bb 100644 --- a/yarn-project/aztec-rpc/src/synchroniser/synchroniser.test.ts +++ b/yarn-project/aztec-rpc/src/synchroniser/synchroniser.test.ts @@ -1,4 +1,6 @@ -import { Fr } from '@aztec/circuits.js'; +import { AztecAddress, Fr, PrivateKey } from '@aztec/circuits.js'; +import { Grumpkin } from '@aztec/circuits.js/barretenberg'; +import { TestKeyStore } from '@aztec/key-store'; import { AztecNode, L2Block, MerkleTreeId } from '@aztec/types'; import { MockProxy, mock } from 'jest-mock-extended'; @@ -83,6 +85,37 @@ describe('Synchroniser', () => { expect(roots5[MerkleTreeId.CONTRACT_TREE]).not.toEqual(roots[MerkleTreeId.CONTRACT_TREE]); expect(roots5[MerkleTreeId.CONTRACT_TREE]).toEqual(block5.endContractTreeSnapshot.root); }); + + it('note processor successfully catches up', async () => { + const block = L2Block.random(1, 4); + + // getBlocks is called by both synchroniser.work and synchroniser.workNoteProcessorCatchUp + aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]); + aztecNode.getLogs + .mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchroniser.work + .mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchroniser.work + .mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchroniser.workNoteProcessorCatchUp + + // Sync the synchroniser so that note processor has something to catch up to + await synchroniser.work(); + + // Used in synchroniser.isAccountSynchronised + aztecNode.getBlockHeight.mockResolvedValueOnce(1); + + // Manually adding account to database so that we can call synchroniser.isAccountSynchronised + const keyStore = new TestKeyStore(await Grumpkin.new()); + keyStore.addAccount(PrivateKey.random()); + const pubKey = (await keyStore.getAccounts())[0]; + const address = AztecAddress.random(); + await database.addPublicKeyAndPartialAddress(address, pubKey, new Fr(0)); + + // Add the account which will add the note processor to the synchroniser + synchroniser.addAccount(pubKey, keyStore); + + await synchroniser.workNoteProcessorCatchUp(); + + expect(await synchroniser.isAccountSynchronised(address)).toBe(true); + }); }); class TestSynchroniser extends Synchroniser { @@ -93,4 +126,8 @@ class TestSynchroniser extends Synchroniser { public initialSync(): Promise { return super.initialSync(); } + + public workNoteProcessorCatchUp(): Promise { + return super.workNoteProcessorCatchUp(); + } } diff --git a/yarn-project/aztec-rpc/src/synchroniser/synchroniser.ts b/yarn-project/aztec-rpc/src/synchroniser/synchroniser.ts index dfd4c64dae8..9e10c49cc14 100644 --- a/yarn-project/aztec-rpc/src/synchroniser/synchroniser.ts +++ b/yarn-project/aztec-rpc/src/synchroniser/synchroniser.ts @@ -1,7 +1,7 @@ import { AztecAddress, Fr, PublicKey } from '@aztec/circuits.js'; import { DebugLogger, createDebugLogger } from '@aztec/foundation/log'; import { InterruptableSleep } from '@aztec/foundation/sleep'; -import { AztecNode, KeyStore, L2BlockContext, LogType, MerkleTreeId } from '@aztec/types'; +import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockContext, LogType, MerkleTreeId } from '@aztec/types'; import { Database, TxDao } from '../database/index.js'; import { NoteProcessor } from '../note_processor/index.js'; @@ -21,6 +21,7 @@ export class Synchroniser { private initialSyncBlockHeight = 0; private synchedToBlock = 0; private log: DebugLogger; + private noteProcessorsToCatchUp: NoteProcessor[] = []; constructor(private node: AztecNode, private db: Database, logSuffix = '') { this.log = createDebugLogger( @@ -37,15 +38,26 @@ export class Synchroniser { * @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration. * @param retryInterval - The time interval (in ms) to wait before retrying if no data is available. */ - public async start(from = 1, limit = 1, retryInterval = 1000) { + public async start(from = INITIAL_L2_BLOCK_NUM, limit = 1, retryInterval = 1000) { if (this.running) return; this.running = true; + if (from < this.synchedToBlock + 1) { + throw new Error(`From block ${from} is smaller than the currently synched block ${this.synchedToBlock}`); + } + this.synchedToBlock = from - 1; + await this.initialSync(); const run = async () => { while (this.running) { - from = await this.work(from, limit, retryInterval); + if (this.noteProcessorsToCatchUp.length > 0) { + // There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor. + await this.workNoteProcessorCatchUp(limit, retryInterval); + } else { + // No note processor needs to catch up. We continue with the normal flow. + await this.work(limit, retryInterval); + } } }; @@ -63,25 +75,26 @@ export class Synchroniser { await this.db.setTreeRoots(treeRoots); } - protected async work(from = 1, limit = 1, retryInterval = 1000): Promise { + protected async work(limit = 1, retryInterval = 1000): Promise { + const from = this.synchedToBlock + 1; try { let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED); if (!encryptedLogs.length) { await this.interruptableSleep.sleep(retryInterval); - return from; + return; } let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED); if (!unencryptedLogs.length) { await this.interruptableSleep.sleep(retryInterval); - return from; + return; } // Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks. const blocks = await this.node.getBlocks(from, encryptedLogs.length); if (!blocks.length) { await this.interruptableSleep.sleep(retryInterval); - return from; + return; } if (blocks.length !== encryptedLogs.length) { @@ -116,13 +129,57 @@ export class Synchroniser { await this.updateBlockInfoInBlockTxs(blockContexts); - from += encryptedLogs.length; this.synchedToBlock = latestBlock.block.number; - return from; } catch (err) { this.log(err); await this.interruptableSleep.sleep(retryInterval); - return from; + } + } + + protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise { + const noteProcessor = this.noteProcessorsToCatchUp[0]; + const from = noteProcessor.status.syncedToBlock + 1; + // Ensuring that the note processor does not sync further than the main sync. + limit = Math.min(limit, this.synchedToBlock - from + 1); + + if (limit < 1) { + throw new Error(`Unexpected limit ${limit} for note processor catch up`); + } + + try { + let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED); + if (!encryptedLogs.length) { + // This should never happen because this function should only be called when the note processor is lagging + // behind main sync. + throw new Error('No encrypted logs in processor catch up mode'); + } + + // Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks. + const blocks = await this.node.getBlocks(from, encryptedLogs.length); + if (!blocks.length) { + // This should never happen because this function should only be called when the note processor is lagging + // behind main sync. + throw new Error('No blocks in processor catch up mode'); + } + + if (blocks.length !== encryptedLogs.length) { + // "Trim" the encrypted logs to match the number of blocks. + encryptedLogs = encryptedLogs.slice(0, blocks.length); + } + + const blockContexts = blocks.map(block => new L2BlockContext(block)); + + this.log(`Forwarding ${encryptedLogs.length} encrypted logs and blocks to note processor in catch up mode`); + await noteProcessor.process(blockContexts, encryptedLogs); + + if (noteProcessor.status.syncedToBlock === this.synchedToBlock) { + // Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`. + this.noteProcessorsToCatchUp.shift(); + this.noteProcessors.push(noteProcessor); + } + } catch (err) { + this.log(err); + await this.interruptableSleep.sleep(retryInterval); } } @@ -172,7 +229,15 @@ export class Synchroniser { if (processor) { return; } - this.noteProcessors.push(new NoteProcessor(publicKey, keyStore, this.db, this.node)); + + const noteProcessor = new NoteProcessor(publicKey, keyStore, this.db, this.node); + if (this.synchedToBlock === 0) { + // The main sync thread was never started before and for this reason the synchroniser does not have to catch up + this.noteProcessors.push(noteProcessor); + } else { + // The main sync thread was started before and for this reason the synchroniser has to catch up + this.noteProcessorsToCatchUp.push(noteProcessor); + } } /** diff --git a/yarn-project/types/src/logs/function_l2_logs.ts b/yarn-project/types/src/logs/function_l2_logs.ts index 1dc179626c6..24e1f1a41fd 100644 --- a/yarn-project/types/src/logs/function_l2_logs.ts +++ b/yarn-project/types/src/logs/function_l2_logs.ts @@ -1,4 +1,5 @@ import { sha256 } from '@aztec/foundation/crypto'; +import { Point } from '@aztec/foundation/fields'; import { BufferReader, serializeBufferToVector } from '@aztec/foundation/serialize'; import { randomBytes } from 'crypto'; @@ -69,7 +70,9 @@ export class FunctionL2Logs { public static random(numLogs: number): FunctionL2Logs { const logs: Buffer[] = []; for (let i = 0; i < numLogs; i++) { - logs.push(randomBytes(144)); + const randomEphPubKey = Point.random(); + const randomLogContent = randomBytes(144 - Point.SIZE_IN_BYTES); + logs.push(Buffer.concat([randomLogContent, randomEphPubKey.toBuffer()])); } return new FunctionL2Logs(logs); }