Skip to content

Commit

Permalink
fix: skipped events in historical to realtime handoff (#1421)
Browse files Browse the repository at this point in the history
* remove refetch finalized block

* remove extra cache logic

* fix: skipped events in historical to realtime handoff

* rename

* Revert "remove refetch finalized block"

This reverts commit c41c502.

* Revert "remove extra cache logic"

This reverts commit da0204e.

* Update index.ts

* lint

---------

Co-authored-by: Kevin <[email protected]>
  • Loading branch information
kyscott18 and typedarray authored Jan 8, 2025
1 parent 85c9b32 commit de4b398
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/shaggy-oranges-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ponder": patch
---

Fixed a bug where events between the historical backfill and live indexing were skipped. This does not affect the rpc cache.
4 changes: 2 additions & 2 deletions packages/core/src/sync-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export type SyncStore = {
filters: Filter[];
from: string;
to: string;
limit: number;
limit?: number;
}): Promise<{ events: RawEvent[]; cursor: string }>;
insertRpcRequestResult(args: {
request: string;
Expand Down Expand Up @@ -836,7 +836,7 @@ export const createSyncStore = ({
.where("event.checkpoint", "<=", to)
.orderBy("event.checkpoint", "asc")
.orderBy("event.filterIndex", "asc")
.limit(limit)
.$if(limit !== undefined, (qb) => qb.limit(limit!))
.execute();
},
);
Expand Down
103 changes: 75 additions & 28 deletions packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,16 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
syncProgress: SyncProgress;
historicalSync: HistoricalSync;
realtimeSync: RealtimeSync;
unfinalizedBlocks: (Omit<
unfinalizedBlocks: Omit<
Extract<RealtimeSyncEvent, { type: "block" }>,
"type"
> & { events: RawEvent[] })[];
>[];
}
>();
/** Events that have been executed but not finalized. */
let executedEvents: RawEvent[] = [];
/** Events that have not been executed yet. */
let pendingEvents: RawEvent[] = [];
const status: Status = {};
let isKilled = false;
// Realtime events across all chains that can't be passed to the parent function
Expand Down Expand Up @@ -634,38 +638,32 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
hexToNumber(syncProgress.current.number),
);

const blockWithEventData = event;

const events = buildEvents({
const newEvents = buildEvents({
sources: args.sources,
chainId: network.chainId,
blockWithEventData,
blockWithEventData: event,
finalizedChildAddresses: realtimeSync.finalizedChildAddresses,
unfinalizedChildAddresses: realtimeSync.unfinalizedChildAddresses,
});

unfinalizedBlocks.push({ ...blockWithEventData, events });
unfinalizedBlocks.push(event);
pendingEvents.push(...newEvents);

if (to > from) {
for (const network of args.networks) {
updateRealtimeStatus({ checkpoint: to, network });
}

const pendingEvents: RawEvent[] = [];
// Move events from pending to executed

for (const { unfinalizedBlocks } of perNetworkSync.values()) {
for (const { events } of unfinalizedBlocks) {
for (const event of events) {
if (event.checkpoint > from && event.checkpoint <= to) {
pendingEvents.push(event);
}
}
}
}
const events = pendingEvents
.filter((event) => event.checkpoint < to)
.sort((a, b) => (a.checkpoint < b.checkpoint ? -1 : 1));

const events = pendingEvents.sort((a, b) =>
a.checkpoint < b.checkpoint ? -1 : 1,
pendingEvents = pendingEvents.filter(
({ checkpoint }) => checkpoint > to,
);
executedEvents.push(...events);

args
.onRealtimeEvent({
Expand Down Expand Up @@ -714,10 +712,10 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
service: "sync",
msg: `Finalized block for '${network.name}' has surpassed overall indexing checkpoint`,
});
// exit early because we need to keep `unfinalizedBlocks.events`
return;
}

// Remove all finalized data

const finalizedBlocks = unfinalizedBlocks.filter(
({ block }) =>
hexToNumber(block.number) <= hexToNumber(event.block.number),
Expand All @@ -729,6 +727,10 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
hexToNumber(block.number) > hexToNumber(event.block.number),
);

executedEvents = executedEvents.filter(
(e) => e.checkpoint > checkpoint,
);

// Add finalized blocks, logs, transactions, receipts, and traces to the sync-store.

await Promise.all([
Expand Down Expand Up @@ -819,6 +821,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
*/
case "reorg": {
syncProgress.current = event.block;
// Note: this checkpoint is <= the previous checkpoint
const checkpoint = getOmnichainCheckpoint("current")!;

// Update "ponder_sync_block" metric
Expand All @@ -827,15 +830,36 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
hexToNumber(syncProgress.current.number),
);

// Remove all reorged data

perNetworkSync.get(network)!.unfinalizedBlocks =
unfinalizedBlocks.filter(
({ block }) =>
hexToNumber(block.number) <= hexToNumber(event.block.number),
);

const isReorgedEvent = ({ chainId, block }: RawEvent) =>
chainId === network.chainId &&
Number(block.number) > hexToNumber(event.block.number);

pendingEvents = pendingEvents.filter(
(e) => isReorgedEvent(e) === false,
);
executedEvents = executedEvents.filter(
(e) => isReorgedEvent(e) === false,
);

// Move events from executed to pending

const events = executedEvents.filter((e) => e.checkpoint > checkpoint);
executedEvents = executedEvents.filter(
(e) => e.checkpoint < checkpoint,
);
pendingEvents.push(...events);

await args.syncStore.pruneRpcRequestResult({
blocks: event.reorgedBlocks,
chainId: network.chainId,
blocks: event.reorgedBlocks,
});

// Raise event to parent function (runtime)
Expand All @@ -854,12 +878,39 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
for (const network of args.networks) {
const { syncProgress, realtimeSync } = perNetworkSync.get(network)!;

const filters = args.sources
.filter(({ filter }) => filter.chainId === network.chainId)
.map(({ filter }) => filter);

status[network.name]!.block = {
number: hexToNumber(syncProgress.current!.number),
timestamp: hexToNumber(syncProgress.current!.timestamp),
};
status[network.name]!.ready = true;

// Fetch any events between the omnichain finalized checkpoint and the single-chain
// finalized checkpoint and add them to pendingEvents. These events are synced during
// the historical phase, but must be indexed in the realtime phase because events
// synced in realtime on other chains might be ordered before them.
const from = getOmnichainCheckpoint("finalized")!;

const finalized = getChainCheckpoint({
syncProgress,
network,
tag: "finalized",
})!;
const end = getChainCheckpoint({
syncProgress,
network,
tag: "end",
})!;
const to = min(finalized, end);

if (to > from) {
const events = await args.syncStore.getEvents({ filters, from, to });
pendingEvents.push(...events.events);
}

if (isSyncEnd(syncProgress)) {
args.common.metrics.ponder_sync_is_complete.set(
{ network: network.name },
Expand All @@ -873,12 +924,8 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {

const initialChildAddresses = new Map<Factory, Set<Address>>();

for (const { filter } of args.sources) {
if (
filter.chainId === network.chainId &&
"address" in filter &&
isAddressFactory(filter.address)
) {
for (const filter of filters) {
if ("address" in filter && isAddressFactory(filter.address)) {
const addresses = await args.syncStore.getChildAddresses({
filter: filter.address,
});
Expand Down

0 comments on commit de4b398

Please sign in to comment.