feat: Pre-Fetch Streamer Messages #269
Really great start on this - left a couple comments :)
I'm still experimenting with the blocking values. In the meantime, I released the changes I've made so far. Let me know what you think!
I did some more experimentation and found that blocking does in fact block both loops, not just the producer loop. Once no stream messages are present, function execution becomes slow because the xread blocks the whole thread for the blocking duration. Removing the blocking restored the speed improvements. In addition, I ran a test against sweat_blockheight in which I appended the block height of each stream message to one file, then appended the block height to a second file after runFunction was called. I diff'd the two files and found no differences, which was enough to verify that every message in the stream is seen and processed exactly once.
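For anyone who wants to repeat that check without files, here is a minimal sketch of the same comparison in TypeScript. The function name `firstDifference` and the in-memory arrays are hypothetical stand-ins for the two files; this is not code from the PR.

```typescript
// Hypothetical helper: compares the block heights seen on the stream with
// the block heights recorded after processing, in order.
// Returns the index of the first mismatch, or -1 if the sequences are identical.
function firstDifference(seen: number[], processed: number[]): number {
  const length = Math.max(seen.length, processed.length);
  for (let i = 0; i < length; i++) {
    // A missing entry (undefined) on either side also counts as a mismatch.
    if (seen[i] !== processed[i]) {
      return i;
    }
  }
  return -1;
}
```

A result of -1 corresponds to an empty diff. Like the diff, it only shows that the two sequences match; "exactly once" additionally relies on the stream itself containing each block height once.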
LGTM - great work :)
Just need to fix failing tests
This reverts commit 262b183.
Historical streamer messages are not fetched in coordinator prior to the IndexerRunner call. As a result, we cannot apply the latency-saving benefits of having coordinator cache the streamer message for use by runner. Instead, we want to prefetch from S3 so that runner invocations don't have to wait for the streamer message to be retrieved from S3.
In addition, it's possible for real-time messages to back up temporarily, preventing the cached message from being used. So, we also want to prefetch any messages which aren't found in the cache.
The new workflow works by having two loops for each worker thread: a producer and a consumer. The producer loads a promise for fetching the block (either from cache or S3) into an array. The consumer then removes the first element from the array and processes it, deleting the streamer message upon success. While one block is being processed, the other blocks are being fetched. This ensures that wait time is minimal. The producer loop attempts to keep the array as close to full as possible.
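A minimal sketch of the producer/consumer idea described above, in TypeScript. Every name here (`runPrefetchPipeline`, `fetchBlock`, `processBlock`, `maxQueueSize`, the `StreamerMessage` shape) is a hypothetical stand-in, and the polling with a short sleep is an assumption made to keep the example small; it is not the PR's actual implementation.

```typescript
// Hypothetical types and names for illustration only.
type StreamerMessage = { blockHeight: number };

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

async function runPrefetchPipeline(
  blockHeights: number[],
  // Assumed to resolve the block from cache or S3.
  fetchBlock: (blockHeight: number) => Promise<StreamerMessage>,
  // Assumed to run the indexer function and delete the stream message on success.
  processBlock: (message: StreamerMessage) => Promise<void>,
  maxQueueSize = 10,
): Promise<void> {
  // The queue holds promises, so fetches are already in flight while
  // the consumer is busy with an earlier block.
  const queue: Array<Promise<StreamerMessage>> = [];
  let producerDone = false;

  async function producer(): Promise<void> {
    for (const height of blockHeights) {
      // Keep the queue as full as possible without exceeding its bound.
      while (queue.length >= maxQueueSize) {
        await sleep(1);
      }
      const pending = fetchBlock(height);
      // Avoid an unhandled-rejection warning while the promise waits in the
      // queue; the consumer still observes the failure when it awaits it.
      pending.catch(() => {});
      queue.push(pending);
    }
    producerDone = true;
  }

  async function consumer(): Promise<void> {
    while (!producerDone || queue.length > 0) {
      const pending = queue.shift();
      if (pending === undefined) {
        // Nothing queued yet; yield to the producer.
        await sleep(1);
        continue;
      }
      const message = await pending;
      await processBlock(message);
    }
  }

  // Both loops share one thread and interleave at each await.
  await Promise.all([producer(), consumer()]);
}
```

Because both loops run on the same thread, any call that blocks that thread stalls the consumer as well as the producer, which is consistent with the slowdown reported for the blocking xread in the conversation above.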