feat: serialize synchronize and simulateTx calls by the pxe via SerialQueue (AztecProtocol#3817)

Serialize calls by the PXE to:
- synchronize with the aztec node
- simulate transactions

using a SerialQueue.

Synchronization is greedy, meaning the synchronizer syncs as much as possible while its job is running.
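The core change is that both node synchronization and transaction simulation go through a single serial job queue, so only one job touches PXE state at a time, while put() still hands each job's return value back to its caller. Below is a minimal standalone TypeScript sketch of that pattern; it illustrates the idea only and is not the actual @aztec/foundation SerialQueue.

```typescript
// Minimal standalone sketch of a serial job queue (illustration only, not the
// @aztec/foundation SerialQueue). Jobs passed to put() run strictly one at a
// time, and put() resolves with the job's result, so a caller such as
// simulateTx can be serialized and still return its value.
class SimpleSerialQueue {
  // Tail of the chain: settles when the most recently enqueued job finishes.
  private tail: Promise<unknown> = Promise.resolve();

  /** Enqueue a job; it runs only after all previously enqueued jobs settle. */
  public put<T>(job: () => Promise<T>): Promise<T> {
    const result = this.tail.then(job, job);
    // Keep the chain alive even if this job rejects.
    this.tail = result.catch(() => {});
    return result;
  }

  /** Resolves once every job enqueued so far has finished. */
  public syncPoint(): Promise<void> {
    return this.put(async () => {});
  }
}

// Usage: the sync job and the simulation never overlap.
async function demo() {
  const queue = new SimpleSerialQueue();
  void queue.put(async () => console.log('sync with the node'));
  const tx = await queue.put(async () => 'simulated tx');
  console.log(tx); // prints only after the sync job has completed
}

void demo();
```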
just-mitch authored Jan 4, 2024
1 parent 30e47a0 commit e893675
Showing 4 changed files with 96 additions and 46 deletions.
4 changes: 2 additions & 2 deletions yarn-project/foundation/src/fifo/memory_fifo.ts
@@ -91,9 +91,9 @@ export class MemoryFifo<T> {

/**
* Process items from the queue using a provided handler function.
* The function iterates over items in the queue, invoking the handler for each item until the queue is empty or flushing.
* The function iterates over items in the queue, invoking the handler for each item until the queue is empty and flushing.
* If the handler throws an error, it will be caught and logged as 'Queue handler exception:', but the iteration will continue.
* The process function returns a promise that resolves when there are no more items in the queue or the queue is flushing.
* The process function returns a promise that resolves when there are no more items in the queue and the queue is flushing.
*
* @param handler - A function that takes an item of type T and returns a Promise<void> after processing the item.
* @returns A Promise<void> that resolves when the queue is finished processing.
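The doc-comment fix above pins down when process() resolves: handler errors are logged and the loop keeps going, and the returned promise settles only once the queue is flushing (closed to new items) and has been drained. Here is a standalone sketch of that loop, under the assumption that this is the intended semantics; it is not the real MemoryFifo implementation.

```typescript
// Standalone sketch of the process() behaviour described above (assumed
// semantics, not the real @aztec/foundation MemoryFifo): handler errors are
// logged and processing continues; the loop exits only when the queue is
// flushing *and* no items remain.
class TinyFifo<T> {
  private items: T[] = [];
  private flushing = false;
  private waiters: Array<(item: T | null) => void> = [];

  public put(item: T): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(item);
    else this.items.push(item);
  }

  /** Close the queue to new items; once drained, get() resolves to null. */
  public end(): void {
    this.flushing = true;
    // Waiters only exist while the queue is empty, so they can be released.
    while (this.waiters.length) this.waiters.shift()!(null);
  }

  /** Next item, or null once the queue is flushing and empty. */
  private get(): Promise<T | null> {
    if (this.items.length) return Promise.resolve(this.items.shift()!);
    if (this.flushing) return Promise.resolve(null);
    return new Promise(resolve => this.waiters.push(resolve));
  }

  public async process(handler: (item: T) => Promise<void>): Promise<void> {
    while (true) {
      const item = await this.get();
      if (item === null) return; // empty and flushing: done
      try {
        await handler(item);
      } catch (err) {
        console.error('Queue handler exception:', err);
      }
    }
  }
}

// Usage: resolves after both items are handled, because end() was called.
const q = new TinyFifo<number>();
q.put(1);
q.put(2);
q.end();
void q.process(async n => console.log('handled', n));
```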
63 changes: 42 additions & 21 deletions yarn-project/pxe/src/pxe_service/pxe_service.ts
@@ -18,9 +18,10 @@ import {
PublicCallRequest,
} from '@aztec/circuits.js';
import { computeCommitmentNonce, siloNullifier } from '@aztec/circuits.js/abis';
import { encodeArguments } from '@aztec/foundation/abi';
import { DecodedReturn, encodeArguments } from '@aztec/foundation/abi';
import { padArrayEnd } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/fields';
import { SerialQueue } from '@aztec/foundation/fifo';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { NoirWasmVersion } from '@aztec/noir-compiler/versions';
import {
@@ -70,6 +71,9 @@ export class PXEService implements PXE {
private simulator: AcirSimulator;
private log: DebugLogger;
private sandboxVersion: string;
// serialize synchronizer and calls to simulateTx.
// ensures that state is not changed while simulating
private jobQueue = new SerialQueue();

constructor(
private keyStore: KeyStore,
@@ -79,7 +83,7 @@
logSuffix?: string,
) {
this.log = createDebugLogger(logSuffix ? `aztec:pxe_service_${logSuffix}` : `aztec:pxe_service`);
this.synchronizer = new Synchronizer(node, db, logSuffix);
this.synchronizer = new Synchronizer(node, db, this.jobQueue, logSuffix);
this.contractDataOracle = new ContractDataOracle(db, node);
this.simulator = getAcirSimulator(db, node, keyStore, this.contractDataOracle);

@@ -93,7 +97,11 @@
*/
public async start() {
const { l2BlockPollingIntervalMS } = this.config;
await this.synchronizer.start(1, l2BlockPollingIntervalMS);
this.synchronizer.start(1, l2BlockPollingIntervalMS);
this.jobQueue.start();
this.log.info('Started Job Queue');
await this.jobQueue.syncPoint();
this.log.info('Synced Job Queue');
await this.restoreNoteProcessors();
const info = await this.getNodeInfo();
this.log.info(`Started PXE connected to chain ${info.chainId} version ${info.protocolVersion}`);
@@ -121,8 +129,10 @@
* @returns A Promise resolving once the server has been stopped successfully.
*/
public async stop() {
await this.jobQueue.cancel();
this.log.info('Cancelled Job Queue');
await this.synchronizer.stop();
this.log.info('Stopped');
this.log.info('Stopped Synchronizer');
}

/** Returns an estimate of the db size in bytes. */
@@ -336,18 +346,21 @@
throw new Error(`Unspecified internal are not allowed`);
}

// We get the contract address from origin, since contract deployments are signalled as origin from their own address
// TODO: Is this ok? Should it be changed to be from ZERO?
const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined;
const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined;
// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// We get the contract address from origin, since contract deployments are signalled as origin from their own address
// TODO: Is this ok? Should it be changed to be from ZERO?
const deployedContractAddress = txRequest.txContext.isContractDeploymentTx ? txRequest.origin : undefined;
const newContract = deployedContractAddress ? await this.db.getContract(deployedContractAddress) : undefined;

const tx = await this.#simulateAndProve(txRequest, newContract);
if (simulatePublic) {
await this.#simulatePublicCalls(tx);
}
this.log.info(`Executed local simulation for ${await tx.getTxHash()}`);
const tx = await this.#simulateAndProve(txRequest, newContract);
if (simulatePublic) {
await this.#simulatePublicCalls(tx);
}
this.log.info(`Executed local simulation for ${await tx.getTxHash()}`);

return tx;
return tx;
});
}

public async sendTx(tx: Tx): Promise<TxHash> {
@@ -360,13 +373,21 @@
return txHash;
}

public async viewTx(functionName: string, args: any[], to: AztecAddress, _from?: AztecAddress) {
// TODO - Should check if `from` has the permission to call the view function.
const functionCall = await this.#getFunctionCall(functionName, args, to);
const executionResult = await this.#simulateUnconstrained(functionCall);

// TODO - Return typed result based on the function artifact.
return executionResult;
public async viewTx(
functionName: string,
args: any[],
to: AztecAddress,
_from?: AztecAddress,
): Promise<DecodedReturn> {
// all simulations must be serialized w.r.t. the synchronizer
return await this.jobQueue.put(async () => {
// TODO - Should check if `from` has the permission to call the view function.
const functionCall = await this.#getFunctionCall(functionName, args, to);
const executionResult = await this.#simulateUnconstrained(functionCall);

// TODO - Return typed result based on the function artifact.
return executionResult;
});
}

public async getTxReceipt(txHash: TxHash): Promise<TxReceipt> {
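With simulateTx and viewTx both routed through jobQueue.put, overlapping client calls are safe: they execute one at a time and never while a sync job is mutating the database. A hypothetical caller-side sketch follows; the SimulatingPXE interface and the request values are placeholders, not part of this commit.

```typescript
// Hypothetical caller-side sketch; the interface and arguments are
// placeholders standing in for the real PXE client types.
interface SimulatingPXE {
  simulateTx(txRequest: unknown, simulatePublic: boolean): Promise<unknown>;
}

async function simulateBoth(pxe: SimulatingPXE, reqA: unknown, reqB: unknown) {
  // Although the calls are issued concurrently, the PXE's internal job queue
  // runs each simulation one at a time, serialized with sync jobs, so neither
  // simulation observes state changes made by the synchronizer mid-flight.
  const [txA, txB] = await Promise.all([
    pxe.simulateTx(reqA, true),
    pxe.simulateTx(reqB, true),
  ]);
  return [txA, txB];
}
```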
7 changes: 5 additions & 2 deletions yarn-project/pxe/src/synchronizer/synchronizer.test.ts
@@ -1,5 +1,6 @@
import { BlockHeader, CompleteAddress, EthAddress, Fr, GrumpkinScalar } from '@aztec/circuits.js';
import { Grumpkin } from '@aztec/circuits.js/barretenberg';
import { SerialQueue } from '@aztec/foundation/fifo';
import { TestKeyStore } from '@aztec/key-store';
import { AztecLmdbStore } from '@aztec/kv-store';
import { AztecNode, INITIAL_L2_BLOCK_NUM, L2Block, MerkleTreeId } from '@aztec/types';
@@ -17,6 +18,7 @@ describe('Synchronizer', () => {
let synchronizer: TestSynchronizer;
let roots: Record<MerkleTreeId, Fr>;
let blockHeader: BlockHeader;
let jobQueue: SerialQueue;

beforeEach(async () => {
blockHeader = BlockHeader.random();
@@ -31,7 +33,8 @@

aztecNode = mock<AztecNode>();
database = new KVPxeDatabase(await AztecLmdbStore.create(EthAddress.random()));
synchronizer = new TestSynchronizer(aztecNode, database);
jobQueue = new SerialQueue();
synchronizer = new TestSynchronizer(aztecNode, database, jobQueue);
});

it('sets tree roots from aztec node on initial sync', async () => {
@@ -128,7 +131,7 @@ class TestSynchronizer extends Synchronizer {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<void> {
public workNoteProcessorCatchUp(): Promise<boolean> {
return super.workNoteProcessorCatchUp();
}
}
68 changes: 47 additions & 21 deletions yarn-project/pxe/src/synchronizer/synchronizer.ts
@@ -1,5 +1,6 @@
import { AztecAddress, BlockHeader, Fr, PublicKey } from '@aztec/circuits.js';
import { computeGlobalsHash } from '@aztec/circuits.js/abis';
import { SerialQueue } from '@aztec/foundation/fifo';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { InterruptibleSleep } from '@aztec/foundation/sleep';
import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockContext, L2BlockL2Logs, LogType } from '@aztec/types';
@@ -24,7 +25,7 @@ export class Synchronizer {
private log: DebugLogger;
private noteProcessorsToCatchUp: NoteProcessor[] = [];

constructor(private node: AztecNode, private db: PxeDatabase, logSuffix = '') {
constructor(private node: AztecNode, private db: PxeDatabase, private jobQueue: SerialQueue, logSuffix = '') {
this.log = createDebugLogger(logSuffix ? `aztec:pxe_synchronizer_${logSuffix}` : 'aztec:pxe_synchronizer');
}

@@ -36,23 +37,35 @@
* @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @param retryInterval - The time interval (in ms) to wait before retrying if no data is available.
*/
public async start(limit = 1, retryInterval = 1000) {
public start(limit = 1, retryInterval = 1000) {
if (this.running) {
return;
}
this.running = true;

await this.initialSync();
this.jobQueue
.put(() => this.initialSync())
.catch(err => {
this.log.error(`Error in synchronizer initial sync`, err);
this.running = false;
throw err;
});

const run = async () => {
while (this.running) {
if (this.noteProcessorsToCatchUp.length > 0) {
// There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor.
await this.workNoteProcessorCatchUp(limit, retryInterval);
} else {
// No note processor needs to catch up. We continue with the normal flow.
await this.work(limit, retryInterval);
}
await this.jobQueue.put(async () => {
let moreWork = true;
while (moreWork && this.running) {
if (this.noteProcessorsToCatchUp.length > 0) {
// There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor.
moreWork = await this.workNoteProcessorCatchUp(limit);
} else {
// No note processor needs to catch up. We continue with the normal flow.
moreWork = await this.work(limit);
}
}
});
await this.interruptibleSleep.sleep(retryInterval);
}
};

@@ -70,26 +83,29 @@
await this.db.setBlockData(latestBlockNumber, latestBlockHeader);
}

protected async work(limit = 1, retryInterval = 1000): Promise<void> {
/**
* Fetches encrypted logs and blocks from the Aztec node and processes them for all note processors.
*
* @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
*/
protected async work(limit = 1): Promise<boolean> {
const from = this.getSynchedBlockNumber() + 1;
try {
let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED);
if (!encryptedLogs.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED);
if (!unencryptedLogs.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

// Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks.
const blocks = await this.node.getBlocks(from, encryptedLogs.length);
if (!blocks.length) {
await this.interruptibleSleep.sleep(retryInterval);
return;
return false;
}

if (blocks.length !== encryptedLogs.length) {
@@ -120,21 +136,30 @@
for (const noteProcessor of this.noteProcessors) {
await noteProcessor.process(blockContexts, encryptedLogs);
}
return true;
} catch (err) {
this.log.error(`Error in synchronizer work`, err);
await this.interruptibleSleep.sleep(retryInterval);
return false;
}
}

protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise<void> {
/**
* Catch up a note processor that is lagging behind the main sync,
* e.g. because we just added a new account.
*
* @param limit - the maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @returns true if there could be more work, false if we're caught up or there was an error.
*/
protected async workNoteProcessorCatchUp(limit = 1): Promise<boolean> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const toBlockNumber = this.getSynchedBlockNumber();

if (noteProcessor.status.syncedToBlock >= toBlockNumber) {
// Note processor already synched, nothing to do
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
return;
// could be more work if there are more note processors to catch up
return true;
}

const from = noteProcessor.status.syncedToBlock + 1;
@@ -184,9 +209,10 @@
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
}
return true;
} catch (err) {
this.log.error(`Error in synchronizer workNoteProcessorCatchUp`, err);
await this.interruptibleSleep.sleep(retryInterval);
return false;
}
}

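This reworked loop is what makes synchronization greedy: each queued sync job keeps calling work() or workNoteProcessorCatchUp() until they report there is nothing left to do, and the retry sleep now happens between queue jobs rather than inside them, so pending simulations get a turn between sync bursts. A simplified standalone sketch of that scheduling follows, with the names and shapes taken from the diff but reduced to plain functions.

```typescript
// Simplified standalone sketch of the greedy sync loop shown in the diff;
// the queue, work functions, and timings here are stand-ins, not the real
// Synchronizer internals.
type Job<T> = () => Promise<T>;

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function runSyncLoop(
  jobQueue: { put<T>(job: Job<T>): Promise<T> },
  hasCatchUpWork: () => boolean,
  workNoteProcessorCatchUp: () => Promise<boolean>, // true if there may be more work
  work: () => Promise<boolean>, // true if there may be more work
  isRunning: () => boolean,
  retryIntervalMs = 1000,
) {
  while (isRunning()) {
    // One queued job syncs greedily: it loops until the work functions report
    // that nothing is left (or an error made them return false).
    await jobQueue.put(async () => {
      let moreWork = true;
      while (moreWork && isRunning()) {
        moreWork = hasCatchUpWork() ? await workNoteProcessorCatchUp() : await work();
      }
    });
    // Sleep between jobs, never while holding the queue, so queued
    // simulations can run between sync bursts.
    await sleep(retryIntervalMs);
  }
}
```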
