Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new NoteProcessor works through all blocks #1404

Merged
merged 8 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
ContractPublicData,
DeployedContract,
FunctionCall,
INITIAL_L2_BLOCK_NUM,
KeyStore,
L2BlockL2Logs,
LogType,
Expand Down Expand Up @@ -69,7 +70,7 @@ export class AztecRPCServer implements AztecRPC {
* @returns A promise that resolves when the server has started successfully.
*/
public async start() {
await this.synchroniser.start(1, 1, this.config.l2BlockPollingIntervalMS);
await this.synchroniser.start(INITIAL_L2_BLOCK_NUM, 1, this.config.l2BlockPollingIntervalMS);
const info = await this.getNodeInfo();
this.log.info(`Started RPC server connected to chain ${info.chainId} version ${info.version}`);
}
Expand Down
39 changes: 38 additions & 1 deletion yarn-project/aztec-rpc/src/synchroniser/synchroniser.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Fr } from '@aztec/circuits.js';
import { AztecAddress, Fr, PrivateKey } from '@aztec/circuits.js';
import { Grumpkin } from '@aztec/circuits.js/barretenberg';
import { TestKeyStore } from '@aztec/key-store';
import { AztecNode, L2Block, MerkleTreeId } from '@aztec/types';

import { MockProxy, mock } from 'jest-mock-extended';
Expand Down Expand Up @@ -83,6 +85,37 @@ describe('Synchroniser', () => {
expect(roots5[MerkleTreeId.CONTRACT_TREE]).not.toEqual(roots[MerkleTreeId.CONTRACT_TREE]);
expect(roots5[MerkleTreeId.CONTRACT_TREE]).toEqual(block5.endContractTreeSnapshot.root);
});

it('note processor successfully catches up', async () => {
const block = L2Block.random(1, 4);

// getBlocks is called by both synchroniser.work and synchroniser.workNoteProcessorCatchUp
aztecNode.getBlocks.mockResolvedValue([L2Block.fromFields(omit(block, 'newEncryptedLogs', 'newUnencryptedLogs'))]);
aztecNode.getLogs
.mockResolvedValueOnce([block.newEncryptedLogs!]) // called by synchroniser.work
.mockResolvedValueOnce([block.newUnencryptedLogs!]) // called by synchroniser.work
.mockResolvedValueOnce([block.newEncryptedLogs!]); // called by synchroniser.workNoteProcessorCatchUp

// Sync the synchroniser so that note processor has something to catch up to
await synchroniser.work();

// Used in synchroniser.isAccountSynchronised
aztecNode.getBlockHeight.mockResolvedValueOnce(1);

// Manually adding account to database so that we can call synchroniser.isAccountSynchronised
const keyStore = new TestKeyStore(await Grumpkin.new());
keyStore.addAccount(PrivateKey.random());
const pubKey = (await keyStore.getAccounts())[0];
const address = AztecAddress.random();
await database.addPublicKeyAndPartialAddress(address, pubKey, new Fr(0));

// Add the account which will add the note processor to the synchroniser
synchroniser.addAccount(pubKey, keyStore);

await synchroniser.workNoteProcessorCatchUp();

expect(await synchroniser.isAccountSynchronised(address)).toBe(true);
});
});

class TestSynchroniser extends Synchroniser {
Expand All @@ -93,4 +126,8 @@ class TestSynchroniser extends Synchroniser {
public initialSync(): Promise<void> {
return super.initialSync();
}

public workNoteProcessorCatchUp(): Promise<void> {
return super.workNoteProcessorCatchUp();
}
}
87 changes: 76 additions & 11 deletions yarn-project/aztec-rpc/src/synchroniser/synchroniser.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AztecAddress, Fr, PublicKey } from '@aztec/circuits.js';
import { DebugLogger, createDebugLogger } from '@aztec/foundation/log';
import { InterruptableSleep } from '@aztec/foundation/sleep';
import { AztecNode, KeyStore, L2BlockContext, LogType, MerkleTreeId } from '@aztec/types';
import { AztecNode, INITIAL_L2_BLOCK_NUM, KeyStore, L2BlockContext, LogType, MerkleTreeId } from '@aztec/types';

import { Database, TxDao } from '../database/index.js';
import { NoteProcessor } from '../note_processor/index.js';
Expand All @@ -21,6 +21,7 @@ export class Synchroniser {
private initialSyncBlockHeight = 0;
private synchedToBlock = 0;
private log: DebugLogger;
private noteProcessorsToCatchUp: NoteProcessor[] = [];

constructor(private node: AztecNode, private db: Database, logSuffix = '') {
this.log = createDebugLogger(
Expand All @@ -37,15 +38,26 @@ export class Synchroniser {
* @param limit - The maximum number of encrypted, unencrypted logs and blocks to fetch in each iteration.
* @param retryInterval - The time interval (in ms) to wait before retrying if no data is available.
*/
public async start(from = 1, limit = 1, retryInterval = 1000) {
public async start(from = INITIAL_L2_BLOCK_NUM, limit = 1, retryInterval = 1000) {
if (this.running) return;
this.running = true;

if (from < this.synchedToBlock + 1) {
throw new Error(`From block ${from} is smaller than the currently synched block ${this.synchedToBlock}`);
}
this.synchedToBlock = from - 1;

await this.initialSync();

const run = async () => {
while (this.running) {
from = await this.work(from, limit, retryInterval);
if (this.noteProcessorsToCatchUp.length > 0) {
// There is a note processor that needs to catch up. We hijack the main loop to catch up the note processor.
await this.workNoteProcessorCatchUp(limit, retryInterval);
} else {
// No note processor needs to catch up. We continue with the normal flow.
await this.work(limit, retryInterval);
}
}
};

Expand All @@ -63,25 +75,26 @@ export class Synchroniser {
await this.db.setTreeRoots(treeRoots);
}

protected async work(from = 1, limit = 1, retryInterval = 1000): Promise<number> {
protected async work(limit = 1, retryInterval = 1000): Promise<void> {
const from = this.synchedToBlock + 1;
try {
let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED);
if (!encryptedLogs.length) {
await this.interruptableSleep.sleep(retryInterval);
return from;
return;
}

let unencryptedLogs = await this.node.getLogs(from, limit, LogType.UNENCRYPTED);
if (!unencryptedLogs.length) {
await this.interruptableSleep.sleep(retryInterval);
return from;
return;
}

// Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks.
const blocks = await this.node.getBlocks(from, encryptedLogs.length);
if (!blocks.length) {
await this.interruptableSleep.sleep(retryInterval);
return from;
return;
}

if (blocks.length !== encryptedLogs.length) {
Expand Down Expand Up @@ -116,13 +129,57 @@ export class Synchroniser {

await this.updateBlockInfoInBlockTxs(blockContexts);

from += encryptedLogs.length;
this.synchedToBlock = latestBlock.block.number;
return from;
} catch (err) {
this.log(err);
await this.interruptableSleep.sleep(retryInterval);
return from;
}
}

protected async workNoteProcessorCatchUp(limit = 1, retryInterval = 1000): Promise<void> {
const noteProcessor = this.noteProcessorsToCatchUp[0];
const from = noteProcessor.status.syncedToBlock + 1;
// Ensuring that the note processor does not sync further than the main sync.
limit = Math.min(limit, this.synchedToBlock - from + 1);

if (limit < 1) {
throw new Error(`Unexpected limit ${limit} for note processor catch up`);
}

try {
let encryptedLogs = await this.node.getLogs(from, limit, LogType.ENCRYPTED);
if (!encryptedLogs.length) {
// This should never happen because this function should only be called when the note processor is lagging
// behind main sync.
throw new Error('No encrypted logs in processor catch up mode');
}

// Note: If less than `limit` encrypted logs is returned, then we fetch only that number of blocks.
const blocks = await this.node.getBlocks(from, encryptedLogs.length);
if (!blocks.length) {
// This should never happen because this function should only be called when the note processor is lagging
// behind main sync.
throw new Error('No blocks in processor catch up mode');
}

if (blocks.length !== encryptedLogs.length) {
// "Trim" the encrypted logs to match the number of blocks.
encryptedLogs = encryptedLogs.slice(0, blocks.length);
}

const blockContexts = blocks.map(block => new L2BlockContext(block));

this.log(`Forwarding ${encryptedLogs.length} encrypted logs and blocks to note processor in catch up mode`);
await noteProcessor.process(blockContexts, encryptedLogs);

if (noteProcessor.status.syncedToBlock === this.synchedToBlock) {
// Note processor caught up, move it to `noteProcessors` from `noteProcessorsToCatchUp`.
this.noteProcessorsToCatchUp.shift();
this.noteProcessors.push(noteProcessor);
}
} catch (err) {
this.log(err);
await this.interruptableSleep.sleep(retryInterval);
}
}

Expand Down Expand Up @@ -172,7 +229,15 @@ export class Synchroniser {
if (processor) {
return;
}
this.noteProcessors.push(new NoteProcessor(publicKey, keyStore, this.db, this.node));

const noteProcessor = new NoteProcessor(publicKey, keyStore, this.db, this.node);
if (this.synchedToBlock === 0) {
// The main sync thread was never started before and for this reason the synchroniser does not have to catch up
this.noteProcessors.push(noteProcessor);
} else {
// The main sync thread was started before and for this reason the synchroniser has to catch up
this.noteProcessorsToCatchUp.push(noteProcessor);
}
Comment on lines +234 to +240
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benesjan sorry for going back to a merged PR, but I think you may have a race condition here: if a noteProcessor is added on the first run of work during await this.updateBlockInfoInBlockTxs(blockContexts), which happens after the note processors are notified but before updating synchedToBlock, then it will be pushed to noteProcessors but will never get the notes from the first iteration.

If the above is correct, maybe you can solve it with a flag isStarted that you set as soon as start is called, and check that instead of synchedToBlock?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @spalladino is right. Perhaps a clean solution is to always add the note processor to noteProcessorsToCatchUp and have workNoteProcessorCatchUp transfer it to noteProcessors in all scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spalladino great catch! Will address

}

/**
Expand Down
5 changes: 4 additions & 1 deletion yarn-project/types/src/logs/function_l2_logs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { sha256 } from '@aztec/foundation/crypto';
import { Point } from '@aztec/foundation/fields';
import { BufferReader, serializeBufferToVector } from '@aztec/foundation/serialize';

import { randomBytes } from 'crypto';
Expand Down Expand Up @@ -69,7 +70,9 @@ export class FunctionL2Logs {
public static random(numLogs: number): FunctionL2Logs {
const logs: Buffer[] = [];
for (let i = 0; i < numLogs; i++) {
logs.push(randomBytes(144));
const randomEphPubKey = Point.random();
const randomLogContent = randomBytes(144 - Point.SIZE_IN_BYTES);
logs.push(Buffer.concat([randomLogContent, randomEphPubKey.toBuffer()]));
}
return new FunctionL2Logs(logs);
}
Expand Down