From 0c54bf4e95dbed1c0450b53391f6a51e3ddb7c79 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Dec 2024 10:27:23 +0000 Subject: [PATCH 1/7] refactor: orchestrator talks directly to broker --- .../src/interfaces/prover-broker.ts | 6 - .../src/interfaces/prover-client.ts | 35 +--- yarn-project/foundation/src/config/env_var.ts | 1 - yarn-project/foundation/src/string/index.ts | 4 + yarn-project/prover-client/src/index.ts | 1 - .../src/prover-client/prover-client.ts | 13 +- .../broker_prover_facade.test.ts | 152 +++++++++++++++++ ...oker_facade.ts => broker_prover_facade.ts} | 67 ++------ .../caching_broker_facade.test.ts | 156 ------------------ .../src/proving_broker/prover_cache/memory.ts | 20 --- .../src/proving_broker/proving_agent.ts | 14 +- .../src/proving_broker/proving_broker.test.ts | 90 ++++------ .../src/proving_broker/proving_broker.ts | 129 +++++++++++---- .../proving_job_controller.test.ts | 1 + .../proving_broker/proving_job_controller.ts | 25 +-- .../prover-client/src/proving_broker/rpc.ts | 1 - .../prover-client/src/test/mock_prover.ts | 9 +- yarn-project/prover-node/src/factory.ts | 6 - .../src/prover-cache/cache_manager.ts | 69 -------- .../prover-node/src/prover-cache/kv_cache.ts | 27 --- .../prover-node/src/prover-node.test.ts | 4 - yarn-project/prover-node/src/prover-node.ts | 14 +- 22 files changed, 336 insertions(+), 508 deletions(-) create mode 100644 yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts rename yarn-project/prover-client/src/proving_broker/{caching_broker_facade.ts => broker_prover_facade.ts} (82%) delete mode 100644 yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts delete mode 100644 yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts delete mode 100644 yarn-project/prover-node/src/prover-cache/cache_manager.ts delete mode 100644 yarn-project/prover-node/src/prover-cache/kv_cache.ts diff --git a/yarn-project/circuit-types/src/interfaces/prover-broker.ts b/yarn-project/circuit-types/src/interfaces/prover-broker.ts index fb2fdadef67..546b26aa672 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-broker.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-broker.ts @@ -61,12 +61,6 @@ export interface ProvingJobProducer { */ cancelProvingJob(id: ProvingJobId): Promise; - /** - * Cleans up after a job has completed. 
Throws if the job is in-progress - * @param id - The ID of the job to cancel - */ - cleanUpProvingJobState(id: ProvingJobId): Promise; - /** * Returns the current status fof the proving job * @param id - The ID of the job to get the status of diff --git a/yarn-project/circuit-types/src/interfaces/prover-client.ts b/yarn-project/circuit-types/src/interfaces/prover-client.ts index 5dd392d95c6..384bf8331ec 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-client.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-client.ts @@ -7,7 +7,6 @@ import { z } from 'zod'; import { type TxHash } from '../tx/tx_hash.js'; import { type EpochProver } from './epoch-prover.js'; import { type ProvingJobConsumer } from './prover-broker.js'; -import { type ProvingJobStatus } from './proving-job.js'; export type ActualProverConfig = { /** Whether to construct real proofs */ @@ -24,9 +23,6 @@ export type ProverConfig = ActualProverConfig & { nodeUrl?: string; /** Identifier of the prover */ proverId: Fr; - /** Where to store temporary data */ - cacheDir?: string; - proverAgentCount: number; }; @@ -35,7 +31,6 @@ export const ProverConfigSchema = z.object({ realProofs: z.boolean(), proverId: schemas.Fr, proverTestDelayMs: z.number(), - cacheDir: z.string().optional(), proverAgentCount: z.number(), }) satisfies ZodFor; @@ -60,11 +55,6 @@ export const proverConfigMappings: ConfigMappingsType = { description: 'Artificial delay to introduce to all operations to the test prover.', ...numberConfigHelper(0), }, - cacheDir: { - env: 'PROVER_CACHE_DIR', - description: 'Where to store cache data generated while proving', - defaultValue: '/tmp/aztec-prover', - }, proverAgentCount: { env: 'PROVER_AGENT_COUNT', description: 'The number of prover agents to start', @@ -76,35 +66,12 @@ function parseProverId(str: string) { return Fr.fromHexString(str.startsWith('0x') ? str : Buffer.from(str, 'utf8').toString('hex')); } -/** - * A database where the proving orchestrator can store intermediate results - */ -export interface ProverCache { - /** - * Saves the status of a proving job - * @param jobId - The job ID - * @param status - The status of the proof - */ - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise; - - /** - * Retrieves the status of a proving job (if known) - * @param jobId - The job ID - */ - getProvingJobStatus(jobId: string): Promise; - - /** - * Closes the cache - */ - close(): Promise; -} - /** * The interface to the prover client. * Provides the ability to generate proofs and build rollups. 
*/ export interface EpochProverManager { - createEpochProver(cache?: ProverCache): EpochProver; + createEpochProver(): EpochProver; start(): Promise; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index ea13f2f9711..b1d0c8ce21f 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -125,7 +125,6 @@ export type EnvVar = | 'PROVER_REAL_PROOFS' | 'PROVER_REQUIRED_CONFIRMATIONS' | 'PROVER_TEST_DELAY_MS' - | 'PROVER_CACHE_DIR' | 'PXE_L2_STARTING_BLOCK' | 'PXE_PROVER_ENABLED' | 'QUOTE_PROVIDER_BASIS_POINT_FEE' diff --git a/yarn-project/foundation/src/string/index.ts b/yarn-project/foundation/src/string/index.ts index 1b85173fc1a..c6d0d4d8fc9 100644 --- a/yarn-project/foundation/src/string/index.ts +++ b/yarn-project/foundation/src/string/index.ts @@ -25,3 +25,7 @@ export function pluralize(str: string, count: number | bigint, plural?: string): export function count(count: number | bigint, str: string, plural?: string): string { return `${count} ${pluralize(str, count, plural)}`; } + +export function truncate(str: string, length: number = 64): string { + return str.length > length ? str.slice(0, length) + '...' : str; +} diff --git a/yarn-project/prover-client/src/index.ts b/yarn-project/prover-client/src/index.ts index 822b565f54a..60feee5fa19 100644 --- a/yarn-project/prover-client/src/index.ts +++ b/yarn-project/prover-client/src/index.ts @@ -2,4 +2,3 @@ export { EpochProverManager } from '@aztec/circuit-types'; export * from './prover-client/index.js'; export * from './config.js'; -export * from './proving_broker/prover_cache/memory.js'; diff --git a/yarn-project/prover-client/src/prover-client/prover-client.ts b/yarn-project/prover-client/src/prover-client/prover-client.ts index b5d81a34905..115b2a9f784 100644 --- a/yarn-project/prover-client/src/prover-client/prover-client.ts +++ b/yarn-project/prover-client/src/prover-client/prover-client.ts @@ -4,7 +4,6 @@ import { type EpochProver, type EpochProverManager, type ForkMerkleTreeOperations, - type ProverCache, type ProvingJobBroker, type ProvingJobConsumer, type ProvingJobProducer, @@ -16,13 +15,10 @@ import { createLogger } from '@aztec/foundation/log'; import { NativeACVMSimulator } from '@aztec/simulator'; import { type TelemetryClient } from '@aztec/telemetry-client'; -import { join } from 'path'; - import { type ProverClientConfig } from '../config.js'; import { ProvingOrchestrator } from '../orchestrator/orchestrator.js'; -import { CachingBrokerFacade } from '../proving_broker/caching_broker_facade.js'; +import { BrokerCircuitProverFacade } from '../proving_broker/broker_prover_facade.js'; import { InlineProofStore } from '../proving_broker/proof_store.js'; -import { InMemoryProverCache } from '../proving_broker/prover_cache/memory.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; /** Manages proving of epochs by orchestrating the proving of individual blocks relying on a pool of prover agents. 
*/ @@ -30,8 +26,6 @@ export class ProverClient implements EpochProverManager { private running = false; private agents: ProvingAgent[] = []; - private cacheDir?: string; - private constructor( private config: ProverClientConfig, private worldState: ForkMerkleTreeOperations, @@ -42,13 +36,12 @@ export class ProverClient implements EpochProverManager { ) { // TODO(palla/prover-node): Cache the paddingTx here, and not in each proving orchestrator, // so it can be reused across multiple ones and not recomputed every time. - this.cacheDir = this.config.cacheDir ? join(this.config.cacheDir, `tx_prover_${this.config.proverId}`) : undefined; } - public createEpochProver(cache: ProverCache = new InMemoryProverCache()): EpochProver { + public createEpochProver(): EpochProver { return new ProvingOrchestrator( this.worldState, - new CachingBrokerFacade(this.orchestratorClient, cache), + new BrokerCircuitProverFacade(this.orchestratorClient), this.telemetry, this.config.proverId, ); diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts new file mode 100644 index 00000000000..13c09f09011 --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.test.ts @@ -0,0 +1,152 @@ +import { makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; +import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; +import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { sleep } from '@aztec/foundation/sleep'; + +import { jest } from '@jest/globals'; + +import { MockProver, TestBroker } from '../test/mock_prover.js'; +import { BrokerCircuitProverFacade } from './broker_prover_facade.js'; +import { InlineProofStore } from './proof_store.js'; + +describe('BrokerCircuitProverFacade', () => { + let facade: BrokerCircuitProverFacade; + let proofStore: InlineProofStore; + let broker: TestBroker; + let prover: MockProver; + let agentPollInterval: number; + + beforeEach(async () => { + proofStore = new InlineProofStore(); + prover = new MockProver(); + agentPollInterval = 100; + broker = new TestBroker(2, prover, proofStore, agentPollInterval); + facade = new BrokerCircuitProverFacade(broker, proofStore); + + await broker.start(); + }); + + it('sends jobs to the broker', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof'); + + await expect(facade.getBaseParityProof(inputs, controller.signal, 42)).resolves.toBeDefined(); + + expect(broker.enqueueProvingJob).toHaveBeenCalled(); + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + }); + + it('handles multiple calls for the same job', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + const promises: Promise[] = []; + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + // send N identical proof requests + const CALLS = 50; + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); + } + + await sleep(agentPollInterval); + // the broker should have received all of them + 
expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); + + // but really, it should have only enqueued just one + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + + // now we have 50 promises all waiting on the same result + // resolve the proof + const result = makePublicInputsAndRecursiveProof( + makeParityPublicInputs(), + makeRecursiveProof(RECURSIVE_PROOF_LENGTH), + VerificationKeyData.makeFakeHonk(), + ); + resultPromise.resolve(result); + + // enqueue another N requests for the same jobs + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42)); + } + + await sleep(agentPollInterval); + // the broker will have received the new requests + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); + // but no new jobs where created + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + + // and all 2 * N requests will have been resolved with the same result + for (const promise of promises) { + await expect(promise).resolves.toEqual(result); + } + }); + + it('handles proof errors', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + const promises: Promise[] = []; + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + // send N identical proof requests + const CALLS = 50; + for (let i = 0; i < CALLS; i++) { + // wrap the error in a resolved promises so that we don't have unhandled rejections + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err }))); + } + + await sleep(agentPollInterval); + // the broker should have received all of them + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(CALLS); + + // but really, it should have only enqueued just one + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs, expect.anything(), 42); + + resultPromise.reject(new Error('TEST ERROR')); + + // enqueue another N requests for the same jobs + for (let i = 0; i < CALLS; i++) { + promises.push(facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err }))); + } + + await sleep(agentPollInterval); + // the broker will have received the new requests + expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(2 * CALLS); + // but no new jobs where created + expect(prover.getBaseParityProof).toHaveBeenCalledTimes(1); + + // and all 2 * N requests will have been resolved with the same result + for (const promise of promises) { + await expect(promise).resolves.toEqual({ err: new Error('TEST ERROR') }); + } + }); + + it('handles aborts', async () => { + const inputs = makeBaseParityInputs(); + const controller = new AbortController(); + + const resultPromise = promiseWithResolvers(); + jest.spyOn(broker, 'enqueueProvingJob'); + jest.spyOn(prover, 'getBaseParityProof').mockReturnValue(resultPromise.promise); + + const promise = facade.getBaseParityProof(inputs, controller.signal, 42).catch(err => ({ err })); + + await sleep(agentPollInterval); + expect(prover.getBaseParityProof).toHaveBeenCalled(); + + controller.abort(); + + await expect(promise).resolves.toEqual({ err: new Error('Aborted') }); + }); +}); diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts 
b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts similarity index 82% rename from yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts rename to yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index bbd154a7e99..9c4e4e67d2b 100644 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -1,6 +1,5 @@ import { type ProofAndVerificationKey, - type ProverCache, type ProvingJobId, type ProvingJobInputsMap, type ProvingJobProducer, @@ -35,6 +34,7 @@ import { import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { retryUntil } from '@aztec/foundation/retry'; +import { truncate } from '@aztec/foundation/string'; import { InlineProofStore, type ProofStore } from './proof_store.js'; import { InMemoryProverCache } from './prover_cache/memory.js'; @@ -48,7 +48,6 @@ const MAX_WAIT_MS = 1_200_000; export class CachingBrokerFacade implements ServerCircuitProver { constructor( private broker: ProvingJobProducer, - private cache: ProverCache = new InMemoryProverCache(), private proofStore: ProofStore = new InlineProofStore(), private waitTimeoutMs = MAX_WAIT_MS, private pollIntervalMs = 1000, @@ -62,53 +61,23 @@ export class CachingBrokerFacade implements ServerCircuitProver { epochNumber = 0, signal?: AbortSignal, ): Promise { - // first try the cache - let jobEnqueued = false; - let jobRejected = undefined; - try { - const cachedResult = await this.cache.getProvingJobStatus(id); - if (cachedResult.status !== 'not-found') { - this.log.debug(`Found cached result for job=${id}: status=${cachedResult.status}`); - } - - if (cachedResult.status === 'fulfilled') { - const output = await this.proofStore.getProofOutput(cachedResult.value); - if (output.type === type) { - return output.result as ProvingJobResultsMap[T]; - } else { - this.log.warn(`Cached result type mismatch for job=${id}. Expected=${type} but got=${output.type}`); - } - } else if (cachedResult.status === 'rejected') { - jobRejected = cachedResult.reason ?? 'Job rejected for unknown reason'; - } else if (cachedResult.status === 'in-progress' || cachedResult.status === 'in-queue') { - jobEnqueued = true; - } else { - jobEnqueued = false; - } - } catch (err) { - this.log.warn(`Failed to get cached proving job id=${id}: ${err}. 
Re-running job`); - } - - if (jobRejected) { - throw new Error(jobRejected); - } + const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); + await this.broker.enqueueProvingJob({ + id, + type, + inputsUri, + epochNumber, + }); - if (!jobEnqueued) { - try { - const inputsUri = await this.proofStore.saveProofInput(id, type, inputs); - await this.broker.enqueueProvingJob({ - id, - type, - inputsUri, - epochNumber, - }); - await this.cache.setProvingJobStatus(id, { status: 'in-queue' }); - } catch (err) { - this.log.error(`Failed to enqueue proving job id=${id}: ${err}`); - await this.cache.setProvingJobStatus(id, { status: 'not-found' }); - throw err; - } - } + this.log.verbose( + `Sent proving job to broker id=${id} type=${ProvingRequestType[type]} epochNumber=${epochNumber}`, + { + provingJobId: id, + provingJobType: ProvingRequestType[type], + epochNumber, + inputsUri: truncate(inputsUri), + }, + ); // notify broker of cancelled job const abortFn = async () => { @@ -153,8 +122,6 @@ export class CachingBrokerFacade implements ServerCircuitProver { } } finally { signal?.removeEventListener('abort', abortFn); - // we've saved the result in our cache. We can tell the broker to clear its state - await this.broker.cleanUpProvingJobState(id); } } diff --git a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts b/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts deleted file mode 100644 index a72918ade09..00000000000 --- a/yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { type ProvingJobProducer, ProvingRequestType, makePublicInputsAndRecursiveProof } from '@aztec/circuit-types'; -import { RECURSIVE_PROOF_LENGTH, VerificationKeyData, makeRecursiveProof } from '@aztec/circuits.js'; -import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js/testing'; -import { promiseWithResolvers } from '@aztec/foundation/promise'; - -import { jest } from '@jest/globals'; -import { type MockProxy, mock } from 'jest-mock-extended'; - -import { CachingBrokerFacade } from './caching_broker_facade.js'; -import { InlineProofStore } from './proof_store.js'; -import { InMemoryProverCache } from './prover_cache/memory.js'; - -describe('CachingBrokerFacade', () => { - let facade: CachingBrokerFacade; - let cache: InMemoryProverCache; - let proofStore: InlineProofStore; - let broker: MockProxy; - - beforeAll(() => { - jest.useFakeTimers(); - }); - - beforeEach(() => { - broker = mock({ - enqueueProvingJob: jest.fn(), - getProvingJobStatus: jest.fn(), - cancelProvingJob: jest.fn(), - cleanUpProvingJobState: jest.fn(), - waitForJobToSettle: jest.fn(), - }); - cache = new InMemoryProverCache(); - proofStore = new InlineProofStore(); - facade = new CachingBrokerFacade(broker, cache, proofStore); - }); - - it('marks job as in progress', async () => { - const controller = new AbortController(); - void facade.getBaseParityProof(makeBaseParityInputs(), controller.signal); - - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.enqueueProvingJob).toHaveBeenCalled(); - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'in-queue' }); - controller.abort(); - }); - - it('removes the cached value if a job fails to enqueue', async () => { - const { promise, reject } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(promise); - - void 
facade.getBaseParityProof(makeBaseParityInputs()).catch(() => {}); - await jest.advanceTimersToNextTimerAsync(); - - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - reject(new Error('Failed to enqueue job')); - - await jest.advanceTimersToNextTimerAsync(); - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'not-found' }); - }); - - it('awaits existing job if in progress', async () => { - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs).catch(() => {}); - await jest.advanceTimersToNextTimerAsync(); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - - void facade.getBaseParityProof(inputs).catch(() => {}); - await jest.advanceTimersToNextTimerAsync(); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - }); - - it('reuses already cached results', async () => { - const { promise, resolve } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs); - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); - const job = broker.enqueueProvingJob.mock.calls[0][0]; - - const result = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - - const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); - resolve({ - status: 'fulfilled', - value: outputUri, - }); - - await jest.advanceTimersToNextTimerAsync(); - await expect(cache.getProvingJobStatus(job.id)).resolves.toEqual({ status: 'fulfilled', value: outputUri }); - - await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result); - expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once - }); - - it('clears broker state after a job resolves', async () => { - const { promise, resolve } = promiseWithResolvers(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - void facade.getBaseParityProof(inputs); - await jest.advanceTimersToNextTimerAsync(); - - const job = broker.enqueueProvingJob.mock.calls[0][0]; - const result = makePublicInputsAndRecursiveProof( - makeParityPublicInputs(), - makeRecursiveProof(RECURSIVE_PROOF_LENGTH), - VerificationKeyData.makeFakeHonk(), - ); - const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result); - resolve({ - status: 'fulfilled', - value: outputUri, - }); - - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); - }); - - it('clears broker state after a job is canceled', async () => { - const { promise, resolve } = promiseWithResolvers(); - const catchSpy = jest.fn(); - broker.enqueueProvingJob.mockResolvedValue(Promise.resolve()); - broker.waitForJobToSettle.mockResolvedValue(promise); - - const inputs = makeBaseParityInputs(); - const controller = new AbortController(); - void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy); - await jest.advanceTimersToNextTimerAsync(); - - expect(broker.cancelProvingJob).not.toHaveBeenCalled(); - controller.abort(); - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cancelProvingJob).toHaveBeenCalled(); - - resolve({ - status: 'rejected', - reason: 
'Aborted', - }); - - await jest.advanceTimersToNextTimerAsync(); - expect(broker.cleanUpProvingJobState).toHaveBeenCalled(); - expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted')); - }); -}); diff --git a/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts b/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts deleted file mode 100644 index b4da076cbcb..00000000000 --- a/yarn-project/prover-client/src/proving_broker/prover_cache/memory.ts +++ /dev/null @@ -1,20 +0,0 @@ -import type { ProverCache, ProvingJobStatus } from '@aztec/circuit-types'; - -export class InMemoryProverCache implements ProverCache { - private proofs: Record = {}; - - constructor() {} - - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise { - this.proofs[jobId] = status; - return Promise.resolve(); - } - - getProvingJobStatus(jobId: string): Promise { - return Promise.resolve(this.proofs[jobId] ?? { status: 'not-found' }); - } - - close(): Promise { - return Promise.resolve(); - } -} diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.ts index 4d6ad75165a..3c099c8f1a5 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.ts @@ -10,6 +10,7 @@ import { } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { truncate } from '@aztec/foundation/string'; import { Timer } from '@aztec/foundation/timer'; import { type TelemetryClient, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; @@ -107,6 +108,7 @@ export class ProvingAgent implements Traceable { this.currentJobController = new ProvingJobController( job.id, inputs, + job.epochNumber, time, this.circuitProver, this.handleJobResult, @@ -114,13 +116,13 @@ export class ProvingAgent implements Traceable { if (abortedProofJobId) { this.log.info( - `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + `Aborting job id=${abortedProofJobId} type=${abortedProofName} to start new job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncate( job.inputsUri, )}`, ); } else { this.log.info( - `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncateString( + `Starting job id=${this.currentJobController.getJobId()} type=${this.currentJobController.getProofTypeName()} inputsUri=${truncate( job.inputsUri, )}`, ); @@ -147,14 +149,8 @@ export class ProvingAgent implements Traceable { return this.broker.reportProvingJobError(jobId, err.message, retry); } else if (result) { const outputUri = await this.proofStore.saveProofOutput(jobId, type, result); - this.log.info( - `Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncateString(outputUri)}`, - ); + this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncate(outputUri)}`); return this.broker.reportProvingJobSuccess(jobId, outputUri); } }; } - -function truncateString(str: string, length: number = 64): string { - return str.length > length ? str.slice(0, length) + '...' 
: str; -} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 454e840543f..42ce07819f2 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,4 +1,10 @@ -import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types'; +import { + type ProofUri, + type ProvingJob, + type ProvingJobId, + ProvingJobStatus, + ProvingRequestType, +} from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; import { sleep } from '@aztec/foundation/sleep'; import { openTmpStore } from '@aztec/kv-store/lmdb'; @@ -639,10 +645,8 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await sleep(jobTimeoutMs + brokerIntervalMs); - - // should be back in the queue now - await assertJobStatus(id, 'in-queue'); + await sleep(jobTimeoutMs); + await assertJobTransition(id, 'in-progress', 'in-queue'); }); it('cancel stale jobs that time out', async () => { @@ -659,10 +663,10 @@ describe.each([ await assertJobStatus(id, 'in-progress'); // advance time so job times out because of no heartbeats - await sleep(jobTimeoutMs + brokerIntervalMs); + await sleep(jobTimeoutMs); // should be back in the queue now - await assertJobStatus(id, 'in-queue'); + await assertJobTransition(id, 'in-progress', 'in-queue'); // another agent picks it up await getAndAssertNextJobId(id); @@ -926,48 +930,6 @@ describe.each([ await getAndAssertNextJobId(id2); }); - it('clears job state when job is removed', async () => { - const id1 = makeProvingJobId(); - - await database.addProvingJob({ - id: id1, - type: ProvingRequestType.BASE_PARITY, - epochNumber: 1, - inputsUri: makeInputsUri(), - }); - await database.setProvingJobResult(id1, makeOutputsUri()); - - const id2 = makeProvingJobId(); - await database.addProvingJob({ - id: id2, - type: ProvingRequestType.PRIVATE_BASE_ROLLUP, - epochNumber: 2, - inputsUri: makeInputsUri(), - }); - - await broker.start(); - - await assertJobStatus(id1, 'fulfilled'); - await assertJobStatus(id2, 'in-queue'); - - jest.spyOn(database, 'deleteProvingJobAndResult'); - - await broker.cleanUpProvingJobState(id1); - await sleep(brokerIntervalMs); - expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(1, id1); - - await broker.cancelProvingJob(id2); - await broker.cleanUpProvingJobState(id2); - await sleep(brokerIntervalMs); - - expect(database.deleteProvingJobAndResult).toHaveBeenNthCalledWith(2, id2); - - await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' }); - await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' }); - await assertJobStatus(id1, 'not-found'); - await assertJobStatus(id2, 'not-found'); - }); - it('saves job when enqueued', async () => { await broker.start(); const job: ProvingJob = { @@ -1017,7 +979,7 @@ describe.each([ expect(database.setProvingJobResult).toHaveBeenCalledWith(job.id, expect.any(String)); }); - it('does not retain job result if database fails to save', async () => { + it('saves result even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobResult').mockRejectedValue(new Error('db error')); const id = makeProvingJobId(); @@ -1028,7 +990,7 @@ describe.each([ inputsUri: makeInputsUri(), }); await 
expect(broker.reportProvingJobSuccess(id, makeOutputsUri())).rejects.toThrow(new Error('db error')); - await assertJobStatus(id, 'in-queue'); + await assertJobStatus(id, 'fulfilled'); }); it('saves job error', async () => { @@ -1050,7 +1012,7 @@ describe.each([ expect(database.setProvingJobError).toHaveBeenCalledWith(id, error); }); - it('does not retain job error if database fails to save', async () => { + it('saves job error even if database fails to save', async () => { await broker.start(); jest.spyOn(database, 'setProvingJobError').mockRejectedValue(new Error('db error')); const id = makeProvingJobId(); @@ -1061,7 +1023,7 @@ describe.each([ inputsUri: makeInputsUri(), }); await expect(broker.reportProvingJobError(id, 'test error')).rejects.toThrow(new Error('db error')); - await assertJobStatus(id, 'in-queue'); + await assertJobStatus(id, 'rejected'); }); it('does not save job result if job is unknown', async () => { @@ -1167,10 +1129,30 @@ describe.each([ }); }); - async function assertJobStatus(id: ProvingJobId, status: string) { + async function assertJobStatus(id: ProvingJobId, status: ProvingJobStatus['status']) { await expect(broker.getProvingJobStatus(id)).resolves.toEqual(expect.objectContaining({ status })); } + async function assertJobTransition( + id: ProvingJobId, + currentStatus: ProvingJobStatus['status'], + expectedStatus: ProvingJobStatus['status'], + timeoutMs = 5000, + interval = brokerIntervalMs / 4, + ): Promise { + let status; + const timeout = now() + timeoutMs; + while (now() < timeout) { + ({ status } = await broker.getProvingJobStatus(id)); + if (status !== currentStatus) { + break; + } + await sleep(interval); + } + + expect(status).toEqual(expectedStatus); + } + async function getAndAssertNextJobId(id: ProvingJobId, ...allowList: ProvingRequestType[]) { await expect(broker.getProvingJob({ allowList })).resolves.toEqual( expect.objectContaining({ job: expect.objectContaining({ id }) }), diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index b23b05a5573..09d75f8ea1e 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -3,7 +3,7 @@ import { type ProvingJob, type ProvingJobConsumer, type ProvingJobFilter, - type ProvingJobId, + ProvingJobId, type ProvingJobProducer, type ProvingJobSettledResult, type ProvingJobStatus, @@ -143,7 +143,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr public start(): Promise { for (const [item, result] of this.database.allProvingJobs()) { - this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`); + this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`, { + provingJobId: item.id, + status: result ? 
result.status : 'pending', + }); this.jobsCache.set(item.id, item); this.promises.set(item.id, promiseWithResolvers()); @@ -152,7 +155,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr this.promises.get(item.id)!.resolve(result); this.resultsCache.set(item.id, result); } else { - this.logger.debug(`Re-enqueuing proving job id=${item.id}`); this.enqueueJobInternal(item); } } @@ -176,34 +178,53 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return; } - await this.database.addProvingJob(job); - this.jobsCache.set(job.id, job); - this.enqueueJobInternal(job); + this.logger.verbose(`New proving job id=${job.id} epochNumber=${job.epochNumber}`, { provingJobId: job.id }); + try { + // do this first so it acts as a "lock". If this job is enqueued again while we're saving it the if at the top will catch it. + this.jobsCache.set(job.id, job); + await this.database.addProvingJob(job); + this.enqueueJobInternal(job); + } catch (err) { + this.logger.error(`Failed to save proving job id=${job.id}: ${err}`, err, { provingJobId: job.id }); + this.jobsCache.delete(job.id); + throw err; + } } public waitForJobToSettle(id: ProvingJobId): Promise { const promiseWithResolvers = this.promises.get(id); if (!promiseWithResolvers) { + this.logger.warn(`Job id=${id} not found`, { provingJobId: id }); return Promise.resolve({ status: 'rejected', reason: `Job ${id} not found` }); } return promiseWithResolvers.promise; } public async cancelProvingJob(id: ProvingJobId): Promise { + if (!this.jobsCache.has(id)) { + this.logger.warn(`Can't cancel a job that doesn't exist id=${id}`, { provingJobId: id }); + return; + } + // notify listeners of the cancellation if (!this.resultsCache.has(id)) { - this.logger.info(`Cancelling job id=${id}`); + this.logger.info(`Cancelling job id=${id}`, { provingJobId: id }); await this.reportProvingJobError(id, 'Aborted', false); } } - public async cleanUpProvingJobState(id: ProvingJobId): Promise { + private async cleanUpProvingJobState(id: ProvingJobId): Promise { + if (!this.jobsCache.has(id)) { + this.logger.warn(`Can't clean up a job that doesn't exist id=${id}`, { provingJobId: id }); + return; + } + if (!this.resultsCache.has(id)) { - this.logger.warn(`Can't cleanup busy proving job: id=${id}`); + this.logger.warn(`Can't cleanup busy proving job: id=${id}`, { provingJobId: id }); return; } - this.logger.debug(`Cleaning up state for job id=${id}`); + this.logger.debug(`Cleaning up state for job id=${id}`, { provingJobId: id }); await this.database.deleteProvingJobAndResult(id); this.jobsCache.delete(id); this.promises.delete(id); @@ -221,7 +242,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const item = this.jobsCache.get(id); if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + this.logger.warn(`Proving job id=${id} not found`, { provingJobId: id }); return Promise.resolve({ status: 'not-found' }); } @@ -274,45 +295,68 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const retries = this.retries.get(id) ?? 
0; if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + this.logger.warn(`Can't set error on unknown proving job id=${id} err=${err}`, { provingJoId: id }); return; } if (!info) { - this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { + provingJobId: id, + }); } else { this.inProgress.delete(id); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} already is already settled, ignoring error`); + this.logger.warn(`Proving job id=${id} is already settled, ignoring err=${err}`, { + provingJobId: id, + }); return; } if (retry && retries + 1 < this.maxRetries && !this.isJobStale(item)) { - this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); + this.logger.info( + `Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1} err=${err}`, + { + provingJobId: id, + }, + ); this.retries.set(id, retries + 1); this.enqueueJobInternal(item); this.instrumentation.incRetriedJobs(item.type); return; } - this.logger.warn( + this.logger.info( `Marking proving job as failed id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${ retries + 1 } err=${err}`, + { + provingJobId: id, + }, ); - await this.database.setProvingJobError(id, err); - + // save the result to the cache and notify clients of the job status + // this should work even if our database breaks because the result is cached in memory const result: ProvingJobSettledResult = { status: 'rejected', reason: String(err) }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.instrumentation.incRejectedJobs(item.type); if (info) { const duration = this.msTimeSource() - info.startedAt; this.instrumentation.recordJobDuration(item.type, duration); } + + try { + await this.database.setProvingJobError(id, err); + } catch (saveErr) { + this.logger.error(`Failed to save proving job error status id=${id} jobErr=${err}`, saveErr, { + provingJobId: id, + }); + + throw saveErr; + } } reportProvingJobProgress( @@ -322,12 +366,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr ): Promise<{ job: ProvingJob; time: number } | undefined> { const job = this.jobsCache.get(id); if (!job) { - this.logger.warn(`Proving job id=${id} does not exist`); + this.logger.warn(`Proving job id=${id} does not exist`, { provingJobId: id }); return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} has already been completed`); + this.logger.warn(`Proving job id=${id} has already been completed`, { provingJobId: id }); return filter ? this.getProvingJob(filter) : Promise.resolve(undefined); } @@ -336,6 +380,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr if (!metadata) { this.logger.warn( `Proving job id=${id} type=${ProvingRequestType[job.type]} not found in the in-progress cache, adding it`, + { provingJobId: id }, ); // the queue will still contain the item at this point! 
// we need to be careful when popping off the queue to make sure we're not sending @@ -348,11 +393,12 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return Promise.resolve(undefined); } else if (startedAt <= metadata.startedAt) { if (startedAt < metadata.startedAt) { - this.logger.debug( + this.logger.info( `Proving job id=${id} type=${ProvingRequestType[job.type]} startedAt=${startedAt} older agent has taken job`, + { provingJobId: id }, ); } else { - this.logger.debug(`Proving job id=${id} type=${ProvingRequestType[job.type]} heartbeat`); + this.logger.debug(`Proving job id=${id} type=${ProvingRequestType[job.type]} heartbeat`, { provingJobId: id }); } metadata.startedAt = startedAt; metadata.lastUpdatedAt = now; @@ -362,6 +408,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr `Proving job id=${id} type=${ ProvingRequestType[job.type] } already being worked on by another agent. Sending new one`, + { provingJobId: id }, ); return this.getProvingJob(filter); } else { @@ -374,31 +421,50 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr const item = this.jobsCache.get(id); const retries = this.retries.get(id) ?? 0; if (!item) { - this.logger.warn(`Proving job id=${id} not found`); + this.logger.warn(`Proving job id=${id} not found`, { provingJobId: id }); return; } if (!info) { - this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`); + this.logger.warn(`Proving job id=${id} type=${ProvingRequestType[item.type]} not in the in-progress set`, { + provingJobId: id, + }); } else { this.inProgress.delete(id); } if (this.resultsCache.has(id)) { - this.logger.warn(`Proving job id=${id} already settled, ignoring result`); + this.logger.warn(`Proving job id=${id} already settled, ignoring result`, { provingJobId: id }); return; } - this.logger.debug( + this.logger.info( `Proving job complete id=${id} type=${ProvingRequestType[item.type]} totalAttempts=${retries + 1}`, + { provingJobId: id }, ); - await this.database.setProvingJobResult(id, value); - + // save result to our local cache and notify clients + // if save to database fails, that's ok because we have the result in memory + // if the broker crashes and needs the result again, we're covered because we can just recompute it const result: ProvingJobSettledResult = { status: 'fulfilled', value }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.instrumentation.incResolvedJobs(item.type); + if (info) { + const duration = this.msTimeSource() - info.startedAt; + this.instrumentation.recordJobDuration(item.type, duration); + } + + try { + await this.database.setProvingJobResult(id, value); + } catch (saveErr) { + this.logger.error(`Failed to save proving job result id=${id}`, saveErr, { + provingJobId: id, + }); + + throw saveErr; + } } @trackSpan('ProvingBroker.cleanupPass') @@ -419,7 +485,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr } if (jobsToClean.length > 0) { - this.logger.info(`Cleaning up [${jobsToClean.join(',')}]`); + this.logger.info(`Cleaning up jobs=${jobsToClean.length}`); await asyncPool(this.maxParallelCleanUps, jobsToClean, async jobId => { await this.cleanUpProvingJobState(jobId); }); @@ -431,7 +497,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr for (const [id, metadata] of inProgressEntries) { const item = this.jobsCache.get(id); if (!item) { - 
this.logger.warn(`Proving job id=${id} not found. Removing it from the queue.`); + this.logger.warn(`Proving job id=${id} not found. Removing it from the queue.`, { provingJobId: id }); this.inProgress.delete(id); continue; } @@ -443,7 +509,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr // the job has timed out and it's also old, just cancel and move on await this.cancelProvingJob(item.id); } else { - this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); + this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`, { provingJobId: id }); this.inProgress.delete(id); this.enqueueJobInternal(item); this.instrumentation.incTimedOutJobs(item.type); @@ -462,7 +528,6 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr }); this.enqueuedAt.set(job.id, new Timer()); this.epochHeight = Math.max(this.epochHeight, job.epochNumber); - this.logger.debug(`Enqueued new proving job id=${job.id}`); } private isJobStale(job: ProvingJob) { diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts index 364703b23cf..1c5fc3c0223 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts @@ -23,6 +23,7 @@ describe('ProvingJobController', () => { type: ProvingRequestType.BASE_PARITY, inputs: makeBaseParityInputs(), }, + 42, 0, prover, onComplete, diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts index 2ce47cbe6f7..0b08e0ad69c 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts @@ -30,6 +30,7 @@ export class ProvingJobController { constructor( private jobId: ProvingJobId, private inputs: ProvingJobInputs, + private epochNumber: number, private startedAt: number, private circuitProver: ServerCircuitProver, private onComplete: ProvingJobCompletionCallback, @@ -100,51 +101,51 @@ export class ProvingJobController { const signal = this.abortController.signal; switch (type) { case ProvingRequestType.PUBLIC_VM: { - return await this.circuitProver.getAvmProof(inputs, signal); + return await this.circuitProver.getAvmProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PRIVATE_BASE_ROLLUP: { - return await this.circuitProver.getPrivateBaseRollupProof(inputs, signal); + return await this.circuitProver.getPrivateBaseRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PUBLIC_BASE_ROLLUP: { - return await this.circuitProver.getPublicBaseRollupProof(inputs, signal); + return await this.circuitProver.getPublicBaseRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.MERGE_ROLLUP: { - return await this.circuitProver.getMergeRollupProof(inputs, signal); + return await this.circuitProver.getMergeRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP: { - return await this.circuitProver.getEmptyBlockRootRollupProof(inputs, signal); + return await this.circuitProver.getEmptyBlockRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BLOCK_ROOT_ROLLUP: { - return await this.circuitProver.getBlockRootRollupProof(inputs, signal); + return await 
this.circuitProver.getBlockRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BLOCK_MERGE_ROLLUP: { - return await this.circuitProver.getBlockMergeRollupProof(inputs, signal); + return await this.circuitProver.getBlockMergeRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.ROOT_ROLLUP: { - return await this.circuitProver.getRootRollupProof(inputs, signal); + return await this.circuitProver.getRootRollupProof(inputs, signal, this.epochNumber); } case ProvingRequestType.BASE_PARITY: { - return await this.circuitProver.getBaseParityProof(inputs, signal); + return await this.circuitProver.getBaseParityProof(inputs, signal, this.epochNumber); } case ProvingRequestType.ROOT_PARITY: { - return await this.circuitProver.getRootParityProof(inputs, signal); + return await this.circuitProver.getRootParityProof(inputs, signal, this.epochNumber); } case ProvingRequestType.PRIVATE_KERNEL_EMPTY: { - return await this.circuitProver.getEmptyPrivateKernelProof(inputs, signal); + return await this.circuitProver.getEmptyPrivateKernelProof(inputs, signal, this.epochNumber); } case ProvingRequestType.TUBE_PROOF: { - return await this.circuitProver.getTubeProof(inputs, signal); + return await this.circuitProver.getTubeProof(inputs, signal, this.epochNumber); } default: { diff --git a/yarn-project/prover-client/src/proving_broker/rpc.ts b/yarn-project/prover-client/src/proving_broker/rpc.ts index 0d63dcebeac..3dec6bedbdb 100644 --- a/yarn-project/prover-client/src/proving_broker/rpc.ts +++ b/yarn-project/prover-client/src/proving_broker/rpc.ts @@ -28,7 +28,6 @@ const GetProvingJobResponse = z.object({ export const ProvingJobProducerSchema: ApiSchemaFor = { enqueueProvingJob: z.function().args(ProvingJob).returns(z.void()), getProvingJobStatus: z.function().args(ProvingJobId).returns(ProvingJobStatus), - cleanUpProvingJobState: z.function().args(ProvingJobId).returns(z.void()), cancelProvingJob: z.function().args(ProvingJobId).returns(z.void()), waitForJobToSettle: z.function().args(ProvingJobId).returns(ProvingJobSettledResult), }; diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index a4ab77fc24d..baf581d8f8e 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -58,8 +58,12 @@ export class TestBroker implements ProvingJobProducer { agentCount: number, prover: ServerCircuitProver, private proofStore: ProofStore = new InlineProofStore(), + agentPollInterval = 100, ) { - this.agents = times(agentCount, () => new ProvingAgent(this.broker, proofStore, prover, new NoopTelemetryClient())); + this.agents = times( + agentCount, + () => new ProvingAgent(this.broker, proofStore, prover, new NoopTelemetryClient(), undefined, agentPollInterval), + ); } public async start() { @@ -82,9 +86,6 @@ export class TestBroker implements ProvingJobProducer { getProvingJobStatus(id: ProvingJobId): Promise { return this.broker.getProvingJobStatus(id); } - cleanUpProvingJobState(id: ProvingJobId): Promise { - return this.broker.cleanUpProvingJobState(id); - } cancelProvingJob(id: string): Promise { return this.broker.cancelProvingJob(id); } diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 7a4ec700721..c1a5f27026a 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -12,14 +12,12 @@ import { type TelemetryClient } from '@aztec/telemetry-client'; import { 
NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { createWorldStateSynchronizer } from '@aztec/world-state'; -import { join } from 'path'; import { createPublicClient, getAddress, getContract, http } from 'viem'; import { createBondManager } from './bond/factory.js'; import { type ProverNodeConfig, type QuoteProviderConfig } from './config.js'; import { ClaimsMonitor } from './monitors/claims-monitor.js'; import { EpochMonitor } from './monitors/epoch-monitor.js'; -import { ProverCacheManager } from './prover-cache/cache_manager.js'; import { createProverCoordination } from './prover-coordination/factory.js'; import { ProverNode, type ProverNodeOptions } from './prover-node.js'; import { HttpQuoteProvider } from './quote-provider/http.js'; @@ -78,9 +76,6 @@ export async function createProverNode( const walletClient = publisher.getClient(); const bondManager = await createBondManager(rollupContract, walletClient, config); - const cacheDir = config.cacheDir ? join(config.cacheDir, `prover_${config.proverId}`) : undefined; - const cacheManager = new ProverCacheManager(cacheDir); - return new ProverNode( prover, publisher, @@ -95,7 +90,6 @@ export async function createProverNode( epochMonitor, bondManager, telemetry, - cacheManager, proverNodeConfig, ); } diff --git a/yarn-project/prover-node/src/prover-cache/cache_manager.ts b/yarn-project/prover-node/src/prover-cache/cache_manager.ts deleted file mode 100644 index 497300d1e42..00000000000 --- a/yarn-project/prover-node/src/prover-cache/cache_manager.ts +++ /dev/null @@ -1,69 +0,0 @@ -import { type ProverCache } from '@aztec/circuit-types'; -import { createLogger } from '@aztec/foundation/log'; -import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; -import { InMemoryProverCache } from '@aztec/prover-client'; - -import { type Dirent } from 'fs'; -import { mkdir, readFile, readdir, rm, writeFile } from 'fs/promises'; -import { join } from 'path'; - -import { KVProverCache } from './kv_cache.js'; - -const EPOCH_DIR_PREFIX = 'epoch'; -const EPOCH_DIR_SEPARATOR = '_'; -const EPOCH_HASH_FILENAME = 'epoch_hash.txt'; - -export class ProverCacheManager { - constructor(private cacheDir?: string, private log = createLogger('prover-node:cache-manager')) {} - - public async openCache(epochNumber: bigint, epochHash: Buffer): Promise { - if (!this.cacheDir) { - return new InMemoryProverCache(); - } - - const epochDir = EPOCH_DIR_PREFIX + EPOCH_DIR_SEPARATOR + epochNumber; - const dataDir = join(this.cacheDir, epochDir); - - const storedEpochHash = await readFile(join(dataDir, EPOCH_HASH_FILENAME), 'hex').catch(() => Buffer.alloc(0)); - if (storedEpochHash.toString() !== epochHash.toString()) { - await rm(dataDir, { recursive: true, force: true }); - } - - await mkdir(dataDir, { recursive: true }); - await writeFile(join(dataDir, EPOCH_HASH_FILENAME), epochHash.toString('hex')); - - const store = AztecLmdbStore.open(dataDir); - this.log.debug(`Created new database for epoch ${epochNumber} at ${dataDir}`); - const cleanup = () => store.close(); - return new KVProverCache(store, cleanup); - } - - /** - * Removes all caches for epochs older than the given epoch (including) - * @param upToAndIncludingEpoch - The epoch number up to which to remove caches - */ - public async removeStaleCaches(upToAndIncludingEpoch: bigint): Promise { - if (!this.cacheDir) { - return; - } - - const entries: Dirent[] = await readdir(this.cacheDir, { withFileTypes: true }).catch(() => []); - - for (const item of entries) { - if (!item.isDirectory()) { - continue; - 
} - - const [prefix, epochNumber] = item.name.split(EPOCH_DIR_SEPARATOR); - if (prefix !== EPOCH_DIR_PREFIX) { - continue; - } - - const epochNumberInt = BigInt(epochNumber); - if (epochNumberInt <= upToAndIncludingEpoch) { - this.log.info(`Removing old epoch database for epoch ${epochNumberInt} at ${join(this.cacheDir, item.name)}`); - await rm(join(this.cacheDir, item.name), { recursive: true }); - } - } - } -} diff --git a/yarn-project/prover-node/src/prover-cache/kv_cache.ts b/yarn-project/prover-node/src/prover-cache/kv_cache.ts deleted file mode 100644 index 82b216e384a..00000000000 --- a/yarn-project/prover-node/src/prover-cache/kv_cache.ts +++ /dev/null @@ -1,27 +0,0 @@ -import type { ProverCache, ProvingJobStatus } from '@aztec/circuit-types'; -import type { AztecKVStore, AztecMap } from '@aztec/kv-store'; - -export class KVProverCache implements ProverCache { - private proofs: AztecMap; - - constructor(store: AztecKVStore, private cleanup?: () => Promise) { - this.proofs = store.openMap('prover_node_proof_status'); - } - - getProvingJobStatus(jobId: string): Promise { - const item = this.proofs.get(jobId); - if (!item) { - return Promise.resolve({ status: 'not-found' }); - } - - return Promise.resolve(JSON.parse(item)); - } - - setProvingJobStatus(jobId: string, status: ProvingJobStatus): Promise { - return this.proofs.set(jobId, JSON.stringify(status)); - } - - async close(): Promise { - await this.cleanup?.(); - } -} diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index c7b70d6136a..931396a2676 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -8,7 +8,6 @@ import { type L2BlockSource, type MerkleTreeWriteOperations, P2PClientType, - type ProverCache, type ProverCoordination, WorldStateRunningState, type WorldStateSynchronizer, @@ -32,7 +31,6 @@ import { type BondManager } from './bond/bond-manager.js'; import { type EpochProvingJob } from './job/epoch-proving-job.js'; import { ClaimsMonitor } from './monitors/claims-monitor.js'; import { EpochMonitor } from './monitors/epoch-monitor.js'; -import { ProverCacheManager } from './prover-cache/cache_manager.js'; import { ProverNode, type ProverNodeOptions } from './prover-node.js'; import { type QuoteProvider } from './quote-provider/index.js'; import { type QuoteSigner } from './quote-signer.js'; @@ -99,7 +97,6 @@ describe('prover-node', () => { epochMonitor, bondManager, telemetryClient, - new ProverCacheManager(), config, ); @@ -385,7 +382,6 @@ describe('prover-node', () => { protected override doCreateEpochProvingJob( epochNumber: bigint, _blocks: L2Block[], - _cache: ProverCache, _publicProcessorFactory: PublicProcessorFactory, cleanUp: (job: EpochProvingJob) => Promise, ): EpochProvingJob { diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index ceabaf00e33..f6853ca28fe 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -7,7 +7,6 @@ import { type L2Block, type L2BlockSource, type P2PClientType, - type ProverCache, type ProverCoordination, type ProverNodeApi, type Service, @@ -16,7 +15,6 @@ import { } from '@aztec/circuit-types'; import { type ContractDataSource } from '@aztec/circuits.js'; import { compact } from '@aztec/foundation/collection'; -import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { type Maybe } from 
'@aztec/foundation/types'; import { type P2P } from '@aztec/p2p'; @@ -29,7 +27,6 @@ import { EpochProvingJob, type EpochProvingJobState } from './job/epoch-proving- import { ProverNodeMetrics } from './metrics.js'; import { type ClaimsMonitor, type ClaimsMonitorHandler } from './monitors/claims-monitor.js'; import { type EpochMonitor, type EpochMonitorHandler } from './monitors/epoch-monitor.js'; -import { type ProverCacheManager } from './prover-cache/cache_manager.js'; import { type QuoteProvider } from './quote-provider/index.js'; import { type QuoteSigner } from './quote-signer.js'; @@ -67,7 +64,6 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr private readonly epochsMonitor: EpochMonitor, private readonly bondManager: BondManager, private readonly telemetryClient: TelemetryClient, - private readonly proverCacheManager: ProverCacheManager, options: Partial = {}, ) { this.options = { @@ -266,16 +262,11 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr // Create a processor using the forked world state const publicProcessorFactory = new PublicProcessorFactory(this.contractDataSource, this.telemetryClient); - const epochHash = sha256(Buffer.concat(blocks.map(block => block.hash().toBuffer()))); - const proverCache = await this.proverCacheManager.openCache(epochNumber, epochHash); - const cleanUp = async () => { - await proverCache.close(); - await this.proverCacheManager.removeStaleCaches(epochNumber); this.jobs.delete(job.getId()); }; - const job = this.doCreateEpochProvingJob(epochNumber, blocks, proverCache, publicProcessorFactory, cleanUp); + const job = this.doCreateEpochProvingJob(epochNumber, blocks, publicProcessorFactory, cleanUp); this.jobs.set(job.getId(), job); return job; } @@ -284,7 +275,6 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr protected doCreateEpochProvingJob( epochNumber: bigint, blocks: L2Block[], - proverCache: ProverCache, publicProcessorFactory: PublicProcessorFactory, cleanUp: () => Promise, ) { @@ -292,7 +282,7 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr this.worldState, epochNumber, blocks, - this.prover.createEpochProver(proverCache), + this.prover.createEpochProver(), publicProcessorFactory, this.publisher, this.l2BlockSource, From 818bd62fc7073b3f7807893dd85de098dfa2eef5 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Dec 2024 16:56:26 +0000 Subject: [PATCH 2/7] chore: logs --- .../prover-client/src/proving_broker/proving_broker.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 09d75f8ea1e..4086c5b3803 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -175,10 +175,13 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr if (this.jobsCache.has(job.id)) { const existing = this.jobsCache.get(job.id); assert.deepStrictEqual(job, existing, 'Duplicate proving job ID'); + this.logger.debug(`Duplicate proving job id=${job.id} epochNumber=${job.epochNumber}. 
Ignoring`, { + provingJobId: job.id, + }); return; } - this.logger.verbose(`New proving job id=${job.id} epochNumber=${job.epochNumber}`, { provingJobId: job.id }); + this.logger.info(`New proving job id=${job.id} epochNumber=${job.epochNumber}`, { provingJobId: job.id }); try { // do this first so it acts as a "lock". If this job is enqueued again while we're saving it the if at the top will catch it. this.jobsCache.set(job.id, job); From 13719a9263e941808dff2856559a0dc711657c92 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Dec 2024 17:09:57 +0000 Subject: [PATCH 3/7] fix: guard against stale proving jobs --- .../src/proving_broker/proving_broker.test.ts | 30 ++++++++++++++----- .../src/proving_broker/proving_broker.ts | 7 +++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 42ce07819f2..e0609ce22dd 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -63,6 +63,28 @@ describe.each([ await broker.stop(); }); + it('refuses stale jobs', async () => { + const id = makeProvingJobId(); + await broker.enqueueProvingJob({ + id, + epochNumber: 42, + type: ProvingRequestType.BASE_PARITY, + inputsUri: makeInputsUri(), + }); + expect(await broker.getProvingJobStatus(id)).toEqual({ status: 'in-queue' }); + + const id2 = makeProvingJobId(); + await expect( + broker.enqueueProvingJob({ + id: id2, + epochNumber: 1, + type: ProvingRequestType.PRIVATE_BASE_ROLLUP, + inputsUri: makeInputsUri(), + }), + ).rejects.toThrow(); + await assertJobStatus(id2, 'not-found'); + }); + it('enqueues jobs', async () => { const id = makeProvingJobId(); await broker.enqueueProvingJob({ @@ -210,15 +232,7 @@ describe.each([ inputsUri: makeInputsUri(), }; - const provingJob3: ProvingJob = { - id: makeProvingJobId(), - type: ProvingRequestType.BASE_PARITY, - epochNumber: 3, - inputsUri: makeInputsUri(), - }; - await broker.enqueueProvingJob(provingJob2); - await broker.enqueueProvingJob(provingJob3); await broker.enqueueProvingJob(provingJob1); await getAndAssertNextJobId(provingJob1.id, ProvingRequestType.BASE_PARITY); diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 4086c5b3803..6edf6c824e2 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -181,6 +181,13 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr return; } + if (this.isJobStale(job)) { + this.logger.warn(`Tried enqueueing stale proving job id=${job.id} epochNumber=${job.epochNumber}`, { + provingJobId: job.id, + }); + throw new Error(`Epoch too old: job epoch ${job.epochNumber}, current epoch: ${this.epochHeight}`); + } + this.logger.info(`New proving job id=${job.id} epochNumber=${job.epochNumber}`, { provingJobId: job.id }); try { // do this first so it acts as a "lock". If this job is enqueued again while we're saving it the if at the top will catch it. 
From 43241296178264007b5149933e0a14c05cf56293 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 13 Dec 2024 17:21:20 +0000 Subject: [PATCH 4/7] chore: lint --- .../prover-client/src/proving_broker/proving_broker.test.ts | 2 +- .../prover-client/src/proving_broker/proving_broker.ts | 2 +- yarn-project/prover-node/src/prover-node.ts | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index e0609ce22dd..4782cd61489 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -2,7 +2,7 @@ import { type ProofUri, type ProvingJob, type ProvingJobId, - ProvingJobStatus, + type ProvingJobStatus, ProvingRequestType, } from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 6edf6c824e2..7ffbb5244ed 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -3,7 +3,7 @@ import { type ProvingJob, type ProvingJobConsumer, type ProvingJobFilter, - ProvingJobId, + type ProvingJobId, type ProvingJobProducer, type ProvingJobSettledResult, type ProvingJobStatus, diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index f6853ca28fe..cc7a5b126f5 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -262,8 +262,9 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr // Create a processor using the forked world state const publicProcessorFactory = new PublicProcessorFactory(this.contractDataSource, this.telemetryClient); - const cleanUp = async () => { + const cleanUp = () => { this.jobs.delete(job.getId()); + return Promise.resolve(); }; const job = this.doCreateEpochProvingJob(epochNumber, blocks, publicProcessorFactory, cleanUp); From be059494c0f9b329910b438c0e2ed21132012dda Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Mon, 16 Dec 2024 08:04:24 +0000 Subject: [PATCH 5/7] chore: merge conflicts --- .../src/proving_broker/broker_prover_facade.ts | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index 9c4e4e67d2b..3b2977a2831 100644 --- a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -37,15 +37,11 @@ import { retryUntil } from '@aztec/foundation/retry'; import { truncate } from '@aztec/foundation/string'; import { InlineProofStore, type ProofStore } from './proof_store.js'; -import { InMemoryProverCache } from './prover_cache/memory.js'; // 20 minutes, roughly the length of an Aztec epoch. 
If a proof isn't ready in this amount of time then we've failed to prove the whole epoch const MAX_WAIT_MS = 1_200_000; -/** - * A facade around a job broker that generates stable job ids and caches results - */ -export class CachingBrokerFacade implements ServerCircuitProver { +export class BrokerCircuitProverFacade implements ServerCircuitProver { constructor( private broker: ProvingJobProducer, private proofStore: ProofStore = new InlineProofStore(), @@ -104,12 +100,6 @@ export class CachingBrokerFacade implements ServerCircuitProver { this.pollIntervalMs / 1000, ); - try { - await this.cache.setProvingJobStatus(id, result); - } catch (err) { - this.log.warn(`Failed to cache proving job id=${id} resultStatus=${result.status}: ${err}`); - } - if (result.status === 'fulfilled') { const output = await this.proofStore.getProofOutput(result.value); if (output.type === type) { From 85264b3b8050a26b3bd20584b1277a33fe9677bf Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Mon, 16 Dec 2024 11:42:16 +0000 Subject: [PATCH 6/7] fix: splits --- scripts/tmux_split_args.sh | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/scripts/tmux_split_args.sh b/scripts/tmux_split_args.sh index 25b3a5a4b6f..ef69695682d 100755 --- a/scripts/tmux_split_args.sh +++ b/scripts/tmux_split_args.sh @@ -23,23 +23,29 @@ tmux new-session -d -s "$session_name" -e LOG_LEVEL=${LOG_LEVEL:-"debug"} \ -e OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=${OTEL_EXPORTER_OTLP_TRACES_ENDPOINT:-} \ -e LOG_JSON=${LOG_JSON:-} +echo "DONE" + shift 1 commands=("$@") # Set pane-border-status to top and pane-border-format to display pane title tmux set-option -t "$session_name" pane-border-status top tmux set-option -t "$session_name" pane-border-format "#{pane_title}" +base_index=$(tmux show-options -g base-index 2>/dev/null | awk '{print $2}') +base_index=${base_index:-0} + +echo "base_index=$base_index" # Create the necessary number of panes and set titles num_commands=${#commands[@]} for ((i=0; i Date: Mon, 16 Dec 2024 12:46:09 +0000 Subject: [PATCH 7/7] chore: merge conflict --- .../proving_broker/broker_prover_facade.ts | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index 3b2977a2831..dc20621706d 100644 --- a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -47,7 +47,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { private proofStore: ProofStore = new InlineProofStore(), private waitTimeoutMs = MAX_WAIT_MS, private pollIntervalMs = 1000, - private log = createLogger('prover-client:caching-prover-broker'), + private log = createLogger('prover-client:broker-circuit-prover-facade'), ) {} private async enqueueAndWaitForJob( @@ -121,7 +121,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PUBLIC_VM, inputs), + this.generateId(ProvingRequestType.PUBLIC_VM, inputs, epochNumber), ProvingRequestType.PUBLIC_VM, inputs, epochNumber, @@ -135,7 +135,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BASE_PARITY, inputs), + 
this.generateId(ProvingRequestType.BASE_PARITY, inputs, epochNumber), ProvingRequestType.BASE_PARITY, inputs, epochNumber, @@ -149,7 +149,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input), + this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_MERGE_ROLLUP, input, epochNumber, @@ -163,7 +163,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.BLOCK_ROOT_ROLLUP, input, epochNumber, @@ -177,7 +177,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, epochNumber), ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input, epochNumber, @@ -191,7 +191,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs), + this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, epochNumber), ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs, epochNumber, @@ -205,7 +205,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.MERGE_ROLLUP, input), + this.generateId(ProvingRequestType.MERGE_ROLLUP, input, epochNumber), ProvingRequestType.MERGE_ROLLUP, input, epochNumber, @@ -218,7 +218,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput), + this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, epochNumber), ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput, epochNumber, @@ -232,7 +232,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs), + this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, epochNumber), ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs, epochNumber, @@ -246,7 +246,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.ROOT_PARITY, inputs), + this.generateId(ProvingRequestType.ROOT_PARITY, inputs, epochNumber), ProvingRequestType.ROOT_PARITY, inputs, epochNumber, @@ -260,7 +260,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.ROOT_ROLLUP, input), + this.generateId(ProvingRequestType.ROOT_ROLLUP, input, epochNumber), ProvingRequestType.ROOT_ROLLUP, input, epochNumber, @@ -274,7 +274,7 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { epochNumber?: number, ): Promise> { return 
this.enqueueAndWaitForJob( - this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput), + this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput, epochNumber), ProvingRequestType.TUBE_PROOF, tubeInput, epochNumber, @@ -282,8 +282,8 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { ); } - private generateId(type: ProvingRequestType, inputs: { toBuffer(): Buffer }) { + private generateId(type: ProvingRequestType, inputs: { toBuffer(): Buffer }, epochNumber = 0) { const inputsHash = sha256(inputs.toBuffer()); - return `${ProvingRequestType[type]}:${inputsHash.toString('hex')}`; + return `${epochNumber}:${ProvingRequestType[type]}:${inputsHash.toString('hex')}`; } }
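For context on the final hunk: generateId now folds the epoch number into the deterministic job ID, so identical circuit inputs proved in different epochs map to different broker jobs instead of colliding. A minimal standalone sketch of that ID scheme follows; the enum subset and the sha256 helper built on Node's crypto are assumptions standing in for the real ProvingRequestType and @aztec/foundation/crypto, and only the ID format itself mirrors the diff above.

import { createHash } from 'crypto';

// Assumed subset of the real ProvingRequestType enum, for illustration only.
enum ProvingRequestType {
  BASE_PARITY,
  PRIVATE_BASE_ROLLUP,
}

// Assumed stand-in for sha256 from @aztec/foundation/crypto.
function sha256(buf: Buffer): Buffer {
  return createHash('sha256').update(buf).digest();
}

// Same shape as BrokerCircuitProverFacade.generateId after this patch:
// prefixing with the epoch number keeps job IDs unique across epochs even
// when the hashed circuit inputs are byte-for-byte identical.
function generateId(type: ProvingRequestType, inputs: { toBuffer(): Buffer }, epochNumber = 0): string {
  const inputsHash = sha256(inputs.toBuffer());
  return `${epochNumber}:${ProvingRequestType[type]}:${inputsHash.toString('hex')}`;
}

// Identical inputs, different epochs -> distinct job IDs.
const inputs = { toBuffer: () => Buffer.from('parity-inputs') };
console.log(generateId(ProvingRequestType.BASE_PARITY, inputs, 41));
console.log(generateId(ProvingRequestType.BASE_PARITY, inputs, 42));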