Skip to content

Commit

Permalink
refactor: pxe synch from last known state
Browse files Browse the repository at this point in the history
  • Loading branch information
alexghr committed Dec 8, 2023
1 parent 746b54b commit 7de4ec0
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 112 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node';
import { Fr, GrumpkinScalar, INITIAL_L2_BLOCK_NUM, elapsed, sleep } from '@aztec/aztec.js';
import { Fr, GrumpkinScalar, elapsed, sleep } from '@aztec/aztec.js';
import { BenchmarkingContract } from '@aztec/noir-contracts/types';
import { SequencerClient } from '@aztec/sequencer-client';
import {
Expand Down Expand Up @@ -70,9 +70,8 @@ describe('benchmarks/process_history', () => {
} satisfies NodeSyncedChainHistoryStats);

// Create a new pxe and measure how much time it takes it to sync with failed and successful decryption
// Skip the first two blocks used for setup (create account contract and deploy benchmarking contract)
context.logger(`Starting new pxe`);
const pxe = await waitNewPXESynced(node, contract, INITIAL_L2_BLOCK_NUM + SETUP_BLOCK_COUNT);
const pxe = await waitNewPXESynced(node, contract);

// Register the owner account and wait until it's synced so we measure how much time it took
context.logger(`Registering owner account on new pxe`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('benchmarks/publish_rollup', () => {
sequencer.restart();

// Wait for the last tx to be processed and stop the current node
const { blockNumber } = await sentTxs[sentTxs.length - 1].wait({ timeout: 5 * 60_000 });
await sentTxs[sentTxs.length - 1].wait({ timeout: 5 * 60_000 });
await context.teardown();

// Create a new aztec node to measure sync time of the block
Expand All @@ -40,7 +40,7 @@ describe('benchmarks/publish_rollup', () => {

// Spin up a new pxe and sync it, we'll use it to test sync times of new accounts for the last block
context.logger(`Starting new pxe`);
const pxe = await waitNewPXESynced(node, contract, blockNumber! - 1);
const pxe = await waitNewPXESynced(node, contract);

// Register the owner account and wait until it's synced so we measure how much time it took
context.logger(`Registering owner account on new pxe`);
Expand Down
21 changes: 3 additions & 18 deletions yarn-project/end-to-end/src/benchmarks/utils.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
import { AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node';
import {
AztecNode,
BatchCall,
GrumpkinScalar,
INITIAL_L2_BLOCK_NUM,
PXE,
PartialAddress,
SentTx,
retryUntil,
sleep,
} from '@aztec/aztec.js';
import { AztecNode, BatchCall, GrumpkinScalar, PXE, PartialAddress, SentTx, retryUntil, sleep } from '@aztec/aztec.js';
import { BenchmarkingContract } from '@aztec/noir-contracts/types';
import { PXEService, createPXEService } from '@aztec/pxe';

Expand Down Expand Up @@ -102,15 +92,10 @@ export async function sendTxs(
* Creates a new PXE and awaits until it's synced with the node.
* @param node - Node to connect the pxe to.
* @param contract - Benchmark contract to add to the pxe.
* @param startingBlock - First l2 block to process.
* @returns The new PXE.
*/
export async function waitNewPXESynced(
node: AztecNode,
contract: BenchmarkingContract,
startingBlock: number = INITIAL_L2_BLOCK_NUM,
): Promise<PXEService> {
const pxe = await createPXEService(node, { l2BlockPollingIntervalMS: 100, l2StartingBlock: startingBlock });
export async function waitNewPXESynced(node: AztecNode, contract: BenchmarkingContract): Promise<PXEService> {
const pxe = await createPXEService(node, { l2BlockPollingIntervalMS: 100 });
await pxe.addContracts([contract]);
await retryUntil(() => pxe.isGlobalStateSynchronized(), 'pxe-global-sync');
return pxe;
Expand Down
8 changes: 7 additions & 1 deletion yarn-project/kv-store/src/lmdb/store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EthAddress } from '@aztec/foundation/eth-address';
import { createDebugLogger } from '@aztec/foundation/log';

import { Database, Key, RootDatabase, open } from 'lmdb';

Expand All @@ -18,6 +19,7 @@ export class AztecLmdbStore implements AztecKVStore {
#data: Database<unknown, Key>;
#multiMapData: Database<unknown, Key>;
#rollupAddress: AztecSingleton<string>;
#log = createDebugLogger('aztec:kv-store:lmdb');

constructor(rootDb: RootDatabase) {
this.#rootDb = rootDb;
Expand Down Expand Up @@ -110,7 +112,11 @@ export class AztecLmdbStore implements AztecKVStore {
const rollupAddressString = rollupAddress.toString();

if (typeof storedRollupAddress === 'string' && rollupAddressString !== storedRollupAddress) {
await this.#rootDb.clearAsync();
this.#log.warn(
`Rollup address mismatch stored=${storedRollupAddress} deployed=${rollupAddressString}. Clearing database`,
);
await this.#data.clearAsync();
await this.#multiMapData.clearAsync();
}

await this.#rollupAddress.set(rollupAddressString);
Expand Down
7 changes: 1 addition & 6 deletions yarn-project/pxe/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { INITIAL_L2_BLOCK_NUM } from '@aztec/types';

import { readFileSync } from 'fs';
import { dirname, resolve } from 'path';
import { fileURLToPath } from 'url';
Expand All @@ -10,8 +8,6 @@ import { fileURLToPath } from 'url';
export interface PXEServiceConfig {
/** The interval to wait between polling for new blocks. */
l2BlockPollingIntervalMS: number;
/** L2 block to start scanning from */
l2StartingBlock: number;

/** Where to store PXE data. If not set will store in memory */
dataDirectory?: string;
Expand All @@ -21,11 +17,10 @@ export interface PXEServiceConfig {
* Creates an instance of PXEServiceConfig out of environment variables using sensible defaults for integration testing if not set.
*/
export function getPXEServiceConfig(): PXEServiceConfig {
const { PXE_BLOCK_POLLING_INTERVAL_MS, PXE_L2_STARTING_BLOCK, DATA_DIRECTORY } = process.env;
const { PXE_BLOCK_POLLING_INTERVAL_MS, DATA_DIRECTORY } = process.env;

return {
l2BlockPollingIntervalMS: PXE_BLOCK_POLLING_INTERVAL_MS ? +PXE_BLOCK_POLLING_INTERVAL_MS : 1000,
l2StartingBlock: PXE_L2_STARTING_BLOCK ? +PXE_L2_STARTING_BLOCK : INITIAL_L2_BLOCK_NUM,
dataDirectory: DATA_DIRECTORY,
};
}
Expand Down
27 changes: 22 additions & 5 deletions yarn-project/pxe/src/database/kv_pxe_database.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { AztecAddress, BlockHeader, CompleteAddress } from '@aztec/circuits.js';
import { Fr } from '@aztec/foundation/fields';
import { Fr, Point } from '@aztec/foundation/fields';
import { AztecArray, AztecKVStore, AztecMap, AztecMultiMap, AztecSingleton } from '@aztec/kv-store';
import { ContractDao, MerkleTreeId, NoteFilter, PublicKey } from '@aztec/types';
import { ContractDao, INITIAL_L2_BLOCK_NUM, MerkleTreeId, NoteFilter, PublicKey } from '@aztec/types';

import { NoteDao } from './note_dao.js';
import { PxeDatabase } from './pxe_database.js';

/** Serialized structure of a block header */
type SerializedBlockHeader = {
type SynchronizedBlock = {
/** The tree roots when the block was created */
roots: Record<MerkleTreeId, string>;
/** The hash of the global variables */
globalVariablesHash: string;
/** The block number */
blockNumber: number;
};

/**
* A PXE database backed by LMDB.
*/
export class KVPxeDatabase implements PxeDatabase {
#blockHeader: AztecSingleton<SerializedBlockHeader>;
#blockHeader: AztecSingleton<SynchronizedBlock>;
#addresses: AztecMap<string, Buffer>;
#authWitnesses: AztecMap<string, Buffer[]>;
#capsules: AztecArray<Buffer[]>;
Expand All @@ -29,6 +31,7 @@ export class KVPxeDatabase implements PxeDatabase {
#notesByStorageSlot: AztecMultiMap<string, number>;
#notesByTxHash: AztecMultiMap<string, number>;
#notesByOwner: AztecMultiMap<string, number>;
#syncedBlockPerPublicKey: AztecMap<string, number>;
#db: AztecKVStore;

constructor(db: AztecKVStore) {
Expand All @@ -40,6 +43,7 @@ export class KVPxeDatabase implements PxeDatabase {
this.#contracts = db.createMap('contracts');
this.#notes = db.createArray('notes');
this.#nullifiedNotes = db.createMap('nullified_notes');
this.#syncedBlockPerPublicKey = db.createMap('synced_block_per_public_key');

this.#notesByContract = db.createMultiMap('notes_by_contract');
this.#notesByStorageSlot = db.createMultiMap('notes_by_storage_slot');
Expand Down Expand Up @@ -183,8 +187,9 @@ export class KVPxeDatabase implements PxeDatabase {
};
}

async setBlockHeader(blockHeader: BlockHeader): Promise<void> {
async setSynchronizedBlock(blockNumber: number, blockHeader: BlockHeader): Promise<void> {
await this.#blockHeader.set({
blockNumber,
globalVariablesHash: blockHeader.globalVariablesHash.toString(),
roots: {
[MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot.toString(),
Expand Down Expand Up @@ -217,6 +222,10 @@ export class KVPxeDatabase implements PxeDatabase {
return blockHeader;
}

getBlockNumber(): number {
return this.#blockHeader.get()?.blockNumber ?? INITIAL_L2_BLOCK_NUM - 1;
}

addCompleteAddress(completeAddress: CompleteAddress): Promise<boolean> {
return this.#addresses.setIfNotExists(completeAddress.address.toString(), completeAddress.toBuffer());
}
Expand All @@ -230,6 +239,14 @@ export class KVPxeDatabase implements PxeDatabase {
return Promise.resolve(Array.from(this.#addresses.values()).map(v => CompleteAddress.fromBuffer(v)));
}

getSynchedBlockNumberForPublicKey(publicKey: Point): number {
return this.#syncedBlockPerPublicKey.get(publicKey.toString()) ?? INITIAL_L2_BLOCK_NUM - 1;
}

setSynchedBlockNumberForPublicKey(publicKey: Point, blockNumber: number): Promise<boolean> {
return this.#syncedBlockPerPublicKey.set(publicKey.toString(), blockNumber);
}

estimateSize(): number {
// TODO
return 0;
Expand Down
21 changes: 18 additions & 3 deletions yarn-project/pxe/src/database/memory_db.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { BlockHeader, CompleteAddress, PublicKey } from '@aztec/circuits.js';
import { AztecAddress } from '@aztec/foundation/aztec-address';
import { Fr } from '@aztec/foundation/fields';
import { Fr, Point } from '@aztec/foundation/fields';
import { createDebugLogger } from '@aztec/foundation/log';
import { MerkleTreeId, NoteFilter } from '@aztec/types';
import { INITIAL_L2_BLOCK_NUM, MerkleTreeId, NoteFilter } from '@aztec/types';

import { MemoryContractDatabase } from '../contract_database/index.js';
import { NoteDao } from './note_dao.js';
Expand All @@ -18,8 +18,10 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase {
private notesTable: NoteDao[] = [];
private treeRoots: Record<MerkleTreeId, Fr> | undefined;
private globalVariablesHash: Fr | undefined;
private blockNumber: number | undefined;
private addresses: CompleteAddress[] = [];
private authWitnesses: Record<string, Fr[]> = {};
private syncedBlockPerPublicKey = new Map<string, number>();
// A capsule is a "blob" of data that is passed to the contract through an oracle.
// We are using a stack to keep track of the capsules that are passed to the contract.
private capsuleStack: Fr[][] = [];
Expand Down Expand Up @@ -134,7 +136,7 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase {
);
}

public setBlockHeader(blockHeader: BlockHeader): Promise<void> {
public setSynchronizedBlock(blockNumber: number, blockHeader: BlockHeader): Promise<void> {
this.globalVariablesHash = blockHeader.globalVariablesHash;
this.setTreeRoots({
[MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot,
Expand All @@ -148,6 +150,10 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase {
return Promise.resolve();
}

public getBlockNumber(): number {
return this.blockNumber ?? INITIAL_L2_BLOCK_NUM - 1;
}

public addCompleteAddress(completeAddress: CompleteAddress): Promise<boolean> {
const accountIndex = this.addresses.findIndex(r => r.address.equals(completeAddress.address));
if (accountIndex !== -1) {
Expand All @@ -174,6 +180,15 @@ export class MemoryDB extends MemoryContractDatabase implements PxeDatabase {
return Promise.resolve(this.addresses);
}

getSynchedBlockNumberForPublicKey(publicKey: Point): number {
return this.syncedBlockPerPublicKey.get(publicKey.toString()) ?? INITIAL_L2_BLOCK_NUM - 1;
}

setSynchedBlockNumberForPublicKey(publicKey: Point, blockNumber: number): Promise<boolean> {
this.syncedBlockPerPublicKey.set(publicKey.toString(), blockNumber);
return Promise.resolve(true);
}

public estimateSize() {
const notesSize = this.notesTable.reduce((sum, note) => sum + note.getSize(), 0);
const treeRootsSize = this.treeRoots ? Object.entries(this.treeRoots).length * Fr.SIZE_IN_BYTES : 0;
Expand Down
21 changes: 20 additions & 1 deletion yarn-project/pxe/src/database/pxe_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ export interface PxeDatabase extends ContractDatabase {
*/
getTreeRoots(): Record<MerkleTreeId, Fr>;

/**
* Gets the most recently processed block number.
*/
getBlockNumber(): number;

/**
* Retrieve the stored Block Header from the database.
* The function returns a Promise that resolves to the Block Header.
Expand All @@ -94,10 +99,11 @@ export interface PxeDatabase extends ContractDatabase {
* This function updates the 'global variables hash' and `tree roots` property of the instance
* Note that this will overwrite any existing hash or roots in the database.
*
* @param blockNumber - The block number of the most recent block
* @param blockHeader - An object containing the most recent block header.
* @returns A Promise that resolves when the hash has been successfully updated in the database.
*/
setBlockHeader(blockHeader: BlockHeader): Promise<void>;
setSynchronizedBlock(blockNumber: number, blockHeader: BlockHeader): Promise<void>;

/**
* Adds complete address to the database.
Expand All @@ -121,6 +127,19 @@ export interface PxeDatabase extends ContractDatabase {
*/
getCompleteAddresses(): Promise<CompleteAddress[]>;

/**
* Updates up to which block number we have processed notes for a given public key.
* @param publicKey - The public key to set the synched block number for.
* @param blockNumber - The block number to set.
*/
setSynchedBlockNumberForPublicKey(publicKey: PublicKey, blockNumber: number): Promise<boolean>;

/**
* Get the synched block number for a given public key.
* @param publicKey - The public key to get the synched block number for.
*/
getSynchedBlockNumberForPublicKey(publicKey: PublicKey): number;

/**
* Returns the estimated size in bytes of this db.
* @returns The estimated size in bytes of this db.
Expand Down
6 changes: 3 additions & 3 deletions yarn-project/pxe/src/database/pxe_database_test_suite.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AztecAddress, BlockHeader, CompleteAddress } from '@aztec/circuits.js';
import { Fr, Point } from '@aztec/foundation/fields';
import { MerkleTreeId, NoteFilter, randomTxHash } from '@aztec/types';
import { INITIAL_L2_BLOCK_NUM, MerkleTreeId, NoteFilter, randomTxHash } from '@aztec/types';

import { NoteDao } from './note_dao.js';
import { randomNoteDao } from './note_dao.test.js';
Expand Down Expand Up @@ -155,13 +155,13 @@ export function describePxeDatabase(getDatabase: () => PxeDatabase) {
const blockHeader = BlockHeader.random();
blockHeader.privateKernelVkTreeRoot = Fr.zero();

await database.setBlockHeader(blockHeader);
await database.setSynchronizedBlock(INITIAL_L2_BLOCK_NUM, blockHeader);
expect(database.getBlockHeader()).toEqual(blockHeader);
});

it('retrieves the merkle tree roots from the block', async () => {
const blockHeader = BlockHeader.random();
await database.setBlockHeader(blockHeader);
await database.setSynchronizedBlock(INITIAL_L2_BLOCK_NUM, blockHeader);
expect(database.getTreeRoots()).toEqual({
[MerkleTreeId.NOTE_HASH_TREE]: blockHeader.noteHashTreeRoot,
[MerkleTreeId.NULLIFIER_TREE]: blockHeader.nullifierTreeRoot,
Expand Down
10 changes: 1 addition & 9 deletions yarn-project/pxe/src/note_processor/note_processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { ConstantKeyPair } from '@aztec/key-store';
import {
AztecNode,
FunctionL2Logs,
INITIAL_L2_BLOCK_NUM,
KeyPair,
KeyStore,
L1NotePayload,
Expand Down Expand Up @@ -122,14 +121,7 @@ describe('Note Processor', () => {
keyStore = mock<KeyStore>();
simulator = mock<AcirSimulator>();
keyStore.getAccountPrivateKey.mockResolvedValue(owner.getPrivateKey());
noteProcessor = new NoteProcessor(
owner.getPublicKey(),
keyStore,
database,
aztecNode,
INITIAL_L2_BLOCK_NUM,
simulator,
);
noteProcessor = new NoteProcessor(owner.getPublicKey(), keyStore, database, aztecNode, simulator);

simulator.computeNoteHashAndNullifier.mockImplementation((...args) =>
Promise.resolve({
Expand Down
Loading

0 comments on commit 7de4ec0

Please sign in to comment.