Skip to content

Commit

Permalink
feat: Prover node sends quotes on new epochs
Browse files Browse the repository at this point in the history
Modifies the prover node so it no longer automatically attempts to prove
unproven blocks, but instead sends quotes to the proof coordinator. Also
monitors for claims, and if one that matches the prover address is
posted, only then starts a new epoch proving job.
  • Loading branch information
spalladino committed Oct 1, 2024
1 parent 2f1d19a commit 5901dc2
Show file tree
Hide file tree
Showing 48 changed files with 1,114 additions and 269 deletions.
3 changes: 2 additions & 1 deletion yarn-project/archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"type": "module",
"exports": {
".": "./dest/index.js",
"./data-retrieval": "./dest/archiver/data_retrieval.js"
"./data-retrieval": "./dest/archiver/data_retrieval.js",
"./test": "./dest/test/index.js"
},
"typedocOptions": {
"entryPoints": [
Expand Down
118 changes: 109 additions & 9 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
ContractClassRegisteredEvent,
ContractInstanceDeployedEvent,
type FunctionSelector,
type Header,
PrivateFunctionBroadcastedEvent,
UnconstrainedFunctionBroadcastedEvent,
isValidPrivateFunctionMembershipProof,
Expand Down Expand Up @@ -57,6 +58,12 @@ import {
import { type ArchiverDataStore, type ArchiverL1SynchPoint } from './archiver_store.js';
import { type ArchiverConfig } from './config.js';
import { retrieveBlockFromRollup, retrieveL1ToL2Messages } from './data_retrieval.js';
import {
getEpochNumberAtTimestamp,
getSlotAtTimestamp,
getSlotRangeForEpoch,
getTimestampRangeForEpoch,
} from './epoch_helpers.js';
import { ArchiverInstrumentation } from './instrumentation.js';
import { type DataRetrieval } from './structs/data_retrieval.js';
import { type L1Published } from './structs/published.js';
Expand All @@ -82,6 +89,9 @@ export class Archiver implements ArchiveSource {

private store: ArchiverStoreHelper;

public l1BlockNumber: bigint | undefined;
public l1Timestamp: bigint | undefined;

/**
* Creates a new instance of the Archiver.
* @param publicClient - A client for interacting with the Ethereum node.
Expand All @@ -98,9 +108,9 @@ export class Archiver implements ArchiveSource {
readonly inboxAddress: EthAddress,
private readonly registryAddress: EthAddress,
readonly dataStore: ArchiverDataStore,
private readonly pollingIntervalMs = 10_000,
private readonly pollingIntervalMs: number,
private readonly instrumentation: ArchiverInstrumentation,
private readonly l1StartBlock: bigint = 0n,
private readonly l1constants: L1RollupConstants = EmptyL1RollupConstants,
private readonly log: DebugLogger = createDebugLogger('aztec:archiver'),
) {
this.store = new ArchiverStoreHelper(dataStore);
Expand Down Expand Up @@ -144,17 +154,20 @@ export class Archiver implements ArchiveSource {
client: publicClient,
});

const l1StartBlock = await rollup.read.L1_BLOCK_AT_GENESIS();
const [l1StartBlock, l1GenesisTime] = await Promise.all([
rollup.read.L1_BLOCK_AT_GENESIS(),
rollup.read.GENESIS_TIME(),
] as const);

const archiver = new Archiver(
publicClient,
config.l1Contracts.rollupAddress,
config.l1Contracts.inboxAddress,
config.l1Contracts.registryAddress,
archiverStore,
config.archiverPollingIntervalMS,
config.archiverPollingIntervalMS ?? 10_000,
new ArchiverInstrumentation(telemetry),
BigInt(l1StartBlock),
{ l1StartBlock, l1GenesisTime },
);
await archiver.start(blockUntilSynced);
return archiver;
Expand Down Expand Up @@ -206,8 +219,8 @@ export class Archiver implements ArchiveSource {
*
* This code does not handle reorgs.
*/
const { blocksSynchedTo = this.l1StartBlock, messagesSynchedTo = this.l1StartBlock } =
await this.store.getSynchPoint();
const { l1StartBlock } = this.l1constants;
const { blocksSynchedTo = l1StartBlock, messagesSynchedTo = l1StartBlock } = await this.store.getSynchPoint();
const currentL1BlockNumber = await this.publicClient.getBlockNumber();

// ********** Ensuring Consistency of data pulled from L1 **********
Expand All @@ -234,6 +247,12 @@ export class Archiver implements ArchiveSource {

// ********** Events that are processed per L2 block **********
await this.handleL2blocks(blockUntilSynced, blocksSynchedTo, currentL1BlockNumber);

// Store latest l1 block number and timestamp seen. Used for epoch and slots calculations.
if (!this.l1BlockNumber || this.l1BlockNumber < currentL1BlockNumber) {
this.l1Timestamp = await this.publicClient.getBlock({ blockNumber: currentL1BlockNumber }).then(b => b.timestamp);
this.l1BlockNumber = currentL1BlockNumber;
}
}

private async handleL1ToL2Messages(
Expand Down Expand Up @@ -421,6 +440,68 @@ export class Archiver implements ArchiveSource {
return Promise.resolve(this.registryAddress);
}

public getL1BlockNumber(): bigint {
const l1BlockNumber = this.l1BlockNumber;
if (!l1BlockNumber) {
throw new Error('L1 block number not yet available. Complete an initial sync first.');
}
return l1BlockNumber;
}

public getL1Timestamp(): bigint {
const l1Timestamp = this.l1Timestamp;
if (!l1Timestamp) {
throw new Error('L1 timestamp not yet available. Complete an initial sync first.');
}
return l1Timestamp;
}

public getL2SlotNumber(): Promise<bigint> {
return Promise.resolve(getSlotAtTimestamp(this.getL1Timestamp(), this.l1constants));
}

public getL2EpochNumber(): Promise<bigint> {
return Promise.resolve(getEpochNumberAtTimestamp(this.getL1Timestamp(), this.l1constants));
}

public async getBlocksForEpoch(epochNumber: bigint): Promise<L2Block[]> {
const [start, end] = getSlotRangeForEpoch(epochNumber);
const blocks: L2Block[] = [];

// Walk the list of blocks backwards and filter by slots matching the requested epoch.
// We'll typically ask for blocks for a very recent epoch, so we shouldn't need an index here.
let block = await this.getBlock(await this.store.getSynchedL2BlockNumber());
const slot = (b: L2Block) => b.header.globalVariables.slotNumber.toBigInt();
while (block && slot(block) >= start) {
if (slot(block) <= end) {
blocks.push(block);
}
block = await this.getBlock(block.number - 1);
}

return blocks;
}

public async isEpochComplete(epochNumber: bigint): Promise<boolean> {
// The epoch is complete if the current L2 block is the last one in the epoch (or later)
const header = await this.getBlockHeader('latest');
const slot = header?.globalVariables.slotNumber.toBigInt();
const [_startSlot, endSlot] = getSlotRangeForEpoch(epochNumber);
if (slot && slot >= endSlot) {
return true;
}

// If not, the epoch may also be complete if the L2 slot has passed without a block
// We compute this based on the timestamp for the given epoch and the timestamp of the last L1 block
const l1Timestamp = this.getL1Timestamp();
const [_startTimestamp, endTimestamp] = getTimestampRangeForEpoch(epochNumber, this.l1constants);

// For this computation, we throw in a few extra seconds just for good measure,
// since we know the next L1 block won't be mined within this range
const leeway = 3n;
return l1Timestamp + leeway >= endTimestamp;
}

/**
* Gets up to `limit` amount of L2 blocks starting from `from`.
* @param from - Number of the first block to return (inclusive).
Expand Down Expand Up @@ -452,6 +533,14 @@ export class Archiver implements ArchiveSource {
return blocks.length === 0 ? undefined : blocks[0].data;
}

public async getBlockHeader(number: number | 'latest'): Promise<Header | undefined> {
if (number === 'latest') {
number = await this.store.getSynchedL2BlockNumber();
}
const headers = await this.store.getBlockHeaders(number, 1);
return headers.length === 0 ? undefined : headers[0];
}

public getTxEffect(txHash: TxHash): Promise<TxEffect | undefined> {
return this.store.getTxEffect(txHash);
}
Expand Down Expand Up @@ -735,11 +824,12 @@ class ArchiverStoreHelper
getBlocks(from: number, limit: number): Promise<L1Published<L2Block>[]> {
return this.store.getBlocks(from, limit);
}

getBlockHeaders(from: number, limit: number): Promise<Header[]> {
return this.store.getBlockHeaders(from, limit);
}
getTxEffect(txHash: TxHash): Promise<TxEffect | undefined> {
return this.store.getTxEffect(txHash);
}

getSettledTxReceipt(txHash: TxHash): Promise<TxReceipt | undefined> {
return this.store.getSettledTxReceipt(txHash);
}
Expand Down Expand Up @@ -805,3 +895,13 @@ class ArchiverStoreHelper
return this.store.getTotalL1ToL2MessageCount();
}
}

type L1RollupConstants = {
l1StartBlock: bigint;
l1GenesisTime: bigint;
};

const EmptyL1RollupConstants: L1RollupConstants = {
l1StartBlock: 0n,
l1GenesisTime: 0n,
};
10 changes: 9 additions & 1 deletion yarn-project/archiver/src/archiver/archiver_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
type TxHash,
type TxReceipt,
} from '@aztec/circuit-types';
import { type Fr } from '@aztec/circuits.js';
import { type Fr, type Header } from '@aztec/circuits.js';
import { type ContractArtifact } from '@aztec/foundation/abi';
import { type AztecAddress } from '@aztec/foundation/aztec-address';
import {
Expand Down Expand Up @@ -64,6 +64,14 @@ export interface ArchiverDataStore {
*/
getBlocks(from: number, limit: number): Promise<L1Published<L2Block>[]>;

/**
* Gets up to `limit` amount of L2 block headers starting from `from`.
* @param from - Number of the first block to return (inclusive).
* @param limit - The number of blocks to return.
* @returns The requested L2 block headers.
*/
getBlockHeaders(from: number, limit: number): Promise<Header[]>;

/**
* Gets a tx effect.
* @param txHash - The txHash of the tx corresponding to the tx effect.
Expand Down
26 changes: 26 additions & 0 deletions yarn-project/archiver/src/archiver/epoch_helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AZTEC_EPOCH_DURATION, AZTEC_SLOT_DURATION } from '@aztec/circuits.js';

/** Returns the slot number for a given timestamp. */
export function getSlotAtTimestamp(ts: bigint, constants: { l1GenesisTime: bigint }) {
return ts < constants.l1GenesisTime ? 0n : (ts - constants.l1GenesisTime) / BigInt(AZTEC_SLOT_DURATION);
}

/** Returns the epoch number for a given timestamp. */
export function getEpochNumberAtTimestamp(ts: bigint, constants: { l1GenesisTime: bigint }) {
return getSlotAtTimestamp(ts, constants) / BigInt(AZTEC_EPOCH_DURATION);
}

/** Returns the range of slots (inclusive) for a given epoch number. */
export function getSlotRangeForEpoch(epochNumber: bigint) {
const startSlot = epochNumber * BigInt(AZTEC_EPOCH_DURATION);
return [startSlot, startSlot + BigInt(AZTEC_EPOCH_DURATION) - 1n];
}

/** Returns the range of L1 timestamps (inclusive) for a given epoch number. */
export function getTimestampRangeForEpoch(epochNumber: bigint, constants: { l1GenesisTime: bigint }) {
const [startSlot, endSlot] = getSlotRangeForEpoch(epochNumber);
return [
constants.l1GenesisTime + startSlot * BigInt(AZTEC_SLOT_DURATION),
constants.l1GenesisTime + endSlot * BigInt(AZTEC_SLOT_DURATION),
];
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ export class BlockStore {
return this.getBlockFromBlockStorage(blockStorage);
}

/**
* Gets the headers for a sequence of L2 blocks.
* @param start - Number of the first block to return (inclusive).
* @param limit - The number of blocks to return.
* @returns The requested L2 block headers
*/
*getBlockHeaders(start: number, limit: number): IterableIterator<Header> {
for (const blockStorage of this.#blocks.values(this.#computeBlockRange(start, limit))) {
yield Header.fromBuffer(blockStorage.header);
}
}

private getBlockFromBlockStorage(blockStorage: BlockStorage) {
const header = Header.fromBuffer(blockStorage.header);
const archive = AppendOnlyTreeSnapshot.fromBuffer(blockStorage.archive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
type TxHash,
type TxReceipt,
} from '@aztec/circuit-types';
import { type Fr } from '@aztec/circuits.js';
import { type Fr, type Header } from '@aztec/circuits.js';
import { type ContractArtifact } from '@aztec/foundation/abi';
import { type AztecAddress } from '@aztec/foundation/aztec-address';
import { createDebugLogger } from '@aztec/foundation/log';
Expand Down Expand Up @@ -136,6 +136,22 @@ export class KVArchiverDataStore implements ArchiverDataStore {
}
}

/**
* Gets up to `limit` amount of L2 blocks headers starting from `from`.
*
* @param start - Number of the first block to return (inclusive).
* @param limit - The number of blocks to return.
* @returns The requested L2 blocks
*/
getBlockHeaders(start: number, limit: number): Promise<Header[]> {
try {
return Promise.resolve(Array.from(this.#blockStore.getBlockHeaders(start, limit)));
} catch (err) {
// this function is sync so if any errors are thrown we need to make sure they're passed on as rejected Promises
return Promise.reject(err);
}
}

/**
* Gets a tx effect.
* @param txHash - The txHash of the tx corresponding to the tx effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
TxReceipt,
type UnencryptedL2BlockL2Logs,
} from '@aztec/circuit-types';
import { Fr, INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js';
import { Fr, type Header, INITIAL_L2_BLOCK_NUM } from '@aztec/circuits.js';
import { type ContractArtifact } from '@aztec/foundation/abi';
import { type AztecAddress } from '@aztec/foundation/aztec-address';
import {
Expand Down Expand Up @@ -262,7 +262,6 @@ export class MemoryArchiverStore implements ArchiverDataStore {
* @remarks When "from" is smaller than genesis block number, blocks from the beginning are returned.
*/
public getBlocks(from: number, limit: number): Promise<L1Published<L2Block>[]> {
// Return an empty array if we are outside of range
if (limit < 1) {
return Promise.reject(new Error(`Invalid limit: ${limit}`));
}
Expand All @@ -280,6 +279,11 @@ export class MemoryArchiverStore implements ArchiverDataStore {
return Promise.resolve(this.l2Blocks.slice(fromIndex, toIndex));
}

public async getBlockHeaders(from: number, limit: number): Promise<Header[]> {
const blocks = await this.getBlocks(from, limit);
return blocks.map(block => block.data.header);
}

/**
* Gets a tx effect.
* @param txHash - The txHash of the tx effect.
Expand Down
1 change: 1 addition & 0 deletions yarn-project/archiver/src/test/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './mock_l2_block_source.js';
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { L2Block, type L2BlockSource, type TxEffect, type TxHash, TxReceipt, TxStatus } from '@aztec/circuit-types';
import { EthAddress } from '@aztec/circuits.js';
import { EthAddress, type Header } from '@aztec/circuits.js';

import { getSlotRangeForEpoch } from '../archiver/epoch_helpers.js';

/**
* A mocked implementation of L2BlockSource to be used in p2p tests.
Expand Down Expand Up @@ -85,6 +87,19 @@ export class MockBlockSource implements L2BlockSource {
);
}

getBlockHeader(number: number | 'latest'): Promise<Header | undefined> {
return Promise.resolve(this.l2Blocks.at(typeof number === 'number' ? number : -1)?.header);
}

getBlocksForEpoch(epochNumber: bigint): Promise<L2Block[]> {
const [start, end] = getSlotRangeForEpoch(epochNumber);
const blocks = this.l2Blocks.filter(b => {
const slot = b.header.globalVariables.slotNumber.toBigInt();
return slot >= start && slot <= end;
});
return Promise.resolve(blocks);
}

/**
* Gets a tx effect.
* @param txHash - The hash of a transaction which resulted in the returned tx effect.
Expand Down Expand Up @@ -120,6 +135,18 @@ export class MockBlockSource implements L2BlockSource {
return Promise.resolve(undefined);
}

getL2EpochNumber(): Promise<bigint> {
throw new Error('Method not implemented.');
}

getL2SlotNumber(): Promise<bigint> {
throw new Error('Method not implemented.');
}

isEpochComplete(_epochNumber: bigint): Promise<boolean> {
throw new Error('Method not implemented.');
}

/**
* Starts the block source. In this mock implementation, this is a noop.
* @returns A promise that signals the initialization of the l2 block source on completion.
Expand Down
Loading

0 comments on commit 5901dc2

Please sign in to comment.