Skip to content

Commit

Permalink
fix: reorgs with multiple chains causing missing events (#1335)
Browse files Browse the repository at this point in the history
* fix: reorgs with multiple chains causing missing events

* chore: changeset

* cleanup

* add comment
  • Loading branch information
kyscott18 authored Dec 15, 2024
1 parent 4903a90 commit 77b92ef
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 83 deletions.
5 changes: 5 additions & 0 deletions .changeset/lemon-news-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ponder": patch
---

Fixed a bug that occassionaly caused reorgs to lead to missing events. Please note that this did not affect the rpc cache, users do not have to re-sync.
11 changes: 6 additions & 5 deletions packages/core/src/sync-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type CreateRealtimeSyncParameters = {

export type BlockWithEventData = {
block: SyncBlock;
filters: Set<Filter>;
// filters: Set<Filter>;
logs: SyncLog[];
factoryLogs: SyncLog[];
traces: SyncTrace[];
Expand All @@ -77,6 +77,7 @@ export type BlockWithEventData = {
export type RealtimeSyncEvent =
| ({
type: "block";
hasMatchedFilter: boolean;
} & BlockWithEventData)
| {
type: "finalize";
Expand Down Expand Up @@ -111,7 +112,7 @@ export const createRealtimeSync = (
* `parentHash` => `hash`.
*/
let unfinalizedBlocks: LightBlock[] = [];
let queue: Queue<void, Omit<BlockWithEventData, "filters">>;
let queue: Queue<void, BlockWithEventData>;
let consecutiveErrors = 0;
let interval: NodeJS.Timeout | undefined;

Expand Down Expand Up @@ -186,7 +187,7 @@ export const createRealtimeSync = (
traces,
transactions,
transactionReceipts,
}: Omit<BlockWithEventData, "filters">) => {
}: BlockWithEventData) => {
args.common.logger.debug({
service: "realtime",
msg: `Started syncing '${args.network.name}' block ${hexToNumber(block.number)}`,
Expand Down Expand Up @@ -400,7 +401,7 @@ export const createRealtimeSync = (

await args.onEvent({
type: "block",
filters: matchedFilters,
hasMatchedFilter: matchedFilters.size > 0,
block,
factoryLogs,
logs,
Expand Down Expand Up @@ -584,7 +585,7 @@ export const createRealtimeSync = (
*/
const fetchBlockEventData = async (
block: SyncBlock,
): Promise<Omit<BlockWithEventData, "filters">> => {
): Promise<BlockWithEventData> => {
////////
// Logs
////////
Expand Down
146 changes: 68 additions & 78 deletions packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
createHistoricalSync,
} from "@/sync-historical/index.js";
import {
type BlockWithEventData,
type RealtimeSync,
type RealtimeSyncEvent,
createRealtimeSync,
Expand Down Expand Up @@ -219,21 +218,23 @@ type CreateSyncParameters = {
};

export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
const localSyncContext = new Map<
const perNetworkSync = new Map<
Network,
{
requestQueue: RequestQueue;
syncProgress: SyncProgress;
historicalSync: HistoricalSync;
realtimeSync: RealtimeSync;
unfinalizedEventData: BlockWithEventData[];
unfinalizedBlocks: (Omit<
Extract<RealtimeSyncEvent, { type: "block" }>,
"type"
> & { events: RawEvent[] })[];
}
>();
const status: Status = {};
let isKilled = false;
// Realtime events across all chains that can't be passed to the parent function
// because the overall checkpoint hasn't caught up to the events yet.
let pendingEvents: RawEvent[] = [];

// Instantiate `localSyncData` and `status`
await Promise.all(
Expand Down Expand Up @@ -322,12 +323,12 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
0,
);

localSyncContext.set(network, {
perNetworkSync.set(network, {
requestQueue,
syncProgress,
historicalSync,
realtimeSync,
unfinalizedEventData: [],
unfinalizedBlocks: [],
});
status[network.name] = { block: null, ready: false };
}),
Expand All @@ -339,7 +340,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
const getOmnichainCheckpoint = (
tag: "start" | "end" | "current" | "finalized",
): string | undefined => {
const checkpoints = Array.from(localSyncContext.entries()).map(
const checkpoints = Array.from(perNetworkSync.entries()).map(
([network, { syncProgress }]) =>
getChainCheckpoint({ syncProgress, network, tag }),
);
Expand Down Expand Up @@ -389,7 +390,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
checkpoint: string;
network: Network;
}) => {
const localBlock = localSyncContext
const localBlock = perNetworkSync
.get(network)!
.realtimeSync.unfinalizedBlocks.findLast(
(block) =>
Expand Down Expand Up @@ -437,7 +438,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
let showLogs = true;
while (true) {
const syncGenerator = mergeAsyncGenerators(
Array.from(localSyncContext.entries()).map(
Array.from(perNetworkSync.entries()).map(
([network, { syncProgress, historicalSync }]) =>
localHistoricalSyncGenerator({
common: args.common,
Expand All @@ -459,7 +460,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
* It is an invariant that `latestBlock` will eventually be defined.
*/
if (
Array.from(localSyncContext.values()).some(
Array.from(perNetworkSync.values()).some(
({ syncProgress }) => syncProgress.current === undefined,
)
) {
Expand Down Expand Up @@ -546,7 +547,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {

/** `true` if all networks have synced all known finalized blocks. */
const allHistoricalSyncExhaustive = Array.from(
localSyncContext.values(),
perNetworkSync.values(),
).every(({ syncProgress }) => {
if (isSyncEnd(syncProgress)) return true;

Expand All @@ -566,7 +567,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
latestFinalizedFetch = Date.now();

await Promise.all(
Array.from(localSyncContext.entries()).map(
Array.from(perNetworkSync.entries()).map(
async ([network, { requestQueue, syncProgress }]) => {
args.common.logger.debug({
service: "sync",
Expand Down Expand Up @@ -611,8 +612,8 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
network,
event,
}: { network: Network; event: RealtimeSyncEvent }) => {
const { syncProgress, realtimeSync, unfinalizedEventData } =
localSyncContext.get(network)!;
const { syncProgress, realtimeSync, unfinalizedBlocks } =
perNetworkSync.get(network)!;

switch (event.type) {
/**
Expand All @@ -630,17 +631,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
hexToNumber(syncProgress.current.number),
);

const blockWithEventData = {
block: event.block,
filters: event.filters,
logs: event.logs,
factoryLogs: event.factoryLogs,
traces: event.traces,
transactions: event.transactions,
transactionReceipts: event.transactionReceipts,
};

unfinalizedEventData.push(blockWithEventData);
const blockWithEventData = event;

const events = buildEvents({
sources: args.sources,
Expand All @@ -650,19 +641,27 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
unfinalizedChildAddresses: realtimeSync.unfinalizedChildAddresses,
});

pendingEvents.push(...events);
unfinalizedBlocks.push({ ...blockWithEventData, events });

if (to > from) {
for (const network of args.networks) {
updateRealtimeStatus({ checkpoint: to, network });
}

const events = pendingEvents
.filter(({ checkpoint }) => checkpoint <= to)
.sort((a, b) => (a.checkpoint < b.checkpoint ? -1 : 1));
const pendingEvents: RawEvent[] = [];

pendingEvents = pendingEvents.filter(
({ checkpoint }) => checkpoint > to,
for (const { unfinalizedBlocks } of perNetworkSync.values()) {
for (const { events } of unfinalizedBlocks) {
for (const event of events) {
if (event.checkpoint > from && event.checkpoint <= to) {
pendingEvents.push(event);
}
}
}
}

const events = pendingEvents.sort((a, b) =>
a.checkpoint < b.checkpoint ? -1 : 1,
);

args
Expand Down Expand Up @@ -704,17 +703,6 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
args.onRealtimeEvent({ type: "finalize", checkpoint });
}

const finalizedEventData = unfinalizedEventData.filter(
(ued) =>
hexToNumber(ued.block.number) <= hexToNumber(event.block.number),
);

localSyncContext.get(network)!.unfinalizedEventData =
unfinalizedEventData.filter(
(ued) =>
hexToNumber(ued.block.number) > hexToNumber(event.block.number),
);

if (
getChainCheckpoint({ syncProgress, network, tag: "finalized" })! >
getOmnichainCheckpoint("current")!
Expand All @@ -723,57 +711,68 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
service: "sync",
msg: `Finalized block for '${network.name}' has surpassed overall indexing checkpoint`,
});
// exit early because we need to keep `unfinalizedBlocks.events`
return;
}

const finalizedBlocks = unfinalizedBlocks.filter(
({ block }) =>
hexToNumber(block.number) <= hexToNumber(event.block.number),
);

perNetworkSync.get(network)!.unfinalizedBlocks =
unfinalizedBlocks.filter(
({ block }) =>
hexToNumber(block.number) > hexToNumber(event.block.number),
);

// Add finalized blocks, logs, transactions, receipts, and traces to the sync-store.

await Promise.all([
args.syncStore.insertBlocks({
blocks: finalizedEventData
.filter(({ filters }) => filters.size > 0)
blocks: finalizedBlocks
.filter(({ hasMatchedFilter }) => hasMatchedFilter)
.map(({ block }) => block),
chainId: network.chainId,
}),
args.syncStore.insertLogs({
logs: finalizedEventData.flatMap(({ logs, block }) =>
logs: finalizedBlocks.flatMap(({ logs, block }) =>
logs.map((log) => ({ log, block })),
),
shouldUpdateCheckpoint: true,
chainId: network.chainId,
}),
args.syncStore.insertLogs({
logs: finalizedEventData.flatMap(({ factoryLogs }) =>
logs: finalizedBlocks.flatMap(({ factoryLogs }) =>
factoryLogs.map((log) => ({ log })),
),
shouldUpdateCheckpoint: false,
chainId: network.chainId,
}),
args.syncStore.insertTransactions({
transactions: finalizedEventData.flatMap(
({ transactions, block }) =>
transactions.map((transaction) => ({
transaction,
block,
})),
transactions: finalizedBlocks.flatMap(({ transactions, block }) =>
transactions.map((transaction) => ({
transaction,
block,
})),
),
chainId: network.chainId,
}),
args.syncStore.insertTransactionReceipts({
transactionReceipts: finalizedEventData.flatMap(
transactionReceipts: finalizedBlocks.flatMap(
({ transactionReceipts }) => transactionReceipts,
),
chainId: network.chainId,
}),
args.syncStore.insertTraces({
traces: finalizedEventData.flatMap(
({ traces, block, transactions }) =>
traces.map((trace) => ({
trace,
block,
transaction: transactions.find(
(t) => t.hash === trace.transactionHash,
)!,
})),
traces: finalizedBlocks.flatMap(({ traces, block, transactions }) =>
traces.map((trace) => ({
trace,
block,
transaction: transactions.find(
(t) => t.hash === trace.transactionHash,
)!,
})),
),
chainId: network.chainId,
}),
Expand Down Expand Up @@ -824,21 +823,12 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
hexToNumber(syncProgress.current.number),
);

localSyncContext.get(network)!.unfinalizedEventData =
unfinalizedEventData.filter(
(led) =>
hexToNumber(led.block.number) <= hexToNumber(event.block.number),
perNetworkSync.get(network)!.unfinalizedBlocks =
unfinalizedBlocks.filter(
({ block }) =>
hexToNumber(block.number) <= hexToNumber(event.block.number),
);

const reorgedHashes = new Set<Hash>();
for (const b of event.reorgedBlocks) {
reorgedHashes.add(b.hash);
}

pendingEvents = pendingEvents.filter(
(e) => reorgedHashes.has(e.block.hash) === false,
);

await args.syncStore.pruneRpcRequestResult({
blocks: event.reorgedBlocks,
chainId: network.chainId,
Expand All @@ -858,7 +848,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
getEvents,
async startRealtime() {
for (const network of args.networks) {
const { syncProgress, realtimeSync } = localSyncContext.get(network)!;
const { syncProgress, realtimeSync } = perNetworkSync.get(network)!;

status[network.name]!.block = {
number: hexToNumber(syncProgress.current!.number),
Expand Down Expand Up @@ -907,14 +897,14 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
return status;
},
getCachedTransport(network) {
const { requestQueue } = localSyncContext.get(network)!;
const { requestQueue } = perNetworkSync.get(network)!;
return cachedTransport({ requestQueue, syncStore: args.syncStore });
},
async kill() {
isKilled = true;
const promises: Promise<void>[] = [];
for (const network of args.networks) {
const { historicalSync, realtimeSync } = localSyncContext.get(network)!;
const { historicalSync, realtimeSync } = perNetworkSync.get(network)!;
historicalSync.kill();
promises.push(realtimeSync.kill());
}
Expand Down

0 comments on commit 77b92ef

Please sign in to comment.