Skip to content

Commit

Permalink
feat: Parallelize stream processing with workers threads
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 14, 2023
1 parent f0c3a53 commit 2fd35a1
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 53 deletions.
60 changes: 7 additions & 53 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Indexer from './indexer';
import { Worker } from 'worker_threads';

import * as metrics from './metrics';
import RedisClient from './redis-client';

const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();

metrics.startServer().catch((err) => {
Expand All @@ -11,55 +11,7 @@ metrics.startServer().catch((err) => {

const STREAM_HANDLER_THROTTLE_MS = 500;

const processStream = async (streamKey: string): Promise<void> => {
console.log('Started processing stream: ', streamKey);

let indexerName = '';

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
continue;
}

const [{ id, message }] = messages;

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
await indexer.runFunctions(Number(message.block_height), functions, false, {
provision: true,
});

await redisClient.deleteStreamMessage(streamKey, id);

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);
metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime);

console.log(`Success: ${indexerName}`);
} catch (err) {
console.log(`Failed: ${indexerName}`, err);
}
}
};

type StreamHandlers = Record<string, Promise<void>>;
type StreamHandlers = Record<string, Worker>;

void (async function main () {
try {
Expand All @@ -73,8 +25,10 @@ void (async function main () {
return;
}

const handler = processStream(streamKey);
streamHandlers[streamKey] = handler;
const worker = new Worker('./dist/worker.js');
worker.postMessage({ streamKey });
// const handler = processStream(streamKey);
streamHandlers[streamKey] = worker;
});

await new Promise((resolve) =>
Expand Down
57 changes: 57 additions & 0 deletions runner/src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { parentPort } from 'worker_threads';

import Indexer from './indexer';
import RedisClient from './redis-client';
import * as metrics from './metrics';

const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();

// eslint-disable-next-line
parentPort?.on('message', async ({ streamKey }) => {
console.log('Started processing stream: ', streamKey);

let indexerName = '';

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
continue;
}

const [{ id, message }] = messages;

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
await indexer.runFunctions(Number(message.block_height), functions, false, {
provision: true,
});

await redisClient.deleteStreamMessage(streamKey, id);

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

metrics.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);
metrics.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - startTime);

console.log(`Success: ${indexerName}`);
} catch (err) {
console.log(`Failed: ${indexerName}`, err);
}
}
});

0 comments on commit 2fd35a1

Please sign in to comment.