Skip to content

Commit

Permalink
feat: Remove executor metrics on stop (#557)
Browse files Browse the repository at this point in the history
When an Executor is stopped, the metrics exposed from that worker still
persist. As metrics are aggregated across all workers, the metrics for a
given indexer would be aggregated across all previous workers, creating
incorrect metrics.
  • Loading branch information
morgsmccauley authored Feb 13, 2024
1 parent 8864a2a commit e2e8e53
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
10 changes: 7 additions & 3 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ export const METRICS = {
};

const aggregatorRegistry = new AggregatorRegistry();
const workerMetrics: Record<number, string> = {};
const workerMetrics = new Map<number, string>();

export const registerWorkerMetrics = (workerId: number, metrics: string): void => {
workerMetrics[workerId] = metrics;
workerMetrics.set(workerId, metrics);
};

export const deregisterWorkerMetrics = (workerId: number): void => {
workerMetrics.delete(workerId);
};

export const startServer = async (): Promise<void> => {
Expand All @@ -81,7 +85,7 @@ export const startServer = async (): Promise<void> => {
app.get('/metrics', async (_req, res) => {
res.set('Content-Type', aggregatorRegistry.contentType);

const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics();
const metrics = await AggregatorRegistry.aggregate(Array.from(workerMetrics.values())).metrics();
res.send(metrics);
});

Expand Down
4 changes: 3 additions & 1 deletion runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import path from 'path';
import { Worker, isMainThread } from 'worker_threads';

import { registerWorkerMetrics } from '../metrics';
import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics';
import Indexer from '../indexer';

export enum Status {
Expand Down Expand Up @@ -60,6 +60,8 @@ export default class StreamHandler {
}

async stop (): Promise<void> {
deregisterWorkerMetrics(this.worker.threadId);

await this.worker.terminate();
}

Expand Down

0 comments on commit e2e8e53

Please sign in to comment.