Skip to content

Commit

Permalink
feat: agent and broker expose OTEL metrics (#10264)
Browse files Browse the repository at this point in the history
This PR adds instrumentation to both the proving broker and agent.
  • Loading branch information
alexghr authored Dec 5, 2024
1 parent 9d833c5 commit c2c8cc6
Show file tree
Hide file tree
Showing 20 changed files with 332 additions and 49 deletions.
5 changes: 1 addition & 4 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class Archiver implements ArchiveSource {
config.l1Contracts.registryAddress,
archiverStore,
config.archiverPollingIntervalMS ?? 10_000,
new ArchiverInstrumentation(telemetry),
new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()),
{ l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration },
);
await archiver.start(blockUntilSynced);
Expand Down Expand Up @@ -271,9 +271,6 @@ export class Archiver implements ArchiveSource {
// the chain locally before we start unwinding stuff. This can be optimized by figuring out
// up to which point we're pruning, and then requesting L2 blocks up to that point only.
await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber);

const storeSizes = this.store.estimateSize();
this.instrumentation.recordDBMetrics(storeSizes);
}
}

Expand Down
8 changes: 3 additions & 5 deletions yarn-project/archiver/src/archiver/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Gauge,
type Histogram,
LmdbMetrics,
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type UpDownCounter,
Expand All @@ -23,7 +24,7 @@ export class ArchiverInstrumentation {

private log = createDebugLogger('aztec:archiver:instrumentation');

constructor(private telemetry: TelemetryClient) {
constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) {
const meter = telemetry.getMeter('Archiver');
this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, {
description: 'The height of the latest block processed by the archiver',
Expand Down Expand Up @@ -72,13 +73,10 @@ export class ArchiverInstrumentation {
name: Metrics.ARCHIVER_DB_NUM_ITEMS,
description: 'Num items in the archiver database',
},
lmdbStats,
);
}

public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) {
this.dbMetrics.recordDBMetrics(metrics);
}

public isEnabled(): boolean {
return this.telemetry.isEnabled();
}
Expand Down
13 changes: 12 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,18 @@ export async function startProverAgent(
);
const prover = await buildServerCircuitProver(config, telemetry);
const proofStore = new InlineProofStore();
const agents = times(config.proverAgentCount, () => new ProvingAgent(broker, proofStore, prover));
const agents = times(
config.proverAgentCount,
() =>
new ProvingAgent(
broker,
proofStore,
prover,
telemetry,
config.proverAgentProofTypes,
config.proverAgentPollIntervalMs,
),
);

await Promise.all(agents.map(agent => agent.start()));

Expand Down
7 changes: 6 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
import { ProvingJobBrokerSchema, createAndStartProvingBroker } from '@aztec/prover-client/broker';
import { getProverNodeBrokerConfigFromEnv } from '@aztec/prover-node';
import {
createAndStartTelemetryClient,
getConfigEnvVars as getTelemetryClientConfig,
} from '@aztec/telemetry-client/start';

import { extractRelevantOptions } from '../util.js';

Expand All @@ -22,7 +26,8 @@ export async function startProverBroker(
...extractRelevantOptions<ProverBrokerConfig>(options, proverBrokerConfigMappings, 'proverBroker'), // override with command line options
};

const broker = await createAndStartProvingBroker(config);
const client = await createAndStartTelemetryClient(getTelemetryClientConfig());
const broker = await createAndStartProvingBroker(config, client);
services.proverBroker = [broker, ProvingJobBrokerSchema];
signalHandlers.push(() => broker.stop());

Expand Down
8 changes: 3 additions & 5 deletions yarn-project/p2p/src/mem_pools/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Attributes,
type Histogram,
LmdbMetrics,
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type UpDownCounter,
Expand Down Expand Up @@ -58,7 +59,7 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {

private defaultAttributes;

constructor(telemetry: TelemetryClient, name: PoolName) {
constructor(telemetry: TelemetryClient, name: PoolName, dbStats?: LmdbStatsCallback) {
const meter = telemetry.getMeter(name);
this.defaultAttributes = { [Attributes.POOL_NAME]: name };

Expand Down Expand Up @@ -98,13 +99,10 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
name: Metrics.MEMPOOL_DB_NUM_ITEMS,
description: 'Num items in database for the Tx mempool',
},
dbStats,
);
}

public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) {
this.dbMetrics.recordDBMetrics(metrics);
}

public recordSize(poolObject: PoolObject) {
this.objectSize.record(poolObject.getSize());
}
Expand Down
4 changes: 1 addition & 3 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class AztecKVTxPool implements TxPool {

this.#store = store;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL);
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}

public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
Expand All @@ -53,8 +53,6 @@ export class AztecKVTxPool implements TxPool {
}
this.#metrics.recordRemovedObjects(deleted, 'pending');
this.#metrics.recordAddedObjects(txHashes.length, 'mined');
const storeSizes = this.#store.estimateSize();
this.#metrics.recordDBMetrics(storeSizes);
});
}

Expand Down
10 changes: 9 additions & 1 deletion yarn-project/prover-client/src/prover-client/prover-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,15 @@ export class ProverClient implements EpochProverManager {
const prover = await buildServerCircuitProver(this.config, this.telemetry);
this.agents = times(
this.config.proverAgentCount,
() => new ProvingAgent(this.agentClient!, proofStore, prover, [], this.config.proverAgentPollIntervalMs),
() =>
new ProvingAgent(
this.agentClient!,
proofStore,
prover,
this.telemetry,
[],
this.config.proverAgentPollIntervalMs,
),
);

await Promise.all(this.agents.map(agent => agent.start()));
Expand Down
10 changes: 7 additions & 3 deletions yarn-project/prover-client/src/proving_broker/factory.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { type ProverBrokerConfig } from '@aztec/circuit-types';
import { AztecLmdbStore } from '@aztec/kv-store/lmdb';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { ProvingBroker } from './proving_broker.js';
import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js';
import { KVBrokerDatabase } from './proving_broker_database/persisted.js';

export async function createAndStartProvingBroker(config: ProverBrokerConfig): Promise<ProvingBroker> {
export async function createAndStartProvingBroker(
config: ProverBrokerConfig,
client: TelemetryClient,
): Promise<ProvingBroker> {
const database = config.proverBrokerDataDirectory
? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory))
? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory), client)
: new InMemoryBrokerDatabase();

const broker = new ProvingBroker(database, {
const broker = new ProvingBroker(database, client, {
jobTimeoutMs: config.proverBrokerJobTimeoutMs,
maxRetries: config.proverBrokerJobMaxRetries,
timeoutIntervalMs: config.proverBrokerPollIntervalMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js
import { randomBytes } from '@aztec/foundation/crypto';
import { AbortError } from '@aztec/foundation/error';
import { promiseWithResolvers } from '@aztec/foundation/promise';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';

Expand Down Expand Up @@ -50,7 +51,7 @@ describe('ProvingAgent', () => {
saveProofOutput: jest.fn(),
};

agent = new ProvingAgent(jobSource, proofDB, prover, [ProvingRequestType.BASE_PARITY]);
agent = new ProvingAgent(jobSource, proofDB, prover, new NoopTelemetryClient(), [ProvingRequestType.BASE_PARITY]);
});

afterEach(async () => {
Expand Down
15 changes: 15 additions & 0 deletions yarn-project/prover-client/src/proving_broker/proving_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import {
} from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { Timer } from '@aztec/foundation/timer';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { type ProofStore } from './proof_store.js';
import { ProvingAgentInstrumentation } from './proving_agent_instrumentation.js';
import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_controller.js';

/**
Expand All @@ -20,6 +23,8 @@ import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_
export class ProvingAgent {
private currentJobController?: ProvingJobController;
private runningPromise: RunningPromise;
private instrumentation: ProvingAgentInstrumentation;
private idleTimer: Timer | undefined;

constructor(
/** The source of proving jobs */
Expand All @@ -28,12 +33,15 @@ export class ProvingAgent {
private proofStore: ProofStore,
/** The prover implementation to defer jobs to */
private circuitProver: ServerCircuitProver,
/** A telemetry client through which to emit metrics */
client: TelemetryClient,
/** Optional list of allowed proof types to build */
private proofAllowList: Array<ProvingRequestType> = [],
/** How long to wait between jobs */
private pollIntervalMs = 1000,
private log = createDebugLogger('aztec:prover-client:proving-agent'),
) {
this.instrumentation = new ProvingAgentInstrumentation(client);
this.runningPromise = new RunningPromise(this.safeWork, this.pollIntervalMs);
}

Expand All @@ -46,6 +54,7 @@ export class ProvingAgent {
}

public start(): void {
this.idleTimer = new Timer();
this.runningPromise.start();
}

Expand Down Expand Up @@ -114,6 +123,11 @@ export class ProvingAgent {
);
}

if (this.idleTimer) {
this.instrumentation.recordIdleTime(this.idleTimer);
}
this.idleTimer = undefined;

this.currentJobController.start();
} catch (err) {
this.log.error(`Error in ProvingAgent: ${String(err)}`);
Expand All @@ -126,6 +140,7 @@ export class ProvingAgent {
err: Error | undefined,
result: ProvingJobResultsMap[T] | undefined,
) => {
this.idleTimer = new Timer();
if (err) {
const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false;
this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { type Timer } from '@aztec/foundation/timer';
import { type Histogram, Metrics, type TelemetryClient, ValueType } from '@aztec/telemetry-client';

/**
 * Telemetry instrumentation for a proving agent.
 *
 * Creates a single integer-valued histogram (unit: milliseconds) on the meter obtained
 * from the supplied telemetry client and exposes a helper to record idle durations on it.
 */
export class ProvingAgentInstrumentation {
  /** Histogram of idle durations; declared with an integer value type and 'ms' unit. */
  private idleTime: Histogram;

  /**
   * @param client - Telemetry client from which the meter is obtained.
   * @param name - Name of the meter to request from the client.
   */
  constructor(client: TelemetryClient, name = 'ProvingAgent') {
    const meter = client.getMeter(name);

    this.idleTime = meter.createHistogram(Metrics.PROVING_AGENT_IDLE, {
      description: 'Records how long an agent was idle',
      unit: 'ms',
      valueType: ValueType.INT,
    });
  }

  /**
   * Records one idle period on the idle-time histogram.
   *
   * @param msOrTimer - Either a duration in milliseconds, or a timer whose `ms()` reading
   * is taken at call time.
   */
  recordIdleTime(msOrTimer: Timer | number) {
    const elapsedMs = typeof msOrTimer === 'number' ? msOrTimer : msOrTimer.ms();
    // The histogram is declared as INT, so round down on both input paths; previously only
    // the Timer path was floored and a fractional number was forwarded unchanged.
    this.idleTime.record(Math.floor(elapsedMs));
  }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types';
import { randomBytes } from '@aztec/foundation/crypto';
import { openTmpStore } from '@aztec/kv-store/utils';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';

Expand All @@ -17,7 +18,7 @@ describe.each([
() => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }),
() => {
const store = openTmpStore(true);
const database = new KVBrokerDatabase(store);
const database = new KVBrokerDatabase(store, new NoopTelemetryClient());
const cleanup = () => store.close();
return { database, cleanup };
},
Expand All @@ -35,7 +36,7 @@ describe.each([
maxRetries = 2;
({ database, cleanup } = createDb());

broker = new ProvingBroker(database, {
broker = new ProvingBroker(database, new NoopTelemetryClient(), {
jobTimeoutMs,
timeoutIntervalMs: jobTimeoutMs / 4,
maxRetries,
Expand Down Expand Up @@ -409,7 +410,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(10_000);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();

await assertJobStatus(job1.id, 'in-queue');
Expand Down Expand Up @@ -470,7 +471,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(10_000);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();

await assertJobStatus(job1.id, 'in-queue');
Expand Down Expand Up @@ -521,7 +522,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(100 * jobTimeoutMs);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();
await assertJobStatus(job1.id, 'in-queue');

Expand Down
Loading

0 comments on commit c2c8cc6

Please sign in to comment.