diff --git a/runner/src/index.ts b/runner/src/index.ts index d12adf170..ff824dd12 100644 --- a/runner/src/index.ts +++ b/runner/src/index.ts @@ -1,11 +1,11 @@ import { Worker } from 'worker_threads'; -import * as metrics from './metrics'; +import { METRICS, startServer as startMetricsServer } from './metrics'; import RedisClient from './redis-client'; const redisClient = new RedisClient(); -metrics.startServer().catch((err) => { +startMetricsServer().catch((err) => { console.error('Failed to start metrics server', err); }); @@ -13,6 +13,12 @@ const STREAM_HANDLER_THROTTLE_MS = 500; type StreamHandlers = Record; +interface Metric { + type: keyof typeof METRICS + labels: Record + value: number +}; + void (async function main () { try { const streamHandlers: StreamHandlers = {}; @@ -27,7 +33,11 @@ void (async function main () { const worker = new Worker('./dist/worker.js'); worker.postMessage({ streamKey }); - // const handler = processStream(streamKey); + + worker.on('message', (message: Metric) => { + METRICS[message.type].labels(message.labels).set(message.value); + }); + streamHandlers[streamKey] = worker; }); diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index f4fbeb801..65ef6b990 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,18 +1,23 @@ import express from 'express'; import promClient from 'prom-client'; -export const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({ +const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({ name: 'queryapi_runner_unprocessed_stream_messages', help: 'Number of Redis Stream messages not yet processed', labelNames: ['indexer', 'type'], }); -export const EXECUTION_DURATION = new promClient.Gauge({ +const EXECUTION_DURATION = new promClient.Gauge({ name: 'queryapi_runner_execution_duration_milliseconds', help: 'Time taken to execute an indexer function', labelNames: ['indexer', 'type'], }); +export const METRICS = { + EXECUTION_DURATION, + UNPROCESSED_STREAM_MESSAGES, +}; + export const startServer = async (): Promise => { const app = express(); diff --git a/runner/src/worker.ts b/runner/src/worker.ts index ca931fc8d..9d79d61a9 100644 --- a/runner/src/worker.ts +++ b/runner/src/worker.ts @@ -2,7 +2,6 @@ import { parentPort } from 'worker_threads'; import Indexer from './indexer'; import RedisClient from './redis-client'; -import * as metrics from './metrics'; const indexer = new Indexer('mainnet'); const redisClient = new RedisClient(); @@ -46,8 +45,16 @@ parentPort?.on('message', async ({ streamKey }) => { const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); - metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime); + parentPort?.postMessage({ + type: 'UNPROCESSED_STREAM_MESSAGES', + labels: { indexer: indexerName, type: streamType }, + value: unprocessedMessages?.length ?? 0, + }); + parentPort?.postMessage({ + type: 'EXECUTION_DURATION', + labels: { indexer: indexerName, type: streamType }, + value: performance.now() - startTime, + }); console.log(`Success: ${indexerName}`); } catch (err) {