Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store-sync): apply logs from receipt #3215

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft

feat(store-sync): apply logs from receipt #3215

wants to merge 17 commits into from

Conversation

holic
Copy link
Member

@holic holic commented Sep 20, 2024

No description provided.

Copy link

changeset-bot bot commented Sep 20, 2024

⚠️ No Changeset found

Latest commit: 2c74e64

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@holic holic changed the title feat(store-sync): apply optimistic logs feat(store-sync): apply logs from receipt Sep 20, 2024
packages/store-sync/src/createStoreSync.ts Outdated Show resolved Hide resolved
Comment on lines +248 to +270
return concat(
storageAdapterLock$.pipe(
first((lock) => lock === false),
tap(() => storageAdapterLock$.next(true)),
ignoreElements(),
),
storedBlock$.pipe(
tap(({ blockNumber, logs }) => {
debug("stored", logs.length, "logs for block", blockNumber);
lastBlockNumberProcessed = blockNumber;
}),
),
of(true).pipe(
concatMap(async () => {
if (lastBlockNumberProcessed != null) {
await applyOptimisticLogs(lastBlockNumberProcessed);
}
}),
tap(() => storageAdapterLock$.next(false)),
ignoreElements(),
),
);
}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the storageAdapterLock$ logic need to be rxjs based or could this be a boolean variable? i always need extra brain cycles to think through rxjs logic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's an observable because we need to subscribe to it in two places (one here to wait for the lock to free up and one in waitForTransaction)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's because it's late but i've stared at this part of the code for a while and am still not fully sure what's going on 🙈 What is the concat of three streams doing, out of which the first and last have a ignoreElements? why is the last stream of(true)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't we do something like

for(const block of storedBlocks) {
  storageAdapterLock.next(true);
  await applyOptimisticLogs(block.blockNumber);
  storageAdapterLock.next(false);
}

or

storedBlock$.pipe(concatMap(block => {
  storageAdapterLock$.next(true);
  await applyOptimisticLogs(lastBlockNumberProcessed);
  storageAdapterLock$.next(false);
}))

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay this took a bunch of iteration to get here but here's the idea:

The current approach turns an async generator into a stream of blocks, one emission per yield of the generator. I originally had your suggested approach above, but found we were applying optimistic logs for each block rather than each range of blocks, doing a lot more work than is necessary, especially when hydrating from RPC.

Instead what this does is takes the block range stream (storedBlock$ but could rename for clarity) and adds an operation before and after the stream: before to wait for + take out a lock and after to apply optimistic logs and release the lock. Concatenating arbitrary things to the nice stream of storedBlocks$ means we'd have to find/filter them out later, but we can encapsulate that with ignoreElements() which basically empties the stream we added to the start/end and only completes/errors the stream.

So it works like this:

  • create a block stream/observable for the range of blocks
  • before we start evaluating that stream (and thus fetching+storing data), force the stream to wait for the lock to release in case there are existing pending operations
  • once unlocked, take out a lock for the duration of the range
  • process each block in the range
  • once range is done, apply optimistic logs and release the lock before exiting the stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants