From 440a8ef524caa1b1c3ac885f7d2ccfb1b93a94e0 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Dec 2024 10:27:23 +0000 Subject: [PATCH] refactor: orchestrator talks directly to broker --- .../src/interfaces/prover-broker.ts | 6 - .../src/interfaces/prover-client.ts | 35 +--- yarn-project/foundation/src/config/env_var.ts | 1 - yarn-project/foundation/src/string/index.ts | 4 + yarn-project/prover-client/src/index.ts | 1 - .../src/prover-client/prover-client.ts | 13 +- .../broker_prover_facade.test.ts | 156 ++++++++++++++++++ ...oker_facade.ts => broker_prover_facade.ts} | 109 +++++------- .../caching_broker_facade.test.ts | 156 ------------------ .../src/proving_broker/prover_cache/memory.ts | 20 --- .../src/proving_broker/proving_agent.ts | 14 +- .../src/proving_broker/proving_broker.test.ts | 90 ++++------ .../src/proving_broker/proving_broker.ts | 129 +++++++++++---- .../proving_job_controller.test.ts | 1 + .../proving_broker/proving_job_controller.ts | 25 +-- .../prover-client/src/proving_broker/rpc.ts | 1 - .../prover-client/src/test/mock_prover.ts | 9 +- yarn-project/prover-node/src/factory.ts | 6 - .../src/prover-cache/cache_manager.ts | 69 -------- .../prover-node/src/prover-cache/kv_cache.ts | 27 --- .../prover-node/src/prover-node.test.ts | 4 - yarn-project/prover-node/src/prover-node.ts | 14 +- 22 files changed, 365 insertions(+), 525 deletions(-) create mode 100644 yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts rename yarn-project/prover-client/src/proving_broker/{caching_broker_facade.ts => broker_prover_facade.ts} (74%) delete mode 100644 yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts delete mode 100644 yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts delete mode 100644 yarn-project/prover-node/src/prover-cache/cache_manager.ts delete mode 100644 yarn-project/prover-node/src/prover-cache/kv_cache.ts diff --git 
a/yarn-project/circuit-types/src/interfaces/prover-broker.ts b/yarn-project/circuit-types/src/interfaces/prover-broker.ts index fb2fdadef678..546b26aa6725 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-broker.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-broker.ts @@ -61,12 +61,6 @@ export interface ProvingJobProducer { */ cancelProvingJob(id: ProvingJobId): Promise; - /** - * Cleans up after a job has completed. Throws if the job is in-progress - * @param id - The ID of the job to cancel - */ - cleanUpProvingJobState(id: ProvingJobId): Promise; - /** * Returns the current status fof the proving job * @param id - The ID of the job to get the status of diff --git a/yarn-project/circuit-types/src/interfaces/prover-client.ts b/yarn-project/circuit-types/src/interfaces/prover-client.ts index 5dd392d95c6f..384bf8331ec7 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-client.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-client.ts @@ -7,7 +7,6 @@ import { z } from 'zod'; import { type TxHash } from '../tx/tx_hash.js'; import { type EpochProver } from './epoch-prover.js'; import { type ProvingJobConsumer } from './prover-broker.js'; -import { type ProvingJobStatus } from './proving-job.js'; export type ActualProverConfig = { /** Whether to construct real proofs */ @@ -24,9 +23,6 @@ export type ProverConfig = ActualProverConfig & { nodeUrl?: string; /** Identifier of the prover */ proverId: Fr; - /** Where to store temporary data */ - cacheDir?: string; - proverAgentCount: number; }; @@ -35,7 +31,6 @@ export const ProverConfigSchema = z.object({ realProofs: z.boolean(), proverId: schemas.Fr, proverTestDelayMs: z.number(), - cacheDir: z.string().optional(), proverAgentCount: z.number(), }) satisfies ZodFor; @@ -60,11 +55,6 @@ export const proverConfigMappings: ConfigMappingsType = { description: 'Artificial delay to introduce to all operations to the test prover.', ...numberConfigHelper(0), }, - cacheDir: { - env: 
'PROVER_CACHE_DIR', - description: 'Where to store cache data generated while proving', - defaultValue: '/tmp/aztec-prover', - }, proverAgentCount: { env: 'PROVER_AGENT_COUNT', description: 'The number of prover agents to start', @@ -76,35 +66,12 @@ function parseProverId(str: string) { return Fr.fromHexString(str.startsWith('0x') ? str : Buffer.from(str, 'utf8').toString('hex')); } -/** - * A database where the proving orchestrator can store intermediate results - */ -export interface ProverCache { - /** - * Saves the status of a proving job - * @param jobId - The job ID - * @param status - The status of the proof - */ - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise; - - /** - * Retrieves the status of a proving job (if known) - * @param jobId - The job ID - */ - getProvingJobStatus(jobId: string): Promise; - - /** - * Closes the cache - */ - close(): Promise; -} - /** * The interface to the prover client. * Provides the ability to generate proofs and build rollups. 
*/ export interface EpochProverManager { - createEpochProver(cache?: ProverCache): EpochProver; + createEpochProver(): EpochProver; start(): Promise; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index ea13f2f97118..b1d0c8ce21ff 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -125,7 +125,6 @@ export type EnvVar = | 'PROVER_REAL_PROOFS' | 'PROVER_REQUIRED_CONFIRMATIONS' | 'PROVER_TEST_DELAY_MS' - | 'PROVER_CACHE_DIR' | 'PXE_L2_STARTING_BLOCK' | 'PXE_PROVER_ENABLED' | 'QUOTE_PROVIDER_BASIS_POINT_FEE' diff --git a/yarn-project/foundation/src/string/index.ts b/yarn-project/foundation/src/string/index.ts index 1b85173fc1a3..c6d0d4d8fc9a 100644 --- a/yarn-project/foundation/src/string/index.ts +++ b/yarn-project/foundation/src/string/index.ts @@ -25,3 +25,7 @@ export function pluralize(str: string, count: number | bigint, plural?: string): export function count(count: number | bigint, str: string, plural?: string): string { return `${count} ${pluralize(str, count, plural)}`; } + +export function truncate(str: string, length: number = 64): string { + return str.length > length ? str.slice(0, length) + '...' 
: str; +} diff --git a/yarn-project/prover-client/src/index.ts b/yarn-project/prover-client/src/index.ts index 822b565f54a6..60feee5fa195 100644 --- a/yarn-project/prover-client/src/index.ts +++ b/yarn-project/prover-client/src/index.ts @@ -2,4 +2,3 @@ export { EpochProverManager } from '@aztec/circuit-types'; export * from './prover-client/index.js'; export * from './config.js'; -export * from './proving_broker/prover_cache/memory.js'; diff --git a/yarn-project/prover-client/src/prover-client/prover-client.ts b/yarn-project/prover-client/src/prover-client/prover-client.ts index b5d81a349059..115b2a9f7846 100644 --- a/yarn-project/prover-client/src/prover-client/prover-client.ts +++ b/yarn-project/prover-client/src/prover-client/prover-client.ts @@ -4,7 +4,6 @@ import { type EpochProver, type EpochProverManager, type ForkMerkleTreeOperations, - type ProverCache, type ProvingJobBroker, type ProvingJobConsumer, type ProvingJobProducer, @@ -16,13 +15,10 @@ import { createLogger } from '@aztec/foundation/log'; import { NativeACVMSimulator } from '@aztec/simulator'; import { type TelemetryClient } from '@aztec/telemetry-client'; -import { join } from 'path'; - import { type ProverClientConfig } from '../config.js'; import { ProvingOrchestrator } from '../orchestrator/orchestrator.js'; -import { CachingBrokerFacade } from '../proving_broker/caching_broker_facade.js'; +import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js'; import { InlineProofStore } from '../proving_broker/proof_store.js'; -import { InMemoryProverCache } from '../proving_broker/prover_cache/memory.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; /** Manages proving of epochs by orchestrating the proving of individual blocks relying on a pool of prover agents. 
*/ @@ -30,8 +26,6 @@ export class ProverClient implements EpochProverManager { private running = false; private agents: ProvingAgent[] = []; - private cacheDir?: string; - private constructor( private config: ProverClientConfig, private worldState: ForkMerkleTreeOperations, @@ -42,13 +36,12 @@ export class ProverClient implements EpochProverManager { ) { // TODO(palla/prover-node): Cache the paddingTx here, and not in each proving orchestrator, // so it can be reused across multiple ones and not recomputed every time. - this.cacheDir = this.config.cacheDir ? join(this.config.cacheDir, `tx_prover_${this.config.proverId}`) : undefined; } - public createEpochProver(cache: ProverCache = new InMemoryProverCache()): EpochProver { + public createEpochProver(): EpochProver { return new ProvingOrchestrator( this.worldState, - new CachingBrokerFacade(this.orchestratorClient, cache), + new BrokerCircuitProverFacade(this.orchestratorClient), this.telemetry, this.config.proverId, ); diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts new file mode 100644 index 000000000000..31aa41e8f99b --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts @@ -0,0 +1,156 @@ +import { TestCircuitProver } from '@aztec/bb-prover'; +import { ProvingJobProducer, makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; +import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; +import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; +import { AbortError } from '@aztec/foundation/error'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { sleep } from '@aztec/foundation/sleep'; + +import { jest } from '@jest/globals'; +import { MockProxy } from 'jest-mock-extended'; +import { mock } from 'jest-mock-extended'; + +import { MockProver, 
TestBroker } from '../test/mock_prover.js'; +import { BrokerCircuitProverFacade } from './broker_prover_facade.js'; +import { InlineProofStore } from './proof_store.js'; + +describe('BrokerCircuitProverFacade', () => { + let facade: BrokerCircuitProverFacade; + let proofStore: InlineProofStore; + let broker: TestBroker; + let prover: MockProver; + let agentPollInterval: number; + + beforeEach(async () => { + proofStore = new InlineProofStore(); + prover = new MockProver(); + agentPollInterval = 100; + broker = new TestBroker(2, prover, proofStore, agentPollInterval); + facade = new BrokerCircuitProverFacade(broker, proofStore); + + await broker.start(); + }); + + it('sends jobs to the broker', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof'); + + await expect(facade.getBaseParityProof(inputs, controller.signal, 42)).resolves.toBeDefined(); + + expect(broker.enqueueProvingJob).toHaveBeenCalled(); + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + }); + + it('handles multiple calls for the same job', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + const promises: Promise[] = []; + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + // send N identical proof requests + const CALLS = 50; + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); + } + + await sleep(agentPollInterval); + // the broker should have received all of them + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); + + // but really, it should have only enqueued just one + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + 
expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + + // now we have 50 promises all waiting on the same result + // resolve the proof + const result = makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFakeHonk(), + ); + resultPromise.resolve(result); + + // enqueue another N requests for the same jobs + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); + } + + await sleep(agentPollInterval); + // the broker will have received the new requests + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); + // but no new jobs were created + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + + // and all 2 * N requests will have been resolved with the same result + for (const promise of promises) { + await expect(promise).resolves.toEqual(result); + } + }); + + it('handles proof errors', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + const promises: Promise[] = []; + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + // send N identical proof requests + const CALLS = 50; + for (let i = 0; i < CALLS; i++) { + // wrap the error in a resolved promise so that we don't have unhandled rejections + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err }))); + } + + await sleep(agentPollInterval); + // the broker should have received all of them + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); + + // but really, it should have only enqueued just one + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + + resultPromise.reject(new Error('TEST ERROR')); 
+ + // enqueue another N requests for the same jobs + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err }))); + } + + await sleep(agentPollInterval); + // the broker will have received the new requests + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); + // but no new jobs were created + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + + // and all 2 * N requests will have failed with the same error + for (const promise of promises) { + await expect(promise).resolves.toEqual({ err: new Error('TEST ERROR') }); + } + }); + + it('handles aborts', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + const promise = facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err })); + + await sleep(agentPollInterval); + expect(prover.getBaseParityProof).toHaveBeenCalled(); + + controller.abort(); + + await expect(promise).resolves.toEqual({ err: new Error('Aborted') }); + }); +}); diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts similarity index 74% rename from yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts rename to yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index 82c59e7ea0fa..74d290769ca3 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -1,6 +1,5 @@ import { type ProofAndVerificationKey, - type ProverCache, type ProvingJobId, type ProvingJobInputsMap, type ProvingJobProducer, @@ -35,9 +34,9 @@ import { import { sha256 } from 
'@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { retryUntil } from '@aztec/foundation/retry'; +import { truncate } from '@aztec/foundation/string'; import { InlineProofStore, type ProofStore } from './proof_store.js'; -import { InMemoryProverCache } from './prover_cache/memory.js'; // 20 minutes, roughly the length of an Aztec epoch. If a proof isn't ready in this amount of time then we've failed to prove the whole epoch const MAX_WAIT_MS = 1_200_000; @@ -45,10 +44,9 @@ const MAX_WAIT_MS = 1_200_000; /** * A facade around a job broker that generates stable job ids and caches results */ -export class CachingBrokerFacade implements ServerCircuitProver { +export class BrokerCircuitProverFacade implements ServerCircuitProver { constructor( private broker: ProvingJobProducer, - private cache: ProverCache = new InMemoryProverCache(), private proofStore: ProofStore = new InlineProofStore(), private waitTimeoutMs = MAX_WAIT_MS, private pollIntervalMs = 1000, @@ -62,49 +60,23 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber = 0, signal?: AbortSignal, ): Promise { - // first try the cache - let jobEnqueued = false; - try { - const cachedResult = await this.cache.getProvingJobStatus(id); - if (cachedResult.status !== 'not-found') { - this.log.debug(`Found cached result for job=${id}: status=${cachedResult.status}`); - } - - if (cachedResult.status === 'fulfilled') { - const output = await this.proofStore.getProofOutput(cachedResult.value); - if (output.type === type) { - return output.result as ProvingJobResultsMap[T]; - } else { - this.log.warn(`Cached result type mismatch for job=${id}. 
Expected=${type} but got=${output.type}`); - } - } else if (cachedResult.status === 'rejected') { - // prefer returning a rejected promises so that we don't trigger the catch block below - return Promise.reject(new Error(cachedResult.reason)); - } else if (cachedResult.status === 'in-progress' || cachedResult.status === 'in-queue') { - jobEnqueued = true; - } else { - jobEnqueued = false; - } - } catch (err) { - this.log.warn(`Failed to get cached proving job id=${id}: ${err}. Re-running job`); - } + const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); + await this.broker.enqueueProvingJob({ + id, + type, + inputsUri, + epochNumber, + }); - if (!jobEnqueued) { - try { - const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); - await this.broker.enqueueProvingJob({ - id, - type, - inputsUri, - epochNumber, - }); - await this.cache.setProvingJobStatus(id, { status: 'in-queue' }); - } catch (err) { - this.log.error(`Failed to enqueue proving job id=${id}: ${err}`); - await this.cache.setProvingJobStatus(id, { status: 'not-found' }); - throw err; - } - } + this.log.verbose( + `Sent proving job to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, + { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + inputsUri: truncate(inputsUri), + }, + ); // notify broker of cancelled job const abortFn = async () => { @@ -131,11 +103,16 @@ export class CachingBrokerFacade implements ServerCircuitProver { this.pollIntervalMs / 1000, ); - try { - await this.cache.setProvingJobStatus(id, result); - } catch (err) { - this.log.warn(`Failed to cache proving job id=${id} resultStatus=${result.status}: ${err}`); - } + this.log.verbose( + `Got result from proving broker id=${id} type=${ProvingRequestType[type]} epoch=${epochNumber} status=${result.status}`, + { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + status: result.status, + ...(result.status === 
'fulfilled' ? { outputsUri: truncate(result.value) } : { reason: result.reason }), + }, + ); if (result.status === 'fulfilled') { const output = await this.proofStore.getProofOutput(result.value); @@ -149,8 +126,6 @@ export class CachingBrokerFacade implements ServerCircuitProver { } } finally { signal?.removeEventListener('abort', abortFn); - // we've saved the result in our cache. We can tell the broker to clear its state - await this.broker.cleanUpProvingJobState(id); } } @@ -160,7 +135,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PUBLIC_VM, inputs), + this.generateId(ProvingRequestType.PUBLIC_VM, inputs, epochNumber), ProvingRequestType.PUBLIC_VM, inputs, epochNumber, @@ -174,7 +149,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BASE_PARITY, inputs), + this.generateId(ProvingRequestType.BASE_PARITY, inputs, epochNumber), ProvingRequestType.BASE_PARITY, inputs, epochNumber, @@ -188,7 +163,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input), + this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_MERGE_ROLLUP, input, epochNumber, @@ -202,7 +177,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_ROOT_ROLLUP, input, epochNumber, @@ -216,7 +191,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return 
this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, epochNumber, @@ -230,7 +205,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs), + this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, epochNumber), ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, epochNumber, @@ -244,7 +219,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.MERGE_ROLLUP, input), + this.generateId(ProvingRequestType.MERGE_ROLLUP, input, epochNumber), ProvingRequestType.MERGE_ROLLUP, input, epochNumber, @@ -257,7 +232,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput), + this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, epochNumber), ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, epochNumber, @@ -271,7 +246,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs), + this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, epochNumber), ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, epochNumber, @@ -285,7 +260,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.ROOT_PARITY, inputs), + this.generateId(ProvingRequestType.ROOT_PARITY, inputs, epochNumber), 
ProvingRequestType.ROOT_PARITY, inputs, epochNumber, @@ -299,7 +274,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.ROOT_ROLLUP, input, epochNumber), ProvingRequestType.ROOT_ROLLUP, input, epochNumber, @@ -313,7 +288,7 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput), + this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput, epochNumber), ProvingRequestType.TUBE_PROOF, tubeInput, epochNumber, @@ -321,8 +296,8 @@ export class CachingBrokerFacade implements ServerCircuitProver { ); } - private generateId(type: ProvingRequestType, inputs: { toBuffer(): Buffer }) { + private generateId(type: ProvingRequestType, inputs: { toBuffer(): Buffer }, epochNumber = 0) { const inputsHash = sha256(inputs.toBuffer()); - return `${ProvingRequestType[type]}:${inputsHash.toString('hex')}`; + return `${epochNumber}:${ProvingRequestType[type]}:${inputsHash.toString('hex')}`; } } diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts deleted file mode 100644 index a72918ade09d..000000000000 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { type ProvingJobProducer, ProvingRequestType, makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; -import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; -import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; -import { promiseWithResolvers } from '@aztec/foundation/promise'; - -import { jest } from '@jest/globals'; -import { type MockProxy, 
mock } from 'jest-mock-extended'; - -import { CachingBrokerFacade } from './caching_broker_facade.js'; -import { InlineProofStore } from './proof_store.js'; -import { InMemoryProverCache } from './prover_cache/memory.js'; - -describe('CachingBrokerFacade', () => { - let facade: CachingBrokerFacade; - let cache: InMemoryProverCache; - let proofStore: InlineProofStore; - let broker: MockProxy; - - beforeAll(() => { - jest.useFakeTimers(); - }); - - beforeEach(() => { - broker = mock({ - enqueueProvingJob: jest.fn(), - getProvingJobStatus: jest.fn(), - cancelProvingJob: jest.fn(), - cleanUpProvingJobState: jest.fn(), - waitForJobToSettle: jest.fn(), - }); - cache = new InMemoryProverCache(); - proofStore = new InlineProofStore(); - facade = new CachingBrokerFacade(broker, cache, proofStore); - }); - - it('marks job as in progress', async () => { - const controller = new AbortController(); - void facade.getBaseParityProof(makeBaseParityInputs(), controller.signal); - - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.enqueueProvingJob).toHaveBeenCalled(); - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'in-queue' }); - controller.abort(); - }); - - it('removes the cached value if a job fails to enqueue', async () => { - const { promise, reject } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(promise); - - void facade.getBaseParityProof(makeBaseParityInputs()).catch(() => {}); - await jest.advanceTimersToNextTimerAsync(); - - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - reject(new Error('Failed to enqueue job')); - - await jest.advanceTimersToNextTimerAsync(); - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'not-found' }); - }); - - it('awaits existing job if in progress', async () => { - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs).catch(() => {}); - await 
jest.advanceTimersToNextTimerAsync(); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - - void facade.getBaseParityProof(inputs).catch(() => {}); - await jest.advanceTimersToNextTimerAsync(); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - }); - - it('reuses already cached results', async () => { - const { promise, resolve } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs); - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - const result = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - - const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); - resolve({ - status: 'fulfilled', - value: outputUri, - }); - - await jest.advanceTimersToNextTimerAsync(); - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'fulfilled', value: outputUri }); - - await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once - }); - - it('clears broker state after a job resolves', async () => { - const { promise, resolve } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs); - await jest.advanceTimersToNextTimerAsync(); - - const job = broker.enqueueProvingJob.mock.calls[0][0]; - const result = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - 
VerificationKeyData.makeFakeHonk(), - ); - const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); - resolve({ - status: 'fulfilled', - value: outputUri, - }); - - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); - }); - - it('clears broker state after a job is canceled', async () => { - const { promise, resolve } = promiseWithResolvers(); - const catchSpy = jest.fn(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - const controller = new AbortController(); - void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy); - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.cancelProvingJob).not.toHaveBeenCalled(); - controller.abort(); - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cancelProvingJob).toHaveBeenCalled(); - - resolve({ - status: 'rejected', - reason: 'Aborted', - }); - - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); - expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted')); - }); -}); diff --git a/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts b/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts deleted file mode 100644 index b4da076cbcb9..000000000000 --- a/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { ProverCache, ProvingJobStatus } from '@aztec/circuit-types'; - -export class InMemoryProverCache implements ProverCache { - private proofs: Record = {}; - - constructor() {} - - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise { - this.proofs[jobId] = status; - return Promise.resolve(); - } - - getProvingJobStatus(jobId: string): Promise { - return Promise.resolve(this.proofs[jobId] ?? 
{ status: 'not-found' }); - } - - close(): Promise { - return Promise.resolve(); - } -} diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.ts index 4d6ad75165a5..3c099c8f1a56 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.ts @@ -10,6 +10,7 @@ import { } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { truncate } from '@aztec/foundation/string'; import { Timer } from '@aztec/foundation/timer'; import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; @@ -107,6 +108,7 @@ export class ProvingAgent implements Traceable { this.currentJobController = new ProvingJobController( job.id, inputs, + job.epochNumber, time, this.circuitProver, this.handleJobResult, @@ -114,13 +116,13 @@ export class ProvingAgent implements Traceable { if (abortedProofJobId) { this.log.info( - `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncate( job.inputsUri, )}`, ); } else { this.log.info( - `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncate( job.inputsUri, )}`, ); @@ -147,14 +149,8 @@ export class ProvingAgent implements Traceable { return this.broker.reportProvingJobError(jobId, err.message, retry); 
} else if (result) { const outputUri = await this.proofStore.saveProofOutput(jobId, type, result); - this.log.info( - `Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncateString(outputUri)}`, - ); + this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncate(outputUri)}`); return this.broker.reportProvingJobSuccess(jobId, outputUri); } }; } - -function truncateString(str: string, length: number = 64): string { - return str.length > length ? str.slice(0, length) + '...' : str; -} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 454e840543f4..42ce07819f21 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,4 +1,10 @@ -import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types'; +import { + type ProofUri, + type ProvingJob, + type ProvingJobId, + ProvingJobStatus, + ProvingRequestType, +} from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; import { sleep } from '@aztec/foundation/sleep'; import { openTmpStore } from '@aztec/kv-store/lmdb'; @@ -639,10 +645,8 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await sleep(jobTimeoutMs + brokerIntervalMs); - - // should be back in the queue now - await assertJobStatus(id, 'in-queue'); + await sleep(jobTimeoutMs); + await assertJobTransition(id, 'in-progress', 'in-queue'); }); it('cancel stale jobs that time out', async () => { @@ -659,10 +663,10 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await sleep(jobTimeoutMs + brokerIntervalMs); + await sleep(jobTimeoutMs); // should be back in the queue now - await 
assertJobStatus(id, 'in-queue'); + await assertJobTransition(id, 'in-progress', 'in-queue'); // another agent picks it up await getAndAssertNextJobId(id); @@ -926,48 +930,6 @@ describe.each([ await getAndAssertNextJobId(id2); }); - it('clears job state when job is removed', async () => { - const id1 = makeProvingJobId(); - - await database.addProvingJob({ - id: id1, - type: ProvingRequestType.BASE_PARITY, - epochNumber: 1, - inputsUri: makeInputsUri(), - }); - await database.setProvingJobResult(id1, makeOutputsUri()); - - const id2 = makeProvingJobId(); - await database.addProvingJob({ - id: id2, - type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - epochNumber: 2, - inputsUri: makeInputsUri(), - }); - - await broker.start(); - - await assertJobStatus(id1, 'fulfilled'); - await assertJobStatus(id2, 'in-queue'); - - jest.spyOn(database, 'deleteProvingJobAndResult'); - - await broker.cleanUpProvingJobState(id1); - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(1, id1); - - await broker.cancelProvingJob(id2); - await broker.cleanUpProvingJobState(id2); - await sleep(brokerIntervalMs); - - expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(2, id2); - - await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' }); - await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' }); - await assertJobStatus(id1, 'not-found'); - await assertJobStatus(id2, 'not-found'); - }); - it('saves job when enqueued', async () => { await broker.start(); const job: ProvingJob = { @@ -1017,7 +979,7 @@ describe.each([ expect(database.setProvingJobResult).toHaveBeenCalledWith(job.id, expect.any(String)); }); - it('does not retain job result if database fails to save', async () => { + it('saves result even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobResult').mockRejectedValue(new Error('db error')); const id = 
makeProvingJobId(); @@ -1028,7 +990,7 @@ describe.each([ inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobSuccess(id, makeOutputsUri())).rejects.toThrow(new Error('db error')); - await assertJobStatus(id, 'in-queue'); + await assertJobStatus(id, 'fulfilled'); }); it('saves job error', async () => { @@ -1050,7 +1012,7 @@ describe.each([ expect(database.setProvingJobError).toHaveBeenCalledWith(id, error); }); - it('does not retain job error if database fails to save', async () => { + it('saves job error even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobError').mockRejectedValue(new Error('db error')); const id = makeProvingJobId(); @@ -1061,7 +1023,7 @@ describe.each([ inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobError(id, 'test error')).rejects.toThrow(new Error('db error')); - await assertJobStatus(id, 'in-queue'); + await assertJobStatus(id, 'rejected'); }); it('does not save job result if job is unknown', async () => { @@ -1167,10 +1129,30 @@ describe.each([ }); }); - async function assertJobStatus(id: ProvingJobId, status: string) { + async function assertJobStatus(id: ProvingJobId, status: ProvingJobStatus['status']) { await expect(broker.getProvingJobStatus(id)).resolves.toEqual(expect.objectContaining({ status })); } + async function assertJobTransition( + id: ProvingJobId, + currentStatus: ProvingJobStatus['status'], + expectedStatus: ProvingJobStatus['status'], + timeoutMs = 5000, + interval = brokerIntervalMs / 4, + ): Promise { + let status; + const timeout = now() + timeoutMs; + while (now() < timeout) { + ({ status } = await broker.getProvingJobStatus(id)); + if (status !== currentStatus) { + break; + } + await sleep(interval); + } + + expect(status).toEqual(expectedStatus); + } + async function getAndAssertNextJobId(id: ProvingJobId, ...allowList: ProvingRequestType[]) { await expect(broker.getProvingJob({ allowList })).resolves.toEqual( 
expect.objectContaining({ job: expect.objectContaining({ id }) }), diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index b23b05a55734..09d75f8ea1e8 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -3,7 +3,7 @@ import { type ProvingJob, type ProvingJobConsumer, type ProvingJobFilter, - type ProvingJobId, + ProvingJobId, type ProvingJobProducer, type ProvingJobSettledResult, type ProvingJobStatus, @@ -143,7 +143,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr public start(): Promise { for (const [item, result] of this.database.allProvingJobs()) { - this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`); + this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`, { + provingJobId: item.id, + status: result ? result.status : 'pending', + }); this.jobsCache.set(item.id, item); this.promises.set(item.id, promiseWithResolvers()); @@ -152,7 +155,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr this.promises.get(item.id)!.resolve(result); this.resultsCache.set(item.id, result); } else { - this.logger.debug(`Re-enqueuing proving job id=${item.id}`); this.enqueueJobInternal(item); } } @@ -176,34 +178,53 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return; } - await this.database.addProvingJob(job); - this.jobsCache.set(job.id, job); - this.enqueueJobInternal(job); + this.logger.verbose(`New proving job id=${job.id} epochNumber=${job.epochNumber}`, { provingJobId: job.id }); + try { + // do this first so it acts as a "lock". If this job is enqueued again while we're saving it the if at the top will catch it. 
+ this.jobsCache.set(job.id, job); + await this.database.addProvingJob(job); + this.enqueueJobInternal(job); + } catch (err) { + this.logger.error(`Failed to save proving job id=${job.id}: ${err}`, err, { provingJobId: job.id }); + this.jobsCache.delete(job.id); + throw err; + } } public waitForJobToSettle(id: ProvingJobId): Promise { const promiseWithResolvers = this.promises.get(id); if (!promiseWithResolvers) { + this.logger.warn(`Job id=${id} not found`, { provingJobId: id }); return Promise.resolve({ status: 'rejected', reason: `Job ${id} not found` }); } return promiseWithResolvers.promise; } public async cancelProvingJob(id: ProvingJobId): Promise { + if (!this.jobsCache.has(id)) { + this.logger.warn(`Can't cancel a job that doesn't exist id=${id}`, { provingJobId: id }); + return; + } + // notify listeners of the cancellation if (!this.resultsCache.has(id)) { - this.logger.info(`Cancelling job id=${id}`); + this.logger.info(`Cancelling job id=${id}`, { provingJobId: id }); await this.reportProvingJobError(id, 'Aborted', false); } } - public async cleanUpProvingJobState(id: ProvingJobId): Promise { + private async cleanUpProvingJobState(id: ProvingJobId): Promise { + if (!this.jobsCache.has(id)) { + this.logger.warn(`Can't clean up a job that doesn't exist id=${id}`, { provingJobId: id }); + return; + } + if (!this.resultsCache.has(id)) { - this.logger.warn(`Can't cleanup busy proving job: id=${id}`); + this.logger.warn(`Can't cleanup busy proving job: id=${id}`, { provingJobId: id }); return; } - this.logger.debug(`Cleaning up state for job id=${id}`); + this.logger.debug(`Cleaning up state for job id=${id}`, { provingJobId: id }); await this.database.deleteProvingJobAndResult(id); this.jobsCache.delete(id); this.promises.delete(id); @@ -221,7 +242,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const item = this.jobsCache.get(id); if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + 
this.logger.warn(`Proving job id=${id} not found`, { provingJobId: id }); return Promise.resolve({ status: 'not-found' }); } @@ -274,45 +295,68 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const retries = this.retries.get(id) ?? 0; if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + this.logger.warn(`Can't set error on unknown proving job id=${id} err=${err}`, { provingJobId: id }); return; } if (!info) { - this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { + provingJobId: id, + }); } else { this.inProgress.delete(id); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} already is already settled, ignoring error`); + this.logger.warn(`Proving job id=${id} is already settled, ignoring err=${err}`, { + provingJobId: id, + }); return; } if (retry && retries + 1 < this.maxRetries && !this.isJobStale(item)) { - this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); + this.logger.info( + `Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1} err=${err}`, + { + provingJobId: id, + }, + ); this.retries.set(id, retries + 1); this.enqueueJobInternal(item); this.instrumentation.incRetriedJobs(item.type); return; } - this.logger.warn( + this.logger.info( `Marking proving job as failed id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${ retries + 1 } err=${err}`, + { + provingJobId: id, + }, ); - await this.database.setProvingJobError(id, err); - + // save the result to the cache and notify clients of the job status + // this should work even if our database breaks because the result is cached in memory const result: ProvingJobSettledResult = { status: 'rejected', reason: String(err) }; this.resultsCache.set(id, result); 
this.promises.get(id)!.resolve(result); + this.instrumentation.incRejectedJobs(item.type); if (info) { const duration = this.msTimeSource() - info.startedAt; this.instrumentation.recordJobDuration(item.type, duration); } + + try { + await this.database.setProvingJobError(id, err); + } catch (saveErr) { + this.logger.error(`Failed to save proving job error status id=${id} jobErr=${err}`, saveErr, { + provingJobId: id, + }); + + throw saveErr; + } } reportProvingJobProgress( @@ -322,12 +366,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr ): Promise<{ job: ProvingJob; time: number } | undefined> { const job = this.jobsCache.get(id); if (!job) { - this.logger.warn(`Proving job id=${id} does not exist`); + this.logger.warn(`Proving job id=${id} does not exist`, { provingJobId: id }); return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} has already been completed`); + this.logger.warn(`Proving job id=${id} has already been completed`, { provingJobId: id }); return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } @@ -336,6 +380,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr if (!metadata) { this.logger.warn( `Proving job id=${id} type=${ProvingRequestType[job.type]} not found in the in-progress cache, adding it`, + { provingJobId: id }, ); // the queue will still contain the item at this point! 
// we need to be careful when popping off the queue to make sure we're not sending @@ -348,11 +393,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return Promise.resolve(undefined); } else if (startedAt <= metadata.startedAt) { if (startedAt < metadata.startedAt) { - this.logger.debug( + this.logger.info( `Proving job id=${id} type=${ProvingRequestType[job.type]} startedAt=${startedAt} older agent has taken job`, + { provingJobId: id }, ); } else { - this.logger.debug(`Proving job id=${id} type=${ProvingRequestType[job.type]} heartbeat`); + this.logger.debug(`Proving job id=${id} type=${ProvingRequestType[job.type]} heartbeat`, { provingJobId: id }); } metadata.startedAt = startedAt; metadata.lastUpdatedAt = now; @@ -362,6 +408,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr `Proving job id=${id} type=${ ProvingRequestType[job.type] } already being worked on by another agent. Sending new one`, + { provingJobId: id }, ); return this.getProvingJob(filter); } else { @@ -374,31 +421,50 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const item = this.jobsCache.get(id); const retries = this.retries.get(id) ?? 
0; if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + this.logger.warn(`Proving job id=${id} not found`, { provingJobId: id }); return; } if (!info) { - this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { + provingJobId: id, + }); } else { this.inProgress.delete(id); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} already settled, ignoring result`); + this.logger.warn(`Proving job id=${id} already settled, ignoring result`, { provingJobId: id }); return; } - this.logger.debug( + this.logger.info( `Proving job complete id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1}`, + { provingJobId: id }, ); - await this.database.setProvingJobResult(id, value); - + // save result to our local cache and notify clients + // if save to database fails, that's ok because we have the result in memory + // if the broker crashes and needs the result again, we're covered because we can just recompute it const result: ProvingJobSettledResult = { status: 'fulfilled', value }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.instrumentation.incResolvedJobs(item.type); + if (info) { + const duration = this.msTimeSource() - info.startedAt; + this.instrumentation.recordJobDuration(item.type, duration); + } + + try { + await this.database.setProvingJobResult(id, value); + } catch (saveErr) { + this.logger.error(`Failed to save proving job result id=${id}`, saveErr, { + provingJobId: id, + }); + + throw saveErr; + } } @trackSpan('ProvingBroker.cleanupPass') @@ -419,7 +485,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } if (jobsToClean.length > 0) { - this.logger.info(`Cleaning up [${jobsToClean.join(',')}]`); + this.logger.info(`Cleaning up jobs=${jobsToClean.length}`); await 
asyncPool(this.maxParallelCleanUps, jobsToClean, async jobId => { await this.cleanUpProvingJobState(jobId); }); @@ -431,7 +497,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); if (!item) { - this.logger.warn(`Proving job id=${id} not found. Removing it from the queue.`); + this.logger.warn(`Proving job id=${id} not found. Removing it from the queue.`, { provingJobId: id }); this.inProgress.delete(id); continue; } @@ -443,7 +509,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr // the job has timed out and it's also old, just cancel and move on await this.cancelProvingJob(item.id); } else { - this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); + this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`, { provingJobId: id }); this.inProgress.delete(id); this.enqueueJobInternal(item); this.instrumentation.incTimedOutJobs(item.type); @@ -462,7 +528,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr }); this.enqueuedAt.set(job.id, new Timer()); this.epochHeight = Math.max(this.epochHeight, job.epochNumber); - this.logger.debug(`Enqueued new proving job id=${job.id}`); } private isJobStale(job: ProvingJob) { diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts index 364703b23cf7..1c5fc3c02235 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts @@ -23,6 +23,7 @@ describe('ProvingJobController', () => { type: ProvingRequestType.BASE_PARITY, inputs: makeBaseParityInputs(), }, + 42, 0, prover, onComplete, diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts 
b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts index 2ce47cbe6f74..0b08e0ad69cd 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts @@ -30,6 +30,7 @@ export class ProvingJobController { constructor( private jobId: ProvingJobId, private inputs: ProvingJobInputs, + private epochNumber: number, private startedAt: number, private circuitProver: ServerCircuitProver, private onComplete: ProvingJobCompletionCallback, @@ -100,51 +101,51 @@ export class ProvingJobController { const signal = this.abortController.signal; switch (type) { case ProvingRequestType.PUBLIC_VM: { - return await this.circuitProver.getAvmProof(inputs, signal); + return await this.circuitProver.getAvmProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PRIVATE_BASE_ROLLUP: { - return await this.circuitProver.getPrivateBaseRollupProof(inputs, signal); + return await this.circuitProver.getPrivateBaseRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PUBLIC_BASE_ROLLUP: { - return await this.circuitProver.getPublicBaseRollupProof(inputs, signal); + return await this.circuitProver.getPublicBaseRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.MERGE_ROLLUP: { - return await this.circuitProver.getMergeRollupProof(inputs, signal); + return await this.circuitProver.getMergeRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP: { - return await this.circuitProver.getEmptyBlockRootRollupProof(inputs, signal); + return await this.circuitProver.getEmptyBlockRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BLOCK_ROOT_ROLLUP: { - return await this.circuitProver.getBlockRootRollupProof(inputs, signal); + return await this.circuitProver.getBlockRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BLOCK_MERGE_ROLLUP: { - return 
await this.circuitProver.getBlockMergeRollupProof(inputs, signal); + return await this.circuitProver.getBlockMergeRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.ROOT_ROLLUP: { - return await this.circuitProver.getRootRollupProof(inputs, signal); + return await this.circuitProver.getRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BASE_PARITY: { - return await this.circuitProver.getBaseParityProof(inputs, signal); + return await this.circuitProver.getBaseParityProof(inputs, signal, this.epochNumber); } case ProvingRequestType.ROOT_PARITY: { - return await this.circuitProver.getRootParityProof(inputs, signal); + return await this.circuitProver.getRootParityProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PRIVATE_KERNEL_EMPTY: { - return await this.circuitProver.getEmptyPrivateKernelProof(inputs, signal); + return await this.circuitProver.getEmptyPrivateKernelProof(inputs, signal, this.epochNumber); } case ProvingRequestType.TUBE_PROOF: { - return await this.circuitProver.getTubeProof(inputs, signal); + return await this.circuitProver.getTubeProof(inputs, signal, this.epochNumber); } default: { diff --git a/yarn-project/prover-client/src/proving_broker/rpc.ts b/yarn-project/prover-client/src/proving_broker/rpc.ts index 0d63dcebeacb..3dec6bedbdb4 100644 --- a/yarn-project/prover-client/src/proving_broker/rpc.ts +++ b/yarn-project/prover-client/src/proving_broker/rpc.ts @@ -28,7 +28,6 @@ const GetProvingJobResponse = z.object({ export const ProvingJobProducerSchema: ApiSchemaFor = { enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()), getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus), - cleanUpProvingJobState: z.function().args(ProvingJobId).returns(z.void()), cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult), }; diff --git 
a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index a4ab77fc24dd..baf581d8f8ee 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -58,8 +58,12 @@ export class TestBroker implements ProvingJobProducer { agentCount: number, prover: ServerCircuitProver, private proofStore: ProofStore = new InlineProofStore(), + agentPollInterval = 100, ) { - this.agents = times(agentCount, () => new ProvingAgent(this.broker, proofStore, prover, new NoopTelemetryClient())); + this.agents = times( + agentCount, + () => new ProvingAgent(this.broker, proofStore, prover, new NoopTelemetryClient(), undefined, agentPollInterval), + ); } public async start() { @@ -82,9 +86,6 @@ export class TestBroker implements ProvingJobProducer { getProvingJobStatus(id: ProvingJobId): Promise { return this.broker.getProvingJobStatus(id); } - cleanUpProvingJobState(id: ProvingJobId): Promise { - return this.broker.cleanUpProvingJobState(id); - } cancelProvingJob(id: string): Promise { return this.broker.cancelProvingJob(id); } diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 7a4ec700721a..c1a5f27026ae 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -12,14 +12,12 @@ import { type TelemetryClient } from '@aztec/telemetry-client'; import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { createWorldStateSynchronizer } from '@aztec/world-state'; -import { join } from 'path'; import { createPublicClient, getAddress, getContract, http } from 'viem'; import { createBondManager } from './bond/factory.js'; import { type ProverNodeConfig, type QuoteProviderConfig } from './config.js'; import { ClaimsMonitor } from './monitors/claims-monitor.js'; import { EpochMonitor } from './monitors/epoch-monitor.js'; -import { ProverCacheManager } from 
'./prover-cache/cache_manager.js'; import { createProverCoordination } from './prover-coordination/factory.js'; import { ProverNode, type ProverNodeOptions } from './prover-node.js'; import { HttpQuoteProvider } from './quote-provider/http.js'; @@ -78,9 +76,6 @@ export async function createProverNode( const walletClient = publisher.getClient(); const bondManager = await createBondManager(rollupContract, walletClient, config); - const cacheDir = config.cacheDir ? join(config.cacheDir, `prover_${config.proverId}`) : undefined; - const cacheManager = new ProverCacheManager(cacheDir); - return new ProverNode( prover, publisher, @@ -95,7 +90,6 @@ export async function createProverNode( epochMonitor, bondManager, telemetry, - cacheManager, proverNodeConfig, ); } diff --git a/yarn-project/prover-node/src/prover-cache/cache_manager.ts b/yarn-project/prover-node/src/prover-cache/cache_manager.ts deleted file mode 100644 index 497300d1e424..000000000000 --- a/yarn-project/prover-node/src/prover-cache/cache_manager.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { type ProverCache } from '@aztec/circuit-types'; -import { createLogger } from '@aztec/foundation/log'; -import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; -import { InMemoryProverCache } from '@aztec/prover-client'; - -import { type Dirent } from 'fs'; -import { mkdir, readFile, readdir, rm, writeFile } from 'fs/promises'; -import { join } from 'path'; - -import { KVProverCache } from './kv_cache.js'; - -const EPOCH_DIR_PREFIX = 'epoch'; -const EPOCH_DIR_SEPARATOR = '_'; -const EPOCH_HASH_FILENAME = 'epoch_hash.txt'; - -export class ProverCacheManager { - constructor(private cacheDir?: string, private log = createLogger('prover-node:cache-manager')) {} - - public async openCache(epochNumber: bigint, epochHash: Buffer): Promise { - if (!this.cacheDir) { - return new InMemoryProverCache(); - } - - const epochDir = EPOCH_DIR_PREFIX + EPOCH_DIR_SEPARATOR + epochNumber; - const dataDir = join(this.cacheDir, epochDir); - - 
const storedEpochHash = await readFile(join(dataDir, EPOCH_HASH_FILENAME), 'hex').catch(() => Buffer.alloc(0)); - if (storedEpochHash.toString() !== epochHash.toString()) { - await rm(dataDir, { recursive: true, force: true }); - } - - await mkdir(dataDir, { recursive: true }); - await writeFile(join(dataDir, EPOCH_HASH_FILENAME), epochHash.toString('hex')); - - const store = AztecLmdbStore.open(dataDir); - this.log.debug(`Created new database for epoch ${epochNumber} at ${dataDir}`); - const cleanup = () => store.close(); - return new KVProverCache(store, cleanup); - } - - /** - * Removes all caches for epochs older than the given epoch (including) - * @param upToAndIncludingEpoch - The epoch number up to which to remove caches - */ - public async removeStaleCaches(upToAndIncludingEpoch: bigint): Promise { - if (!this.cacheDir) { - return; - } - - const entries: Dirent[] = await readdir(this.cacheDir, { withFileTypes: true }).catch(() => []); - - for (const item of entries) { - if (!item.isDirectory()) { - continue; - } - - const [prefix, epochNumber] = item.name.split(EPOCH_DIR_SEPARATOR); - if (prefix !== EPOCH_DIR_PREFIX) { - continue; - } - - const epochNumberInt = BigInt(epochNumber); - if (epochNumberInt <= upToAndIncludingEpoch) { - this.log.info(`Removing old epoch database for epoch ${epochNumberInt} at ${join(this.cacheDir, item.name)}`); - await rm(join(this.cacheDir, item.name), { recursive: true }); - } - } - } -} diff --git a/yarn-project/prover-node/src/prover-cache/kv_cache.ts b/yarn-project/prover-node/src/prover-cache/kv_cache.ts deleted file mode 100644 index 82b216e384ae..000000000000 --- a/yarn-project/prover-node/src/prover-cache/kv_cache.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { ProverCache, ProvingJobStatus } from '@aztec/circuit-types'; -import type { AztecKVStore, AztecMap } from '@aztec/kv-store'; - -export class KVProverCache implements ProverCache { - private proofs: AztecMap; - - constructor(store: AztecKVStore, private 
cleanup?: () => Promise) { - this.proofs = store.openMap('prover_node_proof_status'); - } - - getProvingJobStatus(jobId: string): Promise { - const item = this.proofs.get(jobId); - if (!item) { - return Promise.resolve({ status: 'not-found' }); - } - - return Promise.resolve(JSON.parse(item)); - } - - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise { - return this.proofs.set(jobId, JSON.stringify(status)); - } - - async close(): Promise { - await this.cleanup?.(); - } -} diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 8d09f5174d3d..743360da4973 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -7,7 +7,6 @@ import { type L2Block, type L2BlockSource, type MerkleTreeWriteOperations, - type ProverCache, type ProverCoordination, WorldStateRunningState, type WorldStateSynchronizer, @@ -37,7 +36,6 @@ import { type BondManager } from './bond/bond-manager.js'; import { type EpochProvingJob } from './job/epoch-proving-job.js'; import { ClaimsMonitor } from './monitors/claims-monitor.js'; import { EpochMonitor } from './monitors/epoch-monitor.js'; -import { ProverCacheManager } from './prover-cache/cache_manager.js'; import { ProverNode, type ProverNodeOptions } from './prover-node.js'; import { type QuoteProvider } from './quote-provider/index.js'; import { type QuoteSigner } from './quote-signer.js'; @@ -104,7 +102,6 @@ describe('prover-node', () => { epochMonitor, bondManager, telemetryClient, - new ProverCacheManager(), config, ); @@ -390,7 +387,6 @@ describe('prover-node', () => { protected override doCreateEpochProvingJob( epochNumber: bigint, _blocks: L2Block[], - _cache: ProverCache, _publicProcessorFactory: PublicProcessorFactory, cleanUp: (job: EpochProvingJob) => Promise, ): EpochProvingJob { diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 
d70ce17bb874..dacdbe97417a 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -6,7 +6,6 @@ import { type L1ToL2MessageSource, type L2Block, type L2BlockSource, - type ProverCache, type ProverCoordination, type ProverNodeApi, type Service, @@ -15,7 +14,6 @@ import { } from '@aztec/circuit-types'; import { type ContractDataSource } from '@aztec/circuits.js'; import { compact } from '@aztec/foundation/collection'; -import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { type Maybe } from '@aztec/foundation/types'; import { type P2P } from '@aztec/p2p'; @@ -28,7 +26,6 @@ import { EpochProvingJob, type EpochProvingJobState } from './job/epoch-proving- import { ProverNodeMetrics } from './metrics.js'; import { type ClaimsMonitor, type ClaimsMonitorHandler } from './monitors/claims-monitor.js'; import { type EpochMonitor, type EpochMonitorHandler } from './monitors/epoch-monitor.js'; -import { type ProverCacheManager } from './prover-cache/cache_manager.js'; import { type QuoteProvider } from './quote-provider/index.js'; import { type QuoteSigner } from './quote-signer.js'; @@ -66,7 +63,6 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr private readonly epochsMonitor: EpochMonitor, private readonly bondManager: BondManager, private readonly telemetryClient: TelemetryClient, - private readonly proverCacheManager: ProverCacheManager, options: Partial = {}, ) { this.options = { @@ -265,16 +261,11 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr // Create a processor using the forked world state const publicProcessorFactory = new PublicProcessorFactory(this.contractDataSource, this.telemetryClient); - const epochHash = sha256(Buffer.concat(blocks.map(block => block.hash().toBuffer()))); - const proverCache = await this.proverCacheManager.openCache(epochNumber, epochHash); - const cleanUp = 
async () => { - await proverCache.close(); - await this.proverCacheManager.removeStaleCaches(epochNumber); this.jobs.delete(job.getId()); }; - const job = this.doCreateEpochProvingJob(epochNumber, blocks, proverCache, publicProcessorFactory, cleanUp); + const job = this.doCreateEpochProvingJob(epochNumber, blocks, publicProcessorFactory, cleanUp); this.jobs.set(job.getId(), job); return job; } @@ -283,7 +274,6 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr protected doCreateEpochProvingJob( epochNumber: bigint, blocks: L2Block[], - proverCache: ProverCache, publicProcessorFactory: PublicProcessorFactory, cleanUp: () => Promise, ) { @@ -291,7 +281,7 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr this.worldState, epochNumber, blocks, - this.prover.createEpochProver(proverCache), + this.prover.createEpochProver(), publicProcessorFactory, this.publisher, this.l2BlockSource,