diff --git a/runner/src/stream-handler.ts b/runner/src/stream-handler.ts deleted file mode 100644 index 8edbc57c2..000000000 --- a/runner/src/stream-handler.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'; - -import Indexer from './indexer'; -import RedisClient from './redis-client'; -import { METRICS } from './metrics'; - -interface Metric { - type: keyof typeof METRICS - labels: Record - value: number -}; - -export default class StreamHandler { - private readonly worker?: Worker; - - constructor ( - streamKey: string - ) { - if (isMainThread) { - this.worker = new Worker(__filename, { - workerData: { - streamKey, - }, - }); - - this.worker.on('message', this.handleMessage); - } else { - throw new Error('StreamHandler should not be instantiated in a worker thread'); - } - } - - private handleMessage (metric: Metric): void { - METRICS[metric.type].labels(metric.labels).set(metric.value); - } -} - -if (!isMainThread) { - const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; - - void (async function main () { - const indexer = new Indexer('mainnet'); - const redisClient = new RedisClient(); - - const { streamKey } = workerData; - - console.log('Started processing stream: ', streamKey); - - let indexerName = ''; - - while (true) { - try { - const startTime = performance.now(); - const streamType = redisClient.getStreamType(streamKey); - - const messages = await redisClient.getNextStreamMessage(streamKey); - const indexerConfig = await redisClient.getStreamStorage(streamKey); - - indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; - - if (messages == null) { - await sleep(1000); - continue; - } - - const [{ id, message }] = messages; - - const functions = { - [indexerName]: { - account_id: indexerConfig.account_id, - function_name: indexerConfig.function_name, - code: indexerConfig.code, - schema: indexerConfig.schema, - provisioned: false, - }, - }; - await indexer.runFunctions(Number(message.block_height), functions, false, { - provision: true, - }); - - await redisClient.deleteStreamMessage(streamKey, id); - - const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); - - parentPort?.postMessage({ - type: 'UNPROCESSED_STREAM_MESSAGES', - labels: { indexer: indexerName, type: streamType }, - value: unprocessedMessages?.length ?? 0, - } satisfies Metric); - - parentPort?.postMessage({ - type: 'EXECUTION_DURATION', - labels: { indexer: indexerName, type: streamType }, - value: performance.now() - startTime, - } satisfies Metric); - - console.log(`Success: ${indexerName}`); - } catch (err) { - await sleep(10000); - console.log(`Failed: ${indexerName}`, err); - } - } - })(); -} diff --git a/runner/src/stream-handler/index.ts b/runner/src/stream-handler/index.ts new file mode 100644 index 000000000..1b4a410f1 --- /dev/null +++ b/runner/src/stream-handler/index.ts @@ -0,0 +1 @@ +export { default } from './stream-handler'; diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts new file mode 100644 index 000000000..7e1fe2237 --- /dev/null +++ b/runner/src/stream-handler/stream-handler.ts @@ -0,0 +1,29 @@ +import path from 'path'; +import { Worker, isMainThread } from 'worker_threads'; + +import { type Message } from './types'; +import { METRICS } from '../metrics'; + +export default class StreamHandler { + private readonly worker?: Worker; + + constructor ( + streamKey: string + ) { + if (isMainThread) { + this.worker = new Worker(path.join(__dirname, 'worker.js'), { + workerData: { + streamKey, + }, + }); + + this.worker.on('message', this.handleMessage); + } else { + throw new Error('StreamHandler should not be instantiated in a worker thread'); + } + } + + private handleMessage (message: Message): void { + METRICS[message.type].labels(message.labels).set(message.value); + } +} diff --git a/runner/src/stream-handler/types.ts b/runner/src/stream-handler/types.ts new file mode 100644 index 000000000..945248e1b --- /dev/null +++ b/runner/src/stream-handler/types.ts @@ -0,0 +1,9 @@ +import { type METRICS } from '../metrics'; + +interface Metric { + type: keyof typeof METRICS + labels: Record + value: number +}; + +export type Message = Metric; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts new file mode 100644 index 000000000..bcd2602a3 --- /dev/null +++ b/runner/src/stream-handler/worker.ts @@ -0,0 +1,75 @@ +import { isMainThread, parentPort, workerData } from 'worker_threads'; + +import Indexer from '../indexer'; +import RedisClient from '../redis-client'; +import { type Message } from './types'; + +if (isMainThread) { + throw new Error('Worker should not be run on main thread'); +} + +const indexer = new Indexer('mainnet'); +const redisClient = new RedisClient(); + +const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; + +void (async function main () { + const { streamKey } = workerData; + + console.log('Started processing stream: ', streamKey); + + let indexerName = ''; + + while (true) { + try { + const startTime = performance.now(); + const streamType = redisClient.getStreamType(streamKey); + + const messages = await redisClient.getNextStreamMessage(streamKey); + const indexerConfig = await redisClient.getStreamStorage(streamKey); + + indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; + + if (messages == null) { + await sleep(500); + continue; + } + + const [{ id, message }] = messages; + + const functions = { + [indexerName]: { + account_id: indexerConfig.account_id, + function_name: indexerConfig.function_name, + code: indexerConfig.code, + schema: indexerConfig.schema, + provisioned: false, + }, + }; + await indexer.runFunctions(Number(message.block_height), functions, false, { + provision: true, + }); + + await redisClient.deleteStreamMessage(streamKey, id); + + const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + + parentPort?.postMessage({ + type: 'UNPROCESSED_STREAM_MESSAGES', + labels: { indexer: indexerName, type: streamType }, + value: unprocessedMessages?.length ?? 0, + } satisfies Message); + + parentPort?.postMessage({ + type: 'EXECUTION_DURATION', + labels: { indexer: indexerName, type: streamType }, + value: performance.now() - startTime, + } satisfies Message); + + console.log(`Success: ${indexerName}`); + } catch (err) { + await sleep(10000); + console.log(`Failed: ${indexerName}`, err); + } + } +})();