Skip to content

Commit

Permalink
feat(common,store-sync): improve initial sync to not block returned p…
Browse files Browse the repository at this point in the history
…romise (#1315)
  • Loading branch information
holic authored Aug 21, 2023
1 parent 433078c commit bb6ada7
Show file tree
Hide file tree
Showing 14 changed files with 268 additions and 147 deletions.
23 changes: 23 additions & 0 deletions .changeset/flat-trainers-marry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
"@latticexyz/common": patch
"@latticexyz/store-sync": patch
---

Initial sync from indexer no longer blocks the promise returning from `createStoreSync`, `syncToRecs`, and `syncToSqlite`. This should help with rendering loading screens using the `SyncProgress` RECS component and avoid the long flashes of no content in templates.

By default, `syncToRecs` and `syncToSqlite` will start syncing (via observable subscription) immediately after called.

If your app needs to control when syncing starts, you can use the `startSync: false` option and then `blockStoreOperations$.subscribe()` to start the sync yourself. Just be sure to unsubscribe to avoid memory leaks.

```ts
const { blockStorageOperations$ } = syncToRecs({
...
startSync: false,
});

// start sync manually by subscribing to `blockStorageOperation$`
const subcription = blockStorageOperation$.subscribe();

// clean up subscription
subscription.unsubscribe();
```
2 changes: 1 addition & 1 deletion e2e/packages/sync-test/indexerSync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe("Sync from indexer", async () => {
await waitForInitialSync(page);

expect(asyncErrorHandler.getErrors()).toHaveLength(1);
expect(asyncErrorHandler.getErrors()[0]).toContain("couldn't get initial state from indexer");
expect(asyncErrorHandler.getErrors()[0]).toContain("error fetching initial state from indexer");
});

describe("indexer online", () => {
Expand Down
1 change: 1 addition & 0 deletions packages/common/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from "./curry";
export * from "./isDefined";
export * from "./isNotNull";
export * from "./wait";
export * from "./waitForIdle";
2 changes: 1 addition & 1 deletion packages/common/src/utils/wait.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
return new Promise<void>((resolve) => setTimeout(() => resolve(), ms));
}
5 changes: 5 additions & 0 deletions packages/common/src/utils/waitForIdle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export function waitForIdle(): Promise<void> {
return new Promise<void>((resolve) => {
requestIdleCallback(() => resolve());
});
}
10 changes: 6 additions & 4 deletions packages/store-sync/src/blockLogsToStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ export function blockLogsToStorage<TConfig extends StoreConfig = StoreConfig>({
.filter(isDefined);

// Then register tables before we start storing data in them
await registerTables({
blockNumber: block.blockNumber,
tables: newTables,
});
if (newTables.length > 0) {
await registerTables({
blockNumber: block.blockNumber,
tables: newTables,
});
}

const tablesToFetch = Array.from(
new Set(
Expand Down
266 changes: 179 additions & 87 deletions packages/store-sync/src/createStoreSync.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,30 @@
import { ConfigToKeyPrimitives, ConfigToValuePrimitives, StoreConfig, storeEventsAbi } from "@latticexyz/store";
import { Hex, TransactionReceipt } from "viem";
import { SetRecordOperation, SyncOptions, SyncResult } from "./common";
import { Hex, TransactionReceipt, WaitForTransactionReceiptTimeoutError } from "viem";
import { SetRecordOperation, SyncOptions, SyncResult, TableWithRecords } from "./common";
import { createBlockStream, blockRangeToLogs, groupLogsByBlockNumber } from "@latticexyz/block-logs-stream";
import { filter, map, tap, mergeMap, from, concatMap, share, firstValueFrom } from "rxjs";
import {
filter,
map,
tap,
mergeMap,
from,
concat,
concatMap,
share,
firstValueFrom,
defer,
of,
catchError,
shareReplay,
combineLatest,
} from "rxjs";
import pRetry from "p-retry";
import { blockLogsToStorage } from "./blockLogsToStorage";
import { debug as parentDebug } from "./debug";
import { createIndexerClient } from "./trpc-indexer";
import { BlockLogsToStorageOptions } from "./blockLogsToStorage";
import { SyncStep } from "./SyncStep";
import { chunk } from "@latticexyz/common/utils";

const debug = parentDebug.extend("createStoreSync");

Expand All @@ -19,6 +35,7 @@ type CreateStoreSyncOptions<TConfig extends StoreConfig = StoreConfig> = SyncOpt
percentage: number;
latestBlockNumber: bigint;
lastBlockNumberProcessed: bigint;
message: string;
}) => void;
};

Expand All @@ -29,85 +46,145 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
onProgress,
address,
publicClient,
startBlock = 0n,
startBlock: initialStartBlock = 0n,
maxBlockRange,
initialState,
indexerUrl,
}: CreateStoreSyncOptions<TConfig>): Promise<CreateStoreSyncResult<TConfig>> {
if (indexerUrl != null && initialState == null) {
try {
const initialState$ = defer(
async (): Promise<
| {
blockNumber: bigint | null;
tables: TableWithRecords[];
}
| undefined
> => {
if (initialState) return initialState;
if (!indexerUrl) return;

debug("fetching initial state from indexer", indexerUrl);

onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
latestBlockNumber: 0n,
lastBlockNumberProcessed: 0n,
message: "Fetching snapshot from indexer",
});

const indexer = createIndexerClient({ url: indexerUrl });
const chainId = publicClient.chain?.id ?? (await publicClient.getChainId());
initialState = await indexer.findAll.query({ chainId, address });
} catch (error) {
debug("couldn't get initial state from indexer", error);
const result = await indexer.findAll.query({ chainId, address });

onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 100,
latestBlockNumber: 0n,
lastBlockNumberProcessed: 0n,
message: "Fetched snapshot from indexer",
});

return result;
}
}
).pipe(
catchError((error) => {
debug("error fetching initial state from indexer", error);

if (initialState != null) {
const { blockNumber, tables } = initialState;
if (blockNumber != null) {
debug("hydrating from initial state to block", initialState.blockNumber);
startBlock = blockNumber + 1n;
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 100,
latestBlockNumber: 0n,
lastBlockNumberProcessed: initialStartBlock,
message: "Failed to fetch snapshot from indexer",
});

await storageAdapter.registerTables({ blockNumber, tables });
return of(undefined);
}),
shareReplay(1)
);

const numRecords = initialState.tables.reduce((sum, table) => sum + table.records.length, 0);
const recordsPerProgressUpdate = Math.floor(numRecords / 100);
let recordsProcessed = 0;
let recordsProcessedSinceLastUpdate = 0;

for (const table of initialState.tables) {
await storageAdapter.storeOperations({
blockNumber,
operations: table.records.map(
(record) =>
({
type: "SetRecord",
address: table.address,
namespace: table.namespace,
name: table.name,
key: record.key as ConfigToKeyPrimitives<TConfig, typeof table.name>,
value: record.value as ConfigToValuePrimitives<TConfig, typeof table.name>,
} as const satisfies SetRecordOperation<TConfig>)
),
});
const startBlock$ = initialState$.pipe(
map((initialState) => initialState?.blockNumber ?? initialStartBlock),
// TODO: if start block is still 0, find via deploy event
tap((startBlock) => debug("starting sync from block", startBlock))
);

recordsProcessed += table.records.length;
recordsProcessedSinceLastUpdate += table.records.length;

if (recordsProcessedSinceLastUpdate > recordsPerProgressUpdate) {
recordsProcessedSinceLastUpdate = 0;
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: (recordsProcessed / numRecords) * 100,
latestBlockNumber: 0n,
lastBlockNumberProcessed: blockNumber,
});
}
const initialStorageOperations$ = initialState$.pipe(
filter(
(initialState): initialState is { blockNumber: bigint; tables: TableWithRecords[] } =>
initialState != null && initialState.blockNumber != null && initialState.tables.length > 0
),
concatMap(async ({ blockNumber, tables }) => {
debug("hydrating from initial state to block", blockNumber);

onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 0,
latestBlockNumber: 0n,
lastBlockNumberProcessed: blockNumber,
message: "Hydrating from snapshot",
});

await storageAdapter.registerTables({ blockNumber, tables });

const operations: SetRecordOperation<TConfig>[] = tables.flatMap((table) =>
table.records.map((record) => ({
type: "SetRecord",
address: table.address,
namespace: table.namespace,
name: table.name,
key: record.key as ConfigToKeyPrimitives<TConfig, typeof table.name>,
value: record.value as ConfigToValuePrimitives<TConfig, typeof table.name>,
}))
);

debug(`hydrated ${table.records.length} records for table ${table.namespace}:${table.name}`);
// Split snapshot operations into chunks so we can update the progress callback (and ultimately render visual progress for the user).
// This isn't ideal if we want to e.g. batch load these into a DB in a single DB tx, but we'll take it.
//
// Split into 50 equal chunks (for better `onProgress` updates) but only if we have 100+ items per chunk
const chunkSize = Math.max(100, Math.floor(operations.length / 50));
const chunks = Array.from(chunk(operations, chunkSize));
for (const [i, chunk] of chunks.entries()) {
await storageAdapter.storeOperations({ blockNumber, operations: chunk });
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: (i + chunk.length) / chunks.length,
latestBlockNumber: 0n,
lastBlockNumberProcessed: blockNumber,
message: "Hydrating from snapshot",
});
}
}
}

// TODO: if startBlock is still 0, find via deploy event
onProgress?.({
step: SyncStep.SNAPSHOT,
percentage: 100,
latestBlockNumber: 0n,
lastBlockNumberProcessed: blockNumber,
message: "Hydrated from snapshot",
});

debug("starting sync from block", startBlock);
return { blockNumber, operations };
}),
shareReplay(1)
);

const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(share());
const latestBlock$ = createBlockStream({ publicClient, blockTag: "latest" }).pipe(shareReplay(1));
const latestBlockNumber$ = latestBlock$.pipe(
map((block) => block.number),
share()
);

let latestBlockNumber: bigint | null = null;
const blockLogs$ = latestBlockNumber$.pipe(
tap((blockNumber) => {
debug("latest block number", blockNumber);
latestBlockNumber = blockNumber;
}),
map((blockNumber) => ({ startBlock, endBlock: blockNumber })),
shareReplay(1)
);

let startBlock: bigint | null = null;
let endBlock: bigint | null = null;
const blockLogs$ = combineLatest([startBlock$, latestBlockNumber$]).pipe(
map(([startBlock, endBlock]) => ({ startBlock, endBlock })),
tap((range) => {
startBlock = range.startBlock;
endBlock = range.endBlock;
}),
blockRangeToLogs({
publicClient,
address,
Expand All @@ -119,32 +196,38 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
);

let lastBlockNumberProcessed: bigint | null = null;
const blockStorageOperations$ = blockLogs$.pipe(
concatMap(blockLogsToStorage(storageAdapter)),
tap(({ blockNumber, operations }) => {
debug("stored", operations.length, "operations for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (latestBlockNumber != null) {
if (blockNumber < latestBlockNumber) {
onProgress?.({
step: SyncStep.RPC,
percentage: Number((lastBlockNumberProcessed * 1000n) / (latestBlockNumber * 1000n)) / 100,
latestBlockNumber,
lastBlockNumberProcessed,
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber,
lastBlockNumberProcessed,
});
const blockStorageOperations$ = concat(
initialStorageOperations$,
blockLogs$.pipe(
concatMap(blockLogsToStorage(storageAdapter)),
tap(({ blockNumber, operations }) => {
debug("stored", operations.length, "operations for block", blockNumber);
lastBlockNumberProcessed = blockNumber;

if (startBlock != null && endBlock != null) {
if (blockNumber < endBlock) {
const totalBlocks = endBlock - startBlock;
const processedBlocks = lastBlockNumberProcessed - startBlock;
onProgress?.({
step: SyncStep.RPC,
percentage: Number((processedBlocks * 1000n) / totalBlocks) / 1000,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "Hydrating from RPC",
});
} else {
onProgress?.({
step: SyncStep.LIVE,
percentage: 100,
latestBlockNumber: endBlock,
lastBlockNumberProcessed,
message: "All caught up!",
});
}
}
}
}),
share()
);
})
)
).pipe(share());

async function waitForTransaction(tx: Hex): Promise<{
receipt: TransactionReceipt;
Expand All @@ -159,7 +242,16 @@ export async function createStoreSync<TConfig extends StoreConfig = StoreConfig>
timeout: publicClient.pollingInterval * 2 * attempt,
});
},
{ retries: 3 }
{
retries: 3,
onFailedAttempt: (error) => {
if (error instanceof WaitForTransactionReceiptTimeoutError) {
debug("timed out waiting for tx receipt, trying again", tx);
return;
}
throw error;
},
}
);
debug("got tx receipt", tx, receipt);

Expand Down
Loading

0 comments on commit bb6ada7

Please sign in to comment.