diff --git a/runner/src/index.ts b/runner/src/index.ts index d12adf170..3e568e9b8 100644 --- a/runner/src/index.ts +++ b/runner/src/index.ts @@ -13,6 +13,12 @@ const STREAM_HANDLER_THROTTLE_MS = 500; type StreamHandlers = Record; +interface Metric { + type: 'UNPROCESSED_STREAM_MESSAGES' | 'EXECUTION_DURATION' + labels: Record + value: number +}; + void (async function main () { try { const streamHandlers: StreamHandlers = {}; @@ -27,7 +33,11 @@ void (async function main () { const worker = new Worker('./dist/worker.js'); worker.postMessage({ streamKey }); - // const handler = processStream(streamKey); + + worker.on('message', (message: Metric) => { + metrics[message.type].labels(message.labels).set(message.value); + }); + streamHandlers[streamKey] = worker; }); diff --git a/runner/src/worker.ts b/runner/src/worker.ts index ca931fc8d..9d79d61a9 100644 --- a/runner/src/worker.ts +++ b/runner/src/worker.ts @@ -2,7 +2,6 @@ import { parentPort } from 'worker_threads'; import Indexer from './indexer'; import RedisClient from './redis-client'; -import * as metrics from './metrics'; const indexer = new Indexer('mainnet'); const redisClient = new RedisClient(); @@ -46,8 +45,16 @@ parentPort?.on('message', async ({ streamKey }) => { const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); - metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime); + parentPort?.postMessage({ + type: 'UNPROCESSED_STREAM_MESSAGES', + labels: { indexer: indexerName, type: streamType }, + value: unprocessedMessages?.length ?? 0, + }); + parentPort?.postMessage({ + type: 'EXECUTION_DURATION', + labels: { indexer: indexerName, type: streamType }, + value: performance.now() - startTime, + }); console.log(`Success: ${indexerName}`); } catch (err) {