From e2e8e53887931da801fe7719c08e2422a615c80d Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Wed, 14 Feb 2024 11:30:33 +1300 Subject: [PATCH] feat: Remove executor metrics on stop (#557) When an Executor is stopped, the metrics exposed from that worker still persist. As metrics are aggregated across all workers, the metrics for a given indexer would be aggregated across all previous workers, creating incorrect metrics. --- runner/src/metrics.ts | 10 +++++++--- runner/src/stream-handler/stream-handler.ts | 4 +++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index b30d2011c..89bd8e47e 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -67,10 +67,14 @@ export const METRICS = { }; const aggregatorRegistry = new AggregatorRegistry(); -const workerMetrics: Record = {}; +const workerMetrics = new Map(); export const registerWorkerMetrics = (workerId: number, metrics: string): void => { - workerMetrics[workerId] = metrics; + workerMetrics.set(workerId, metrics); +}; + +export const deregisterWorkerMetrics = (workerId: number): void => { + workerMetrics.delete(workerId); }; export const startServer = async (): Promise => { @@ -81,7 +85,7 @@ export const startServer = async (): Promise => { app.get('/metrics', async (_req, res) => { res.set('Content-Type', aggregatorRegistry.contentType); - const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics(); + const metrics = await AggregatorRegistry.aggregate(Array.from(workerMetrics.values())).metrics(); res.send(metrics); }); diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 0f7268a09..3ef5991c9 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -1,7 +1,7 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; -import { registerWorkerMetrics } from '../metrics'; +import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; export enum Status { @@ -60,6 +60,8 @@ export default class StreamHandler { } async stop (): Promise { + deregisterWorkerMetrics(this.worker.threadId); + await this.worker.terminate(); }