Skip to content

Commit

Permalink
refactor: Split worker code and StreamHandler in to co-located files
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 15, 2023
1 parent 49a3a4b commit 6cfc43c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 103 deletions.
103 changes: 0 additions & 103 deletions runner/src/stream-handler.ts

This file was deleted.

1 change: 1 addition & 0 deletions runner/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default } from './stream-handler';
29 changes: 29 additions & 0 deletions runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import path from 'path';
import { Worker, isMainThread } from 'worker_threads';

import { type Message } from './types';
import { METRICS } from '../metrics';

export default class StreamHandler {
private readonly worker?: Worker;

constructor (
streamKey: string
) {
if (isMainThread) {
this.worker = new Worker(path.join(__dirname, 'worker.js'), {
workerData: {
streamKey,
},
});

this.worker.on('message', this.handleMessage);
} else {
throw new Error('StreamHandler should not be instantiated in a worker thread');
}
}

private handleMessage (message: Message): void {
METRICS[message.type].labels(message.labels).set(message.value);
}
}
9 changes: 9 additions & 0 deletions runner/src/stream-handler/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { type METRICS } from '../metrics';

interface Metric {
type: keyof typeof METRICS
labels: Record<string, string>
value: number
};

export type Message = Metric;
75 changes: 75 additions & 0 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { isMainThread, parentPort, workerData } from 'worker_threads';

import Indexer from '../indexer';
import RedisClient from '../redis-client';
import { type Message } from './types';

if (isMainThread) {
throw new Error('Worker should not be run on main thread');
}

const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();

const sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };

void (async function main () {
const { streamKey } = workerData;

console.log('Started processing stream: ', streamKey);

let indexerName = '';

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
await sleep(1000);
continue;
}

const [{ id, message }] = messages;

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
await indexer.runFunctions(Number(message.block_height), functions, false, {
provision: true,
});

await redisClient.deleteStreamMessage(streamKey, id);

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
} satisfies Message);

parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
} satisfies Message);

console.log(`Success: ${indexerName}`);
} catch (err) {
await sleep(10000);
console.log(`Failed: ${indexerName}`, err);
}
}
})();

0 comments on commit 6cfc43c

Please sign in to comment.