Skip to content

Commit

Permalink
refactor: Move Worker instantiation next to implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 15, 2023
1 parent f621411 commit 71be610
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 86 deletions.
23 changes: 5 additions & 18 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Worker } from 'worker_threads';

import { METRICS, startServer as startMetricsServer } from './metrics';
import { startServer as startMetricsServer } from './metrics';
import RedisClient from './redis-client';
import StreamHandler from './stream-handler';

const redisClient = new RedisClient();

Expand All @@ -11,13 +10,7 @@ startMetricsServer().catch((err) => {

const STREAM_HANDLER_THROTTLE_MS = 500;

type StreamHandlers = Record<string, Worker>;

interface Metric {
type: keyof typeof METRICS
labels: Record<string, string>
value: number
};
type StreamHandlers = Record<string, StreamHandler>;

void (async function main () {
try {
Expand All @@ -31,15 +24,9 @@ void (async function main () {
return;
}

const worker = new Worker('./dist/worker.js', {
workerData: { streamKey },
});

worker.on('message', (message: Metric) => {
METRICS[message.type].labels(message.labels).set(message.value);
});
const streamHandler = new StreamHandler(streamKey);

streamHandlers[streamKey] = worker;
streamHandlers[streamKey] = streamHandler;
});

await new Promise((resolve) =>
Expand Down
99 changes: 99 additions & 0 deletions runner/src/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

import Indexer from './indexer';
import RedisClient from './redis-client';
import { METRICS } from './metrics';

interface Metric {
type: keyof typeof METRICS
labels: Record<string, string>
value: number
};

export default class StreamHandler {
private readonly worker?: Worker;

constructor (
streamKey: string
) {
if (isMainThread) {
this.worker = new Worker(__filename, {
workerData: {
streamKey,
},
});

this.worker.on('message', this.handleMessage);
} else {
throw new Error('StreamHandler should not be instantiated in a worker thread');
}
}

private handleMessage (metric: Metric): void {
METRICS[metric.type].labels(metric.labels).set(metric.value);
}
}

if (!isMainThread) {
void (async function main () {
const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();

const { streamKey } = workerData;

console.log('Started processing stream: ', streamKey);

let indexerName = '';

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
continue;
}

const [{ id, message }] = messages;

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
await indexer.runFunctions(Number(message.block_height), functions, false, {
provision: true,
});

await redisClient.deleteStreamMessage(streamKey, id);

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
});

parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
});

console.log(`Success: ${indexerName}`);
} catch (err) {
console.log(`Failed: ${indexerName}`, err);
}
}
})();
}
68 changes: 0 additions & 68 deletions runner/src/worker.ts

This file was deleted.

0 comments on commit 71be610

Please sign in to comment.