diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index b30d2011c..89bd8e47e 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -67,10 +67,14 @@ export const METRICS = { }; const aggregatorRegistry = new AggregatorRegistry(); -const workerMetrics: Record = {}; +const workerMetrics = new Map(); export const registerWorkerMetrics = (workerId: number, metrics: string): void => { - workerMetrics[workerId] = metrics; + workerMetrics.set(workerId, metrics); +}; + +export const deregisterWorkerMetrics = (workerId: number): void => { + workerMetrics.delete(workerId); }; export const startServer = async (): Promise => { @@ -81,7 +85,7 @@ export const startServer = async (): Promise => { app.get('/metrics', async (_req, res) => { res.set('Content-Type', aggregatorRegistry.contentType); - const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics(); + const metrics = await AggregatorRegistry.aggregate(Array.from(workerMetrics.values())).metrics(); res.send(metrics); }); diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 0f7268a09..3ef5991c9 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -1,7 +1,7 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; -import { registerWorkerMetrics } from '../metrics'; +import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; export enum Status { @@ -60,6 +60,8 @@ export default class StreamHandler { } async stop (): Promise { + deregisterWorkerMetrics(this.worker.threadId); + await this.worker.terminate(); }