DPLT-1118 Parallelize stream processing with worker threads #191

Merged
merged 6 commits into from
Sep 19, 2023

@morgsmccauley morgsmccauley commented Sep 14, 2023

This PR parallelises indexer processing by moving each indexer to its own worker thread. CPU time is therefore no longer shared across all indexers on a single thread. I/O is still shared across all threads, but that shouldn't be an issue, as I/O isn't the limiting factor for Runner.

@morgsmccauley morgsmccauley force-pushed the feat/worker-threads branch 2 times, most recently from 2d69c4d to 71be610 Compare September 15, 2023 02:26
@morgsmccauley morgsmccauley force-pushed the feat/worker-threads branch 3 times, most recently from 42ecd8c to e82e2ef Compare September 15, 2023 04:36
@morgsmccauley morgsmccauley marked this pull request as ready for review September 15, 2023 05:19
@morgsmccauley morgsmccauley requested a review from a team as a code owner September 15, 2023 05:19
name: 'queryapi_runner_execution_duration_milliseconds',
help: 'Time taken to execute an indexer function',
labelNames: ['indexer', 'type'],
});

export const METRICS = {

Exporting the metrics as a single group lets us derive the associated types automatically; see the Metric type in StreamHandler.
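To illustrate the idea, here is a minimal sketch of deriving a type from a grouped export. It is not the code from this PR: `FakeGauge` stands in for a real metrics-library gauge, and the key names `EXECUTION_DURATION` and `UNPROCESSED_STREAM_MESSAGES` as well as `recordMetric` are hypothetical.

```typescript
// Stand-in for a metrics-library gauge (assumption); only the shape matters here.
class FakeGauge {
  public value = 0;

  set (value: number): void {
    this.value = value;
  }
}

// Hypothetical key names; the real METRICS object may differ.
const METRICS = {
  EXECUTION_DURATION: new FakeGauge(),
  UNPROCESSED_STREAM_MESSAGES: new FakeGauge(),
};

// The union of valid metric names is derived from the object itself,
// so adding a key to METRICS automatically widens this type.
type Metric = keyof typeof METRICS;

function recordMetric (metric: Metric, value: number): void {
  METRICS[metric].set(value);
}

recordMetric('UNPROCESSED_STREAM_MESSAGES', 3);
// recordMetric('NOT_A_METRIC', 1); // would fail to compile
```

Because `Metric` is computed from `METRICS`, there is no separate list of names to keep in sync.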

import { type Message } from './types';
import { METRICS } from '../metrics';

export default class StreamHandler {

This is a wrapper around Worker which will (eventually) expose a nicer API to the consumer, meaning they don't have to deal with sending/receiving messages.
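As a rough sketch of what such a wrapper could look like (not the implementation in this PR), the class below hides the `Worker` and its message events behind a callback and a `stop` method. The class name `StreamHandlerSketch`, the `onStatus` callback, the message shape, and the inline worker source are all assumptions made to keep the example self-contained; a real worker would normally live in its own file.

```typescript
import { Worker } from 'node:worker_threads';

// Hypothetical inline worker body, evaluated as CommonJS via `eval: true`.
const WORKER_SOURCE = `
  const { parentPort, workerData } = require('node:worker_threads');
  parentPort.postMessage({ streamKey: workerData.streamKey, status: 'started' });
`;

// Hypothetical message shape sent by the worker above.
interface StatusMessage {
  streamKey: string
  status: string
}

class StreamHandlerSketch {
  private readonly worker: Worker;

  constructor (streamKey: string, onStatus: (status: string) => void) {
    this.worker = new Worker(WORKER_SOURCE, { eval: true, workerData: { streamKey } });
    // Consumers see a plain callback rather than raw worker messages.
    this.worker.on('message', (message: StatusMessage) => { onStatus(message.status); });
    this.worker.on('error', (error: Error) => { console.error(`Worker for ${streamKey} failed`, error); });
  }

  async stop (): Promise<number> {
    return await this.worker.terminate();
  }
}

// Usage: resolve once the worker reports that it has started, then stop it.
const started = new Promise<string>((resolve) => {
  const handler = new StreamHandlerSketch('stream:example', (status) => {
    resolve(status);
    void handler.stop();
  });
});
```

The consumer only passes a stream key and a callback; the message passing stays an internal detail of the wrapper.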

@@ -0,0 +1,9 @@
import { type METRICS } from '../metrics';

Shared types between the worker and api (StreamHandler)


const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

parentPort?.postMessage({

Metrics written directly from another thread won't be picked up by the main thread, where the metrics server aggregates and exposes them. We therefore need to send each metric to the main thread and write it there.
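The forwarding pattern described here could be sketched as follows. This is an illustration under assumptions rather than the PR's code: the `MetricMessage` shape, the metric name, the `indexer` label value, and the `recorded` map (a stand-in for the main thread's metrics registry) are all hypothetical.

```typescript
import { Worker } from 'node:worker_threads';

// Hypothetical message shape; the real shared type may differ.
interface MetricMessage {
  type: 'METRIC'
  metric: string
  labels: { indexer: string }
  value: number
}

// Stand-in for the registry owned by the main thread (assumption).
const recorded = new Map<string, number>();

// Hypothetical inline worker: it never writes metrics itself,
// it only reports the measurement to the parent.
const WORKER_SOURCE = `
  const { parentPort } = require('node:worker_threads');
  parentPort.postMessage({
    type: 'METRIC',
    metric: 'UNPROCESSED_STREAM_MESSAGES',
    labels: { indexer: 'example' },
    value: 5,
  });
`;

const worker = new Worker(WORKER_SOURCE, { eval: true });

// The main thread receives the message and performs the actual write.
const metricWritten = new Promise<void>((resolve) => {
  worker.on('message', (message: MetricMessage) => {
    if (message.type === 'METRIC') {
      recorded.set(`${message.metric}:${message.labels.indexer}`, message.value);
      resolve();
    }
  });
});
```

All writes happen on the main thread, so whatever serves the metrics there sees values from every worker.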

@morgsmccauley morgsmccauley changed the title feat: Parallelize stream processing with worker threads DPLT-1118 Parallelize stream processing with worker threads Sep 15, 2023
@morgsmccauley morgsmccauley merged commit 4dbf605 into main Sep 19, 2023
@morgsmccauley morgsmccauley deleted the feat/worker-threads branch September 19, 2023 03:16