feat(store-sync): apply logs from receipt #3215

Draft · wants to merge 17 commits into base: main
9 changes: 9 additions & 0 deletions packages/common/src/LruMap.ts
@@ -12,6 +12,15 @@ export class LruMap<key, value> extends Map<key, value> {
this.maxSize = size;
}

+  override get(key: key): value | undefined {
+    const value = super.get(key);
+    if (this.has(key)) {
+      this.delete(key);
+      this.set(key, value as never);
+    }
+    return value;
+  }

override set(key: key, value: value): this {
super.set(key, value);
if (this.maxSize && this.size > this.maxSize) {
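The recency behavior this override adds can be sketched outside the diff. The following is a hypothetical standalone re-implementation for illustration (not the @latticexyz/common source verbatim): get re-inserts the key so it becomes most recently used, and set evicts the least recently used key once the map grows past maxSize.

```typescript
// Minimal LRU map sketch built on Map's insertion order
// (hypothetical re-implementation, for illustration only).
class LruSketch<K, V> extends Map<K, V> {
  constructor(private readonly maxSize: number) {
    super();
  }

  // Re-insert on read so the key becomes most recently used.
  override get(key: K): V | undefined {
    const value = super.get(key);
    if (this.has(key)) {
      this.delete(key);
      super.set(key, value as V);
    }
    return value;
  }

  // After inserting, evict the first (least recently used) key
  // once the map grows past maxSize.
  override set(key: K, value: V): this {
    super.set(key, value);
    if (this.maxSize && this.size > this.maxSize) {
      const oldest = this.keys().next().value;
      if (oldest !== undefined) this.delete(oldest);
    }
    return this;
  }
}
```

The delete-then-set trick works because Map preserves insertion order, so the first key yielded by keys() is always the least recently used one.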
121 changes: 92 additions & 29 deletions packages/store-sync/src/createStoreSync.ts
@@ -1,5 +1,5 @@
import { storeEventsAbi } from "@latticexyz/store";
-import { GetTransactionReceiptErrorType, Hex } from "viem";
+import { GetTransactionReceiptErrorType, Hex, parseEventLogs } from "viem";
import {
StorageAdapter,
StorageAdapterBlock,
@@ -10,7 +10,7 @@ import {
internalTableIds,
WaitForTransactionResult,
} from "./common";
-import { createBlockStream } from "@latticexyz/block-logs-stream";
+import { createBlockStream, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import {
filter,
map,
@@ -25,14 +25,17 @@ import {
catchError,
shareReplay,
combineLatest,
scan,
mergeMap,
BehaviorSubject,
ignoreElements,
first,
} from "rxjs";
import { debug as parentDebug } from "./debug";
import { SyncStep } from "./SyncStep";
import { bigIntMax, chunk, isDefined, waitForIdle } from "@latticexyz/common/utils";
import { getSnapshot } from "./getSnapshot";
import { fetchAndStoreLogs } from "./fetchAndStoreLogs";
+import { LruMap } from "@latticexyz/common";

const debug = parentDebug.extend("createStoreSync");

@@ -199,6 +202,27 @@ export async function createStoreSync({
let endBlock: bigint | null = null;
let lastBlockNumberProcessed: bigint | null = null;

+  // For chains that provide guaranteed receipts ahead of block mining, we can apply the logs immediately.
+  // This works because, once the block is mined, the same logs will be applied again. Store events are
+  // defined so that reapplying the same logs is idempotent, meaning the storage adapter
+  // is kept up to date.
+
+  let optimisticLogs: readonly StoreEventsLog[] = [];
+  async function applyOptimisticLogs(blockNumber: bigint): Promise<void> {
+    const logs = optimisticLogs.filter((log) => log.blockNumber > blockNumber);
+    if (logs.length) {
+      debug("applying", logs.length, "optimistic logs");
+      const blocks = groupLogsByBlockNumber(logs).filter((block) => block.logs.length);
+      for (const block of blocks) {
+        debug("applying optimistic logs for block", block.blockNumber);
+        await storageAdapter(block);
+      }
+    }
+    optimisticLogs = logs;
+  }
+
+  const storageAdapterLock$ = new BehaviorSubject(false);

const storedBlock$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
@@ -219,29 +243,49 @@
logFilter,
});

-        return from(storedBlocks);
-      }),
-      tap(({ blockNumber, logs }) => {
-        debug("stored", logs.length, "logs for block", blockNumber);
-        lastBlockNumberProcessed = blockNumber;
+        const storedBlock$ = from(storedBlocks).pipe(share());
+
+        return concat(
+          storageAdapterLock$.pipe(
+            first((lock) => lock === false),
+            tap(() => storageAdapterLock$.next(true)),
+            ignoreElements(),
+          ),
+          storedBlock$.pipe(
+            tap(({ blockNumber, logs }) => {
+              debug("stored", logs.length, "logs for block", blockNumber);
+              lastBlockNumberProcessed = blockNumber;
+            }),
+          ),
+          of(true).pipe(
+            concatMap(async () => {
+              if (lastBlockNumberProcessed != null) {
+                await applyOptimisticLogs(lastBlockNumberProcessed);
+              }
+            }),
+            tap(() => storageAdapterLock$.next(false)),
+            ignoreElements(),
+          ),
+        );
+      }),
Comment on lines +248 to +270

Member: does the storageAdapterLock$ logic need to be rxjs based or could this be a boolean variable? i always need extra brain cycles to think through rxjs logic

Member Author: it's an observable because we need to subscribe to it in two places (one here to wait for the lock to free up and one in waitForTransaction)

Member: Maybe it's because it's late but i've stared at this part of the code for a while and am still not fully sure what's going on 🙈 What is the concat of three streams doing, out of which the first and last have ignoreElements? Why is the last stream of(true)?

Member: couldn't we do something like

for (const block of storedBlocks) {
  storageAdapterLock$.next(true);
  await applyOptimisticLogs(block.blockNumber);
  storageAdapterLock$.next(false);
}

or

storedBlock$.pipe(concatMap(async (block) => {
  storageAdapterLock$.next(true);
  await applyOptimisticLogs(lastBlockNumberProcessed);
  storageAdapterLock$.next(false);
}))

?

Member Author: Okay this took a bunch of iteration to get here but here's the idea:

The current approach turns an async generator into a stream of blocks, one emission per yield of the generator. I originally had your suggested approach above, but found we were applying optimistic logs for each block rather than each range of blocks, doing a lot more work than is necessary, especially when hydrating from RPC.

Instead, this takes the block range stream (storedBlock$, though it could be renamed for clarity) and adds an operation before and after it: before, to wait for and take out a lock; after, to apply optimistic logs and release the lock. Concatenating arbitrary values onto the nice stream of storedBlock$ would mean we'd have to find/filter them out later, but we can encapsulate that with ignoreElements(), which empties the streams we added at the start/end so they only complete or error.

So it works like this:

  • create a block stream/observable for the range of blocks
  • before we start evaluating that stream (and thus fetching+storing data), force the stream to wait for the lock to release in case there are existing pending operations
  • once unlocked, take out a lock for the duration of the range
  • process each block in the range
  • once range is done, apply optimistic logs and release the lock before exiting the stream
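To make the reviewer's "could this be a boolean variable" question concrete, the same serialization can be sketched without rxjs as a promise-chain mutex (hypothetical code with made-up names like Mutex and applyRange, not the PR's implementation): each caller queues its storage-adapter work behind the previous caller's, which is what the lock-wait/lock-release pair around the concat achieves in the observable version.

```typescript
// Hypothetical promise-chain mutex: an alternative sketch of the
// storageAdapterLock$ idea without rxjs (illustration only).
class Mutex {
  private tail: Promise<void> = Promise.resolve();

  // Queue fn behind all previously queued work and return its result.
  run<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(fn);
    // Keep the chain alive even if fn rejects.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    );
    return result;
  }
}

const storageAdapterMutex = new Mutex();
const applied: string[] = [];

// Two "block range" tasks contend for the adapter; the mutex guarantees
// the second range cannot start applying mid-way through the first.
async function applyRange(name: string, ms: number): Promise<void> {
  await storageAdapterMutex.run(async () => {
    applied.push(`${name}:start`);
    await new Promise((resolve) => setTimeout(resolve, ms));
    applied.push(`${name}:end`);
  });
}
```

The trade-off the author describes still applies either way: a lock alone doesn't batch work per range, which is why the PR takes the lock once around the whole block-range stream rather than once per block.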

tap(({ blockNumber }) => {
if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
-const processedBlocks = lastBlockNumberProcessed - startBlock;
+const processedBlocks = blockNumber - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 10,
latestBlockNumber: endBlock,
-lastBlockNumberProcessed,
+lastBlockNumberProcessed: blockNumber,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
-lastBlockNumberProcessed,
+lastBlockNumberProcessed: blockNumber,
message: "All caught up!",
});
}
@@ -250,27 +294,27 @@ export async function createStoreSync({
share(),
);

-  const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(share());
-
   // keep 10 blocks worth processed transactions in memory
   const recentBlocksWindow = 10;
-  // most recent block first, for ease of pulling the first one off the array
-  const recentBlocks$ = storedBlockLogs$.pipe(
-    scan<StorageAdapterBlock, StorageAdapterBlock[]>(
-      (recentBlocks, block) => [block, ...recentBlocks].slice(0, recentBlocksWindow),
-      [],
-    ),
-    filter((recentBlocks) => recentBlocks.length > 0),
-    shareReplay(1),
-  );
+  const recentBlocks$ = new BehaviorSubject<StorageAdapterBlock[]>([]);
+
+  const storedBlockLogs$ = concat(storedInitialBlockLogs$, storedBlock$).pipe(
+    tap((block) => {
+      // most recent block first, for ease of pulling the first one off the array
+      recentBlocks$.next([block, ...recentBlocks$.value].slice(0, recentBlocksWindow));
+    }),
+    share(),
+  );

   // TODO: move to its own file so we can test it, have its own debug instance, etc.
-  async function waitForTransaction(tx: Hex): Promise<WaitForTransactionResult> {
-    debug("waiting for tx", tx);
+  const waitPromises = new LruMap<Hex, Promise<WaitForTransactionResult>>(1024);
+  function waitForTransaction(tx: Hex): Promise<WaitForTransactionResult> {
+    const existingPromise = waitPromises.get(tx);
+    if (existingPromise) return existingPromise;

    // This currently blocks for async call on each block processed
    // We could potentially speed this up a tiny bit by racing to see if 1) tx exists in processed block or 2) fetch tx receipt for latest block processed
-    const hasTransaction$ = recentBlocks$.pipe(
+    const receipt$ = recentBlocks$.pipe(
// We use `mergeMap` instead of `concatMap` here to send the fetch request immediately when a new block range appears,
// instead of sending the next request only when the previous one completed.
mergeMap(async (blocks) => {
@@ -284,11 +328,26 @@

try {
const lastBlock = blocks[0];
-          debug("fetching tx receipt for block", lastBlock.blockNumber);
-          const { status, blockNumber, transactionHash } = await publicClient.getTransactionReceipt({ hash: tx });
-          if (lastBlock.blockNumber >= blockNumber) {
-            return { status, blockNumber, transactionHash };
+          debug("fetching tx receipt after seeing block", lastBlock.blockNumber);
+          const receipt = await publicClient.getTransactionReceipt({ hash: tx });
+          debug("got receipt", receipt.status);
+          if (receipt.status === "success") {
+            const logs = parseEventLogs({ abi: storeEventsAbi, logs: receipt.logs });
+            if (logs.length) {
+              // wait for lock to clear
+              // unclear what happens if two waitForTransaction calls are triggered simultaneously and both get released for the same lock emission?
+              await firstValueFrom(storageAdapterLock$.pipe(filter((lock) => lock === false)));
+              storageAdapterLock$.next(true);
+              optimisticLogs = [...optimisticLogs, ...logs];
+              await applyOptimisticLogs(lastBlock.blockNumber);
+              storageAdapterLock$.next(false);
+            }
+          }
+          return {
+            status: receipt.status,
+            blockNumber: receipt.blockNumber,
+            transactionHash: receipt.transactionHash,
+          };
} catch (e) {
const error = e as GetTransactionReceiptErrorType;
if (error.name === "TransactionReceiptNotFoundError") {
@@ -297,9 +356,13 @@ export async function createStoreSync({
throw error;
}
}),
+      filter(isDefined),
);

-    return await firstValueFrom(hasTransaction$.pipe(filter(isDefined)));
+    debug("waiting for tx", tx);
+    const promise = firstValueFrom(receipt$);
+    waitPromises.set(tx, promise);
+    return promise;
}

return {
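In isolation, the waitPromises map introduced in this file deduplicates concurrent waitForTransaction calls for the same hash: the first call stores its in-flight promise and later callers reuse it. A minimal sketch of that pattern with a plain Map and a stand-in fetchReceipt (both names hypothetical, not the viem API):

```typescript
// Sketch of the in-flight promise dedup used by waitForTransaction
// (plain Map instead of the PR's LruMap; fetchReceipt is a stand-in
// for the real receipt lookup).
type Receipt = { transactionHash: string; status: "success" | "reverted" };

let fetches = 0;
async function fetchReceipt(tx: string): Promise<Receipt> {
  fetches++; // count lookups so the dedup is observable
  await new Promise((resolve) => setTimeout(resolve, 10));
  return { transactionHash: tx, status: "success" };
}

const inFlight = new Map<string, Promise<Receipt>>();
function waitForTx(tx: string): Promise<Receipt> {
  // Return the existing promise if a lookup for this hash is pending
  // (or already settled), so the work happens at most once per hash.
  const existing = inFlight.get(tx);
  if (existing) return existing;
  const promise = fetchReceipt(tx);
  inFlight.set(tx, promise);
  return promise;
}
```

The PR bounds this cache with an LruMap of 1024 entries so memoized receipts for old transactions eventually fall out of memory.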
8 changes: 4 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default.
