Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-Fetch Streamer Messages #264

Closed
7 tasks done
Tracked by #204
darunrs opened this issue Sep 29, 2023 · 0 comments · Fixed by #269
Closed
7 tasks done
Tracked by #204

Pre-Fetch Streamer Messages #264

darunrs opened this issue Sep 29, 2023 · 0 comments · Fixed by #269
Assignees

Comments

@darunrs
Copy link
Collaborator

darunrs commented Sep 29, 2023

Historical streamer messages are not fetched in coordinator prior to the IndexerRunner call. As a result, we cannot apply the latency saving benefits of having coordinator cache the streamer message for use by runner. Instead, we want to pre fetch from S3 so that runner invocations don't have to wait for the streamer message to be retrieved from S3. Instead, these messages will be pre fetched and awaited in an array to ensure in order processing of block heights.

Below is the current flow for historical processing:

  1. Coordinator retrieves timestamp for the block height that historical processing is starting from as well as current block height. It uses this to see which days to look for index files.
  2. Index files are fils generated for each day an indexer is active. They contain information such as block heights which the particular indexer function was applied for. Coordinator fetches each index file available for the indexer starting from the day the starting block height falls into.
  3. Coordinator parses the block heights from the file and puts them in the historical redis stream. This is where the divergence between real time and historical lies. Real time does not have any index files so it reads the streamer message from S3 to get data including block height. This block height is put into the real time stream.
  4. Runner reads the block height from the historical stream. It pulls the streamer message from S3, parses it, and uses it for execution. This leads to each invocation taking at least 200ms if not more. I've seen as high as 700ms in a sample size of 20 invocations. 99th percentile might be much higher.

Below is the new workflow:

  1. Coordinator functionality remains the same.
  2. In runner, fetch X blocks from S3 as a promise.
  3. Load the promises into an array, which is used as a queue.
  4. Delete the block height from the stream, for each block height successfully placed on queue.
  5. Await the first block in the queue. Upon completion of the promise, trigger the function call and pass in the loaded data.

I've also made it so that real-time also uses prefetch mechanism on top of the existing caching.

While an indexer function is running, several other blocks are being loaded simultaneously. For each loop, we ensure the array is as full as possible. This ensures few functions are waiting for the block instead of all of them.

Tasks

Preview Give feedback
@darunrs darunrs self-assigned this Sep 29, 2023
@darunrs darunrs changed the title Store Historical Streamer Message in Redis Cache Historical Streamer Message in Redis Oct 2, 2023
@darunrs darunrs changed the title Cache Historical Streamer Message in Redis Pre-Fetch Historical Streamer Messages Oct 4, 2023
@darunrs darunrs linked a pull request Oct 5, 2023 that will close this issue
@darunrs darunrs changed the title Pre-Fetch Historical Streamer Messages Pre-Fetch Streamer Messages Nov 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant