Skip to content

Commit

Permalink
fix: Surface metrics to main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 15, 2023
1 parent 2fd35a1 commit 843a402
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
16 changes: 13 additions & 3 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import { Worker } from 'worker_threads';

import * as metrics from './metrics';
import { METRICS, startServer as startMetricsServer } from './metrics';
import RedisClient from './redis-client';

const redisClient = new RedisClient();

metrics.startServer().catch((err) => {
startMetricsServer().catch((err) => {
console.error('Failed to start metrics server', err);
});

const STREAM_HANDLER_THROTTLE_MS = 500;

type StreamHandlers = Record<string, Worker>;

interface Metric {
type: keyof typeof METRICS
labels: Record<string, string>
value: number
};

void (async function main () {
try {
const streamHandlers: StreamHandlers = {};
Expand All @@ -27,7 +33,11 @@ void (async function main () {

const worker = new Worker('./dist/worker.js');
worker.postMessage({ streamKey });
// const handler = processStream(streamKey);

worker.on('message', (message: Metric) => {
METRICS[message.type].labels(message.labels).set(message.value);
});

streamHandlers[streamKey] = worker;
});

Expand Down
9 changes: 7 additions & 2 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import express from 'express';
import promClient from 'prom-client';

export const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({
const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({
name: 'queryapi_runner_unprocessed_stream_messages',
help: 'Number of Redis Stream messages not yet processed',
labelNames: ['indexer', 'type'],
});

export const EXECUTION_DURATION = new promClient.Gauge({
const EXECUTION_DURATION = new promClient.Gauge({
name: 'queryapi_runner_execution_duration_milliseconds',
help: 'Time taken to execute an indexer function',
labelNames: ['indexer', 'type'],
});

export const METRICS = {
EXECUTION_DURATION,
UNPROCESSED_STREAM_MESSAGES,
};

export const startServer = async (): Promise<void> => {
const app = express();

Expand Down
13 changes: 10 additions & 3 deletions runner/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { parentPort } from 'worker_threads';

import Indexer from './indexer';
import RedisClient from './redis-client';
import * as metrics from './metrics';

const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();
Expand Down Expand Up @@ -46,8 +45,16 @@ parentPort?.on('message', async ({ streamKey }) => {

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);
metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime);
parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
});
parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
});

console.log(`Success: ${indexerName}`);
} catch (err) {
Expand Down

0 comments on commit 843a402

Please sign in to comment.