Skip to content

Commit

Permalink
fix: Surface metrics to main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 14, 2023
1 parent 2fd35a1 commit 6983e21
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
12 changes: 11 additions & 1 deletion runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ const STREAM_HANDLER_THROTTLE_MS = 500;

type StreamHandlers = Record<string, Worker>;

interface Metric {
type: 'UNPROCESSED_STREAM_MESSAGES' | 'EXECUTION_DURATION'
labels: Record<string, string>
value: number
};

void (async function main () {
try {
const streamHandlers: StreamHandlers = {};
Expand All @@ -27,7 +33,11 @@ void (async function main () {

const worker = new Worker('./dist/worker.js');
worker.postMessage({ streamKey });
// const handler = processStream(streamKey);

worker.on('message', (message: Metric) => {
metrics[message.type].labels(message.labels).set(message.value);
});

streamHandlers[streamKey] = worker;
});

Expand Down
13 changes: 10 additions & 3 deletions runner/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { parentPort } from 'worker_threads';

import Indexer from './indexer';
import RedisClient from './redis-client';
import * as metrics from './metrics';

const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();
Expand Down Expand Up @@ -46,8 +45,16 @@ parentPort?.on('message', async ({ streamKey }) => {

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);
metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime);
parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
});
parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
});

console.log(`Success: ${indexerName}`);
} catch (err) {
Expand Down

0 comments on commit 6983e21

Please sign in to comment.