Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prover): Handle starting blocks out of order in prover #10350

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion yarn-project/circuit-types/src/interfaces/epoch-prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ export interface EpochProver extends Omit<BlockBuilder, 'setBlockCompleted'> {
/**
* Starts a new epoch. Must be the first method to be called.
* @param epochNumber - The epoch number.
* @param firstBlockNumber - The block number of the first block in the epoch.
* @param totalNumBlocks - The total number of blocks expected in the epoch (must be at least one).
**/
startNewEpoch(epochNumber: number, totalNumBlocks: number): void;
startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number): void;

/** Pads the block with empty txs if it hasn't reached the declared number of txs. */
setBlockCompleted(blockNumber: number, expectedBlockHeader?: Header): Promise<L2Block>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ export class EpochProvingState {
private mergeRollupInputs: BlockMergeRollupInputData[] = [];
public rootRollupPublicInputs: RootRollupPublicInputs | undefined;
public finalProof: Proof | undefined;
public blocks: BlockProvingState[] = [];
public blocks: (BlockProvingState | undefined)[] = [];

constructor(
public readonly epochNumber: number,
public readonly firstBlockNumber: number,
public readonly totalNumBlocks: number,
private completionCallback: (result: ProvingResult) => void,
private rejectionCallback: (reason: string) => void,
Expand Down Expand Up @@ -106,8 +107,9 @@ export class EpochProvingState {
archiveTreeRootSiblingPath: Tuple<Fr, typeof ARCHIVE_HEIGHT>,
previousBlockHash: Fr,
): BlockProvingState {
const index = globalVariables.blockNumber.toNumber() - this.firstBlockNumber;
const block = new BlockProvingState(
this.blocks.length,
index,
numTxs,
globalVariables,
padArrayEnd(l1ToL2Messages, Fr.ZERO, NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP),
Expand All @@ -119,8 +121,8 @@ export class EpochProvingState {
previousBlockHash,
this,
);
this.blocks.push(block);
if (this.blocks.length === this.totalNumBlocks) {
this.blocks[index] = block;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think CI is failing because there are holes in the array now and this function can't deal with that 😢

public getBlockProvingStateByBlockNumber(blockNumber: number) {
return this.blocks.find(block => block.blockNumber === blockNumber);
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh good catch, thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 084c68a

if (this.blocks.filter(b => !!b).length === this.totalNumBlocks) {
this.provingStateLifecycle = PROVING_STATE_LIFECYCLE.PROVING_STATE_FULL;
}
return block;
Expand Down Expand Up @@ -176,7 +178,7 @@ export class EpochProvingState {

// Returns a specific transaction proving state
public getBlockProvingStateByBlockNumber(blockNumber: number) {
return this.blocks.find(block => block.blockNumber === blockNumber);
return this.blocks.find(block => block?.blockNumber === blockNumber);
}

// Returns a set of merge rollup inputs
Expand Down
11 changes: 7 additions & 4 deletions yarn-project/prover-client/src/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ export class ProvingOrchestrator implements EpochProver {
this.paddingTxProof = undefined;
}

public startNewEpoch(epochNumber: number, totalNumBlocks: number) {
public startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number) {
const { promise: _promise, resolve, reject } = promiseWithResolvers<ProvingResult>();
const promise = _promise.catch((reason): ProvingResult => ({ status: 'failure', reason }));
if (totalNumBlocks <= 0 || !Number.isInteger(totalNumBlocks)) {
throw new Error(`Invalid number of blocks for epoch (got ${totalNumBlocks})`);
}
logger.info(`Starting epoch ${epochNumber} with ${totalNumBlocks} blocks`);
this.provingState = new EpochProvingState(epochNumber, totalNumBlocks, resolve, reject);
this.provingState = new EpochProvingState(epochNumber, firstBlockNumber, totalNumBlocks, resolve, reject);
this.provingPromise = promise;
}

Expand Down Expand Up @@ -336,7 +336,7 @@ export class ProvingOrchestrator implements EpochProver {

/** Returns the block as built for a given index. */
public getBlock(index: number): L2Block {
const block = this.provingState?.blocks[index].block;
const block = this.provingState?.blocks[index]?.block;
if (!block) {
throw new Error(`Block at index ${index} not available`);
}
Expand All @@ -354,7 +354,10 @@ export class ProvingOrchestrator implements EpochProver {
})
private padEpoch(): Promise<void> {
const provingState = this.provingState!;
const lastBlock = maxBy(provingState.blocks, b => b.blockNumber)?.block;
const lastBlock = maxBy(
provingState.blocks.filter(b => !!b),
b => b!.blockNumber,
)?.block;
if (!lastBlock) {
return Promise.reject(new Error(`Epoch needs at least one completed block in order to be padded`));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('prover/orchestrator/errors', () => {
it('throws if adding too many transactions', async () => {
const txs = times(4, i => context.makeProcessedTx(i + 1));

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(txs.length, context.globalVariables, []);

for (const tx of txs) {
Expand All @@ -43,7 +43,7 @@ describe('prover/orchestrator/errors', () => {
});

it('throws if adding too many blocks', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);
await orchestrator.setBlockCompleted(context.blockNumber);

Expand All @@ -59,29 +59,29 @@ describe('prover/orchestrator/errors', () => {
});

it('throws if adding a transaction before starting block', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(async () => await orchestrator.addNewTx(context.makeProcessedTx())).rejects.toThrow(
/Block proving state for 1 not found/,
);
});

it('throws if completing a block before start', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(async () => await orchestrator.setBlockCompleted(context.blockNumber)).rejects.toThrow(
/Block proving state for 1 not found/,
);
});

it('throws if setting an incomplete block as completed', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(3, context.globalVariables, []);
await expect(async () => await orchestrator.setBlockCompleted(context.blockNumber)).rejects.toThrow(
`Block not ready for completion: expecting ${3} more transactions.`,
);
});

it('throws if adding to a cancelled block', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);
orchestrator.cancel();

Expand All @@ -93,23 +93,23 @@ describe('prover/orchestrator/errors', () => {
it.each([[-4], [0], [1], [8.1]] as const)(
'fails to start a block with %i transactions',
async (blockSize: number) => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(
async () => await orchestrator.startNewBlock(blockSize, context.globalVariables, []),
).rejects.toThrow(`Invalid number of txs for block (got ${blockSize})`);
},
);

it.each([[-4], [0], [8.1]] as const)('fails to start an epoch with %i blocks', (epochSize: number) => {
orchestrator.startNewEpoch(1, 1);
expect(() => orchestrator.startNewEpoch(1, epochSize)).toThrow(
orchestrator.startNewEpoch(1, 1, 1);
expect(() => orchestrator.startNewEpoch(1, 1, epochSize)).toThrow(
`Invalid number of blocks for epoch (got ${epochSize})`,
);
});

it('rejects if too many l1 to l2 messages are provided', async () => {
const l1ToL2Messages = new Array(100).fill(new Fr(0n));
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(
async () => await orchestrator.startNewBlock(2, context.globalVariables, l1ToL2Messages),
).rejects.toThrow('Too many L1 to L2 messages');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('prover/orchestrator/failures', () => {
// We generate them and add them as part of the pending chain
const blocks = await timesAsync(3, i => context.makePendingBlock(3, 1, i + 1, j => ({ privateOnly: j === 1 })));

orchestrator.startNewEpoch(1, 3);
orchestrator.startNewEpoch(1, 1, 3);

for (const { block, txs, msgs } of blocks) {
// these operations could fail if the target circuit fails before adding all blocks or txs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('prover/orchestrator/lifecycle', () => {
return deferred.promise;
});

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);

await sleep(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('prover/orchestrator/mixed-blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(3, context.globalVariables, l1ToL2Messages);
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
Expand All @@ -41,7 +41,7 @@ describe('prover/orchestrator/mixed-blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(txs.length, context.globalVariables, l1ToL2Messages);

for (const tx of txs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('prover/orchestrator/public-functions', () => {
tx.data.constants.protocolContractTreeRoot = protocolContractTreeRoot;
}

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(numTransactions, context.globalVariables, []);

const [processed, failed] = await context.processPublicFunctions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ describe('prover/orchestrator/multi-block', () => {
describe('multiple blocks', () => {
it.each([1, 4, 5])('builds an epoch with %s blocks in sequence', async (numBlocks: number) => {
logger.info(`Seeding world state with ${numBlocks} blocks`);
const txCount = 1;
const txCount = 2;
const blocks = await timesAsync(numBlocks, i => context.makePendingBlock(txCount, 0, i + 1));

logger.info(`Starting new epoch with ${numBlocks}`);
context.orchestrator.startNewEpoch(1, numBlocks);
context.orchestrator.startNewEpoch(1, 1, numBlocks);
for (const { block, txs } of blocks) {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
for (const tx of txs) {
Expand All @@ -41,15 +41,17 @@ describe('prover/orchestrator/multi-block', () => {

it.each([1, 4, 5])('builds an epoch with %s blocks in parallel', async (numBlocks: number) => {
logger.info(`Seeding world state with ${numBlocks} blocks`);
const txCount = 1;
const txCount = 2;
const blocks = await timesAsync(numBlocks, i => context.makePendingBlock(txCount, 0, i + 1));

logger.info(`Starting new epoch with ${numBlocks}`);
context.orchestrator.startNewEpoch(1, numBlocks);
context.orchestrator.startNewEpoch(1, 1, numBlocks);
await Promise.all(
blocks.map(async ({ block, txs }) => {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
await Promise.all(txs.map(tx => context.orchestrator.addNewTx(tx)));
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
}
await context.orchestrator.setBlockCompleted(block.number);
}),
);
Expand All @@ -59,5 +61,32 @@ describe('prover/orchestrator/multi-block', () => {
expect(epoch.publicInputs.endBlockNumber.toNumber()).toEqual(numBlocks);
expect(epoch.proof).toBeDefined();
});

it('builds two consecutive epochs', async () => {
const numEpochs = 2;
const numBlocks = 4;
const txCount = 2;
logger.info(`Seeding world state with ${numBlocks * numEpochs} blocks`);
const blocks = await timesAsync(numBlocks * numEpochs, i => context.makePendingBlock(txCount, 0, i + 1));

for (let epochIndex = 0; epochIndex < numEpochs; epochIndex++) {
logger.info(`Starting epoch ${epochIndex + 1} with ${numBlocks} blocks`);
context.orchestrator.startNewEpoch(epochIndex + 1, epochIndex * numBlocks + 1, numBlocks);
await Promise.all(
blocks.slice(epochIndex * numBlocks, (epochIndex + 1) * numBlocks).map(async ({ block, txs }) => {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
}
await context.orchestrator.setBlockCompleted(block.number);
}),
);

logger.info('Finalising epoch');
const epoch = await context.orchestrator.finaliseEpoch();
expect(epoch.publicInputs.endBlockNumber.toNumber()).toEqual(numBlocks + epochIndex * numBlocks);
expect(epoch.proof).toBeDefined();
}
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe('prover/orchestrator/public-functions', () => {
const [processed, _] = await context.processPublicFunctions([tx], 1, undefined);

// This will need to be a 2 tx block
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

for (const processedTx of processed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('prover/orchestrator/blocks', () => {

describe('blocks', () => {
it('builds an empty L2 block', async () => {
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

const block = await context.orchestrator.setBlockCompleted(context.blockNumber);
Expand All @@ -34,7 +34,7 @@ describe('prover/orchestrator/blocks', () => {
const txs = [context.makeProcessedTx(1)];

// This will need to be a 2 tx block
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

for (const tx of txs) {
Expand All @@ -51,7 +51,7 @@ describe('prover/orchestrator/blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(txs.length, context.globalVariables, l1ToL2Messages);

for (const tx of txs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('prover/orchestrator', () => {
}
});

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, globalVariables, [message]);

await sleep(10);
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('prover/orchestrator', () => {
});

it('waits for block to be completed before enqueueing block root proof', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, globalVariables, []);
await orchestrator.addNewTx(context.makeProcessedTx(1));
await orchestrator.addNewTx(context.makeProcessedTx(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export class ProvingAgent {
) => {
if (err) {
const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false;
this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`);
this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err);
return this.broker.reportProvingJobError(jobId, err.message, retry);
} else if (result) {
const outputUri = await this.proofStore.saveProofOutput(jobId, type, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('prover/bb_prover/full-rollup', () => {
log.info(`Proving epoch with ${blockCount}/${totalBlocks} blocks with ${nonEmptyTxs}/${totalTxs} non-empty txs`);

const initialHeader = context.getHeader(0);
context.orchestrator.startNewEpoch(1, totalBlocks);
context.orchestrator.startNewEpoch(1, 1, totalBlocks);

for (let blockNum = 1; blockNum <= blockCount; blockNum++) {
const globals = makeGlobals(blockNum);
Expand Down Expand Up @@ -102,7 +102,7 @@ describe('prover/bb_prover/full-rollup', () => {
Fr.random,
);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);

await context.orchestrator.startNewBlock(numTransactions, context.globalVariables, l1ToL2Messages);

Expand Down
5 changes: 3 additions & 2 deletions yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ export class EpochProvingJob {
public async run() {
const epochNumber = Number(this.epochNumber);
const epochSize = this.blocks.length;
this.log.info(`Starting epoch proving job`, { epochSize, epochNumber, uuid: this.uuid });
const firstBlockNumber = this.blocks[0].number;
this.log.info(`Starting epoch proving job`, { firstBlockNumber, epochSize, epochNumber, uuid: this.uuid });
this.state = 'processing';
const timer = new Timer();

const { promise, resolve } = promiseWithResolvers<void>();
this.runPromise = promise;

try {
this.prover.startNewEpoch(epochNumber, epochSize);
this.prover.startNewEpoch(epochNumber, firstBlockNumber, epochSize);

await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => {
const globalVariables = block.header.globalVariables;
Expand Down
Loading