From c8120a45e477bcfc4d052c4ea6abdcd0e9173326 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Oct 2023 09:06:40 +0100 Subject: [PATCH] fix: catch up synchronizer with node --- .../end-to-end/src/pxe_sandbox.test.ts | 6 +- .../pxe/src/pxe_service/create_pxe_service.ts | 4 +- .../pxe/src/pxe_service/pxe_service.ts | 21 ++++-- .../src/pxe_service/test/pxe_service.test.ts | 26 ++++++-- .../src/pxe_service/test/pxe_test_suite.ts | 11 +++- .../pxe/src/synchronizer/synchronizer.test.ts | 66 ++++++++++++++----- .../pxe/src/synchronizer/synchronizer.ts | 34 ++++++---- 7 files changed, 118 insertions(+), 50 deletions(-) diff --git a/yarn-project/end-to-end/src/pxe_sandbox.test.ts b/yarn-project/end-to-end/src/pxe_sandbox.test.ts index c39f0edfd142..ea0813b6a490 100644 --- a/yarn-project/end-to-end/src/pxe_sandbox.test.ts +++ b/yarn-project/end-to-end/src/pxe_sandbox.test.ts @@ -6,7 +6,11 @@ const { PXE_URL = 'http://localhost:8080' } = process.env; const setup = async () => { const pxe = createPXEClient(PXE_URL); await waitForSandbox(pxe); - return pxe; + return { + pxe, + // don't stop remote PXE at the end of the test + teardown: async () => {}, + }; }; pxeTestSuite('pxe_sandbox', setup); diff --git a/yarn-project/pxe/src/pxe_service/create_pxe_service.ts b/yarn-project/pxe/src/pxe_service/create_pxe_service.ts index 4fc15e58f6f6..32d878166b92 100644 --- a/yarn-project/pxe/src/pxe_service/create_pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/create_pxe_service.ts @@ -46,7 +46,5 @@ export async function createPXEService( keyStore = keyStore || new TestKeyStore(await Grumpkin.new()); db = db || new MemoryDB(logSuffix); - const server = new PXEService(keyStore, aztecNode, db, config, logSuffix); - await server.start(); - return server; + return PXEService.new(keyStore, aztecNode, db, config, logSuffix); } diff --git a/yarn-project/pxe/src/pxe_service/pxe_service.ts b/yarn-project/pxe/src/pxe_service/pxe_service.ts index e911469fe8fb..f5a1e27ba08e 100644 --- a/yarn-project/pxe/src/pxe_service/pxe_service.ts +++ b/yarn-project/pxe/src/pxe_service/pxe_service.ts @@ -71,7 +71,7 @@ export class PXEService implements PXE { private log: DebugLogger; private sandboxVersion: string; - constructor( + private constructor( private keyStore: KeyStore, private node: AztecNode, private db: Database, @@ -86,12 +86,19 @@ export class PXEService implements PXE { this.sandboxVersion = getPackageInfo().version; } - /** - * Starts the PXE Service by beginning the synchronisation process between the Aztec node and the database. - * - * @returns A promise that resolves when the server has started successfully. - */ - public async start() { + public static async new( + keyStore: KeyStore, + node: AztecNode, + db: Database, + config: PXEServiceConfig, + logSuffix?: string, + ) { + const pxe = new PXEService(keyStore, node, db, config, logSuffix); + await pxe.#start(); + return pxe; + } + + async #start() { const { l2BlockPollingIntervalMS, l2StartingBlock } = this.config; await this.synchronizer.start(l2StartingBlock, 1, l2BlockPollingIntervalMS); const info = await this.getNodeInfo(); diff --git a/yarn-project/pxe/src/pxe_service/test/pxe_service.test.ts b/yarn-project/pxe/src/pxe_service/test/pxe_service.test.ts index a980b0efa187..5c26d6987c5c 100644 --- a/yarn-project/pxe/src/pxe_service/test/pxe_service.test.ts +++ b/yarn-project/pxe/src/pxe_service/test/pxe_service.test.ts @@ -1,23 +1,25 @@ +import { HistoricBlockData } from '@aztec/circuits.js'; import { Grumpkin } from '@aztec/circuits.js/barretenberg'; import { L1ContractAddresses } from '@aztec/ethereum'; import { EthAddress } from '@aztec/foundation/eth-address'; import { TestKeyStore } from '@aztec/key-store'; -import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Tx, PXE, mockTx } from '@aztec/types'; +import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Tx, mockTx } from '@aztec/types'; -import { MockProxy, mock } from 'jest-mock-extended'; +import { MockProxy, mock, mockFn } from 'jest-mock-extended'; import { MemoryDB } from '../../database/memory_db.js'; import { PXEServiceConfig } from '../../index.js'; import { PXEService } from '../pxe_service.js'; import { pxeTestSuite } from './pxe_test_suite.js'; -async function createPXEService(): Promise { +async function createPXEService() { const keyStore = new TestKeyStore(await Grumpkin.new()); const node = mock(); const db = new MemoryDB(); const config: PXEServiceConfig = { l2BlockPollingIntervalMS: 100, l2StartingBlock: INITIAL_L2_BLOCK_NUM }; // Setup the relevant mocks + node.getHistoricBlockData.mockResolvedValue(HistoricBlockData.empty()); node.getBlockNumber.mockResolvedValue(2); node.getVersion.mockResolvedValue(1); node.getChainId.mockResolvedValue(1); @@ -31,7 +33,11 @@ async function createPXEService(): Promise { }; node.getL1ContractAddresses.mockResolvedValue(mockedContracts); - return new PXEService(keyStore, node, db, config); + const pxe = await PXEService.new(keyStore, node, db, config); + return { + pxe, + teardown: () => pxe.stop(), + }; } pxeTestSuite('PXEService', createPXEService); @@ -44,7 +50,9 @@ describe('PXEService', () => { beforeEach(async () => { keyStore = new TestKeyStore(await Grumpkin.new()); - node = mock(); + node = mock({ + getHistoricBlockData: mockFn().mockResolvedValue(HistoricBlockData.empty()), + }); db = new MemoryDB(); config = { l2BlockPollingIntervalMS: 100, l2StartingBlock: INITIAL_L2_BLOCK_NUM }; }); @@ -55,7 +63,11 @@ describe('PXEService', () => { node.getTx.mockResolvedValue(settledTx); - const rpc = new PXEService(keyStore, node, db, config); - await expect(rpc.sendTx(duplicateTx)).rejects.toThrowError(/A settled tx with equal hash/); + const pxe = await PXEService.new(keyStore, node, db, config); + try { + await expect(pxe.sendTx(duplicateTx)).rejects.toThrowError(/A settled tx with equal hash/); + } finally { + await pxe.stop(); + } }); }); diff --git a/yarn-project/pxe/src/pxe_service/test/pxe_test_suite.ts b/yarn-project/pxe/src/pxe_service/test/pxe_test_suite.ts index cbb772b36faa..8c04cec21313 100644 --- a/yarn-project/pxe/src/pxe_service/test/pxe_test_suite.ts +++ b/yarn-project/pxe/src/pxe_service/test/pxe_test_suite.ts @@ -3,14 +3,21 @@ import { Grumpkin } from '@aztec/circuits.js/barretenberg'; import { ConstantKeyPair } from '@aztec/key-store'; import { DeployedContract, INITIAL_L2_BLOCK_NUM, PXE, TxExecutionRequest, randomDeployedContract } from '@aztec/types'; -export const pxeTestSuite = (testName: string, pxeSetup: () => Promise) => { +export const pxeTestSuite = ( + testName: string, + // eslint-disable-next-line jsdoc/require-jsdoc + pxeSetup: () => Promise<{ pxe: PXE; teardown: () => Promise }>, +) => { describe(testName, () => { let pxe: PXE; + let teardown: () => Promise; beforeAll(async () => { - pxe = await pxeSetup(); + ({ pxe, teardown } = await pxeSetup()); }, 120_000); + afterAll(() => teardown()); + it('registers an account and returns it as an account only and not as a recipient', async () => { const keyPair = ConstantKeyPair.random(await Grumpkin.new()); const completeAddress = await CompleteAddress.fromPrivateKeyAndPartialAddress( diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts index d4e90b53aad8..7dcbc819671b 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.test.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.test.ts @@ -1,7 +1,7 @@ import { CompleteAddress, Fr, GrumpkinScalar, HistoricBlockData } from '@aztec/circuits.js'; import { Grumpkin } from '@aztec/circuits.js/barretenberg'; import { TestKeyStore } from '@aztec/key-store'; -import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Block, MerkleTreeId } from '@aztec/types'; +import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Block, LogType, MerkleTreeId } from '@aztec/types'; import { MockProxy, mock } from 'jest-mock-extended'; import omit from 'lodash.omit'; @@ -15,9 +15,32 @@ describe('Synchronizer', () => { let synchronizer: TestSynchronizer; let roots: Record; let blockData: HistoricBlockData; + let block: L2Block; beforeEach(() => { - blockData = HistoricBlockData.random(); + // start on a block height of 3 + block = L2Block.random(3, 4); + + aztecNode = mock(); + aztecNode.getBlockNumber.mockResolvedValue(block.number); + aztecNode.getBlocks.mockResolvedValue([block]); + aztecNode.getLogs.mockImplementation((_1, _2, type) => { + return Promise.resolve([type === LogType.ENCRYPTED ? block.newEncryptedLogs! : block.newUnencryptedLogs!]); + }); + + blockData = new HistoricBlockData( + block.endPrivateDataTreeSnapshot.root, + block.endNullifierTreeSnapshot.root, + block.endContractTreeSnapshot.root, + block.endL1ToL2MessagesTreeSnapshot.root, + block.endHistoricBlocksTreeSnapshot.root, + Fr.random(), + block.endPublicDataTreeRoot, + Fr.random(), + ); + + aztecNode.getHistoricBlockData.mockResolvedValue(blockData); + roots = { [MerkleTreeId.CONTRACT_TREE]: blockData.contractTreeRoot, [MerkleTreeId.PRIVATE_DATA_TREE]: blockData.privateDataTreeRoot, @@ -27,25 +50,23 @@ describe('Synchronizer', () => { [MerkleTreeId.BLOCKS_TREE]: blockData.blocksTreeRoot, }; - aztecNode = mock(); database = new MemoryDB(); synchronizer = new TestSynchronizer(aztecNode, database); }); it('sets tree roots from aztec node on initial sync', async () => { - aztecNode.getBlockNumber.mockResolvedValue(3); - aztecNode.getHistoricBlockData.mockResolvedValue(blockData); + await synchronizer.initialSync(4); + expect(database.getTreeRoots()).toEqual(roots); + }); - await synchronizer.initialSync(); + it('catches up with node on initial sync', async () => { + await synchronizer.initialSync(3); - expect(database.getTreeRoots()).toEqual(roots); + const roots = database.getTreeRoots(); + expect(roots[MerkleTreeId.CONTRACT_TREE]).toEqual(block.endContractTreeSnapshot.root); }); it('sets tree roots from latest block', async () => { - const block = L2Block.random(1, 4); - aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]); - aztecNode.getLogs.mockResolvedValueOnce([block.newEncryptedLogs!]).mockResolvedValue([block.newUnencryptedLogs!]); - await synchronizer.work(); const roots = database.getTreeRoots(); @@ -53,11 +74,24 @@ describe('Synchronizer', () => { }); it('overrides tree roots from initial sync once current block number is larger', async () => { - // Initial sync is done on block with height 3 - aztecNode.getBlockNumber.mockResolvedValue(3); + // set up some random historical data + blockData = HistoricBlockData.random(); aztecNode.getHistoricBlockData.mockResolvedValue(blockData); - await synchronizer.initialSync(); + roots = { + [MerkleTreeId.CONTRACT_TREE]: blockData.contractTreeRoot, + [MerkleTreeId.PRIVATE_DATA_TREE]: blockData.privateDataTreeRoot, + [MerkleTreeId.NULLIFIER_TREE]: blockData.nullifierTreeRoot, + [MerkleTreeId.PUBLIC_DATA_TREE]: blockData.publicDataTreeRoot, + [MerkleTreeId.L1_TO_L2_MESSAGES_TREE]: blockData.l1ToL2MessagesTreeRoot, + [MerkleTreeId.BLOCKS_TREE]: blockData.blocksTreeRoot, + }; + + // take current state of node and sync from block 4 onwards + aztecNode.getBlock.mockResolvedValue(undefined); + aztecNode.getBlockNumber.mockResolvedValue(3); + await synchronizer.initialSync(4); + const roots0 = database.getTreeRoots(); expect(roots0[MerkleTreeId.CONTRACT_TREE]).toEqual(roots[MerkleTreeId.CONTRACT_TREE]); @@ -122,8 +156,8 @@ class TestSynchronizer extends Synchronizer { return super.work(); } - public initialSync(): Promise { - return super.initialSync(); + public initialSync(from: number, limit: number = 1): Promise { + return super.initialSync(from, limit); } public workNoteProcessorCatchUp(): Promise { diff --git a/yarn-project/pxe/src/synchronizer/synchronizer.ts b/yarn-project/pxe/src/synchronizer/synchronizer.ts index 8c01efc4fda8..f8ff069cb6ec 100644 --- a/yarn-project/pxe/src/synchronizer/synchronizer.ts +++ b/yarn-project/pxe/src/synchronizer/synchronizer.ts @@ -45,9 +45,8 @@ export class Synchronizer { if (from < this.synchedToBlock + 1) { throw new Error(`From block ${from} is smaller than the currently synched block ${this.synchedToBlock}`); } - this.synchedToBlock = from - 1; - await this.initialSync(); + await this.initialSync(from, limit); const run = async () => { while (this.running) { @@ -56,7 +55,9 @@ export class Synchronizer { await this.workNoteProcessorCatchUp(limit, retryInterval); } else { // No note processor needs to catch up. We continue with the normal flow. - await this.work(limit, retryInterval); + if (!(await this.work(limit))) { + await this.interruptableSleep.sleep(retryInterval); + } } } }; @@ -65,36 +66,40 @@ export class Synchronizer { this.log('Started'); } - protected async initialSync() { + protected async initialSync(from: number, limit: number) { const [blockNumber, historicBlockData] = await Promise.all([ this.node.getBlockNumber(), this.node.getHistoricBlockData(), ]); this.initialSyncBlockNumber = blockNumber; - this.synchedToBlock = this.initialSyncBlockNumber; await this.db.setHistoricBlockData(historicBlockData); + + this.synchedToBlock = from - 1; + while (this.synchedToBlock < this.initialSyncBlockNumber) { + // once we have caught up, don't sleep at all + if (!(await this.work(limit))) { + throw new Error("Initial sync didn't reach the latest block"); + } + } } - protected async work(limit = 1, retryInterval = 1000): Promise { + protected async work(limit = 1): Promise { const from = this.synchedToBlock + 1; try { let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED); if (!encryptedLogs.length) { - await this.interruptableSleep.sleep(retryInterval); - return; + return false; } let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED); if (!unencryptedLogs.length) { - await this.interruptableSleep.sleep(retryInterval); - return; + return false; } // Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks. const blocks = await this.node.getBlocks(from, encryptedLogs.length); if (!blocks.length) { - await this.interruptableSleep.sleep(retryInterval); - return; + return false; } if (blocks.length !== encryptedLogs.length) { @@ -127,9 +132,10 @@ export class Synchronizer { } this.synchedToBlock = latestBlock.block.number; + return true; } catch (err) { - this.log.error(`Error in synchronizer work`, err); - await this.interruptableSleep.sleep(retryInterval); + this.log.error(`Error in synchronizer work`, (err as Error).stack); + return false; } }