diff --git a/.changeset/long-ligers-travel.md b/.changeset/long-ligers-travel.md new file mode 100644 index 000000000..78d7f119c --- /dev/null +++ b/.changeset/long-ligers-travel.md @@ -0,0 +1,5 @@ +--- +"ponder": patch +--- + +Added support for "eth_getBlockReceipts" request for better performance and cost. diff --git a/packages/core/src/sync-historical/index.ts b/packages/core/src/sync-historical/index.ts index e1b97d5fd..38111b52c 100644 --- a/packages/core/src/sync-historical/index.ts +++ b/packages/core/src/sync-historical/index.ts @@ -18,7 +18,12 @@ import { shouldGetTransactionReceipt, } from "@/sync/source.js"; import type { Source, TransactionFilter } from "@/sync/source.js"; -import type { SyncBlock, SyncLog, SyncTrace } from "@/types/sync.js"; +import type { + SyncBlock, + SyncLog, + SyncTrace, + SyncTransactionReceipt, +} from "@/types/sync.js"; import { type Interval, getChunks, @@ -30,6 +35,7 @@ import type { RequestQueue } from "@/utils/requestQueue.js"; import { _debug_traceBlockByNumber, _eth_getBlockByNumber, + _eth_getBlockReceipts, _eth_getLogs, _eth_getTransactionReceipt, } from "@/utils/rpc.js"; @@ -66,7 +72,10 @@ export const createHistoricalSync = async ( args: CreateHistoricalSyncParameters, ): Promise => { let isKilled = false; - + /** + * Flag to fetch transaction receipts through _eth_getBlockReceipts (true) or _eth_getTransactionReceipt (false) + */ + let isBlockReceipts = true; /** * Blocks that have already been extracted. * Note: All entries are deleted at the end of each call to `sync()`. @@ -82,6 +91,20 @@ export const createHistoricalSync = async ( * Note: All entries are deleted at the end of each call to `sync()`. */ const transactionsCache = new Set(); + /** + * Block transaction receipts that have already been fetched. + * Note: All entries are deleted at the end of each call to `sync()`. + */ + const blockReceiptsCache = new Map>(); + /** + * Transaction receipts that have already been fetched. + * Note: All entries are deleted at the end of each call to `sync()`. + */ + const transactionReceiptsCache = new Map< + Hash, + Promise + >(); + /** * Data about the range passed to "eth_getLogs" for all log * filters and log factories. @@ -318,6 +341,79 @@ export const createHistoricalSync = async ( } }; + const syncTransactionReceipts = async ( + block: Hash, + transactionHashes: Set, + ): Promise => { + if (isBlockReceipts === false) { + const transactionReceipts = await Promise.all( + Array.from(transactionHashes).map((hash) => + syncTransactionReceipt(hash), + ), + ); + + return transactionReceipts; + } + + let blockReceipts: SyncTransactionReceipt[]; + try { + blockReceipts = await syncBlockReceipts(block); + } catch (_error) { + const error = _error as Error; + args.common.logger.warn({ + service: "sync", + msg: `Caught eth_getBlockReceipts error on '${ + args.network.name + }', switching to eth_getTransactionReceipt method.`, + error, + }); + + isBlockReceipts = false; + return syncTransactionReceipts(block, transactionHashes); + } + + const blockReceiptsTransactionHashes = new Set( + blockReceipts.map((r) => r.transactionHash), + ); + // Validate that block transaction receipts include all required transactions + for (const hash of Array.from(transactionHashes)) { + if (blockReceiptsTransactionHashes.has(hash) === false) { + throw new Error( + `Detected inconsistent RPC responses. 'transaction.hash' ${hash} not found in eth_getBlockReceipts response for block '${block}'`, + ); + } + } + const transactionReceipts = blockReceipts.filter((receipt) => + transactionHashes.has(receipt.transactionHash), + ); + + return transactionReceipts; + }; + + const syncTransactionReceipt = async (transaction: Hash) => { + if (transactionReceiptsCache.has(transaction)) { + return await transactionReceiptsCache.get(transaction)!; + } else { + const receipt = _eth_getTransactionReceipt(args.requestQueue, { + hash: transaction, + }); + transactionReceiptsCache.set(transaction, receipt); + return await receipt; + } + }; + + const syncBlockReceipts = async (block: Hash) => { + if (blockReceiptsCache.has(block)) { + return await blockReceiptsCache.get(block)!; + } else { + const blockReceipts = _eth_getBlockReceipts(args.requestQueue, { + blockHash: block, + }); + blockReceiptsCache.set(block, blockReceipts); + return await blockReceipts; + } + }; + /** Extract and insert the log-based addresses that match `filter` + `interval`. */ const syncLogFactory = async (filter: LogFactory, interval: Interval) => { const logs = await syncLogsDynamic({ @@ -380,6 +476,8 @@ export const createHistoricalSync = async ( logs.map((log) => syncBlock(hexToNumber(log.blockNumber))), ); + const requiredBlocks = new Set(blocks.map((b) => b.hash)); + // Validate that logs point to the valid transaction hash in the block for (let i = 0; i < logs.length; i++) { const log = logs[i]!; @@ -418,10 +516,15 @@ export const createHistoricalSync = async ( if (shouldGetTransactionReceipt(filter)) { const transactionReceipts = await Promise.all( - Array.from(transactionHashes).map((hash) => - _eth_getTransactionReceipt(args.requestQueue, { hash }), - ), - ); + Array.from(requiredBlocks).map((blockHash) => { + const blockTransactionHashes = new Set( + logs + .filter((l) => l.blockHash === blockHash) + .map((l) => l.transactionHash), + ); + return syncTransactionReceipts(blockHash, blockTransactionHashes); + }), + ).then((receipts) => receipts.flat()); if (isKilled) return; @@ -472,6 +575,7 @@ export const createHistoricalSync = async ( if (isKilled) return; const transactionHashes: Set = new Set(); + const requiredBlocks: Set = new Set(); for (const block of blocks) { block.transactions.map((transaction) => { @@ -485,6 +589,7 @@ export const createHistoricalSync = async ( }) ) { transactionHashes.add(transaction.hash); + requiredBlocks.add(block); } }); } @@ -496,10 +601,15 @@ export const createHistoricalSync = async ( if (isKilled) return; const transactionReceipts = await Promise.all( - Array.from(transactionHashes).map((hash) => - _eth_getTransactionReceipt(args.requestQueue, { hash }), - ), - ); + Array.from(requiredBlocks).map((block) => { + const blockTransactionHashes = new Set( + block.transactions + .filter((t) => transactionHashes.has(t.hash)) + .map((t) => t.hash), + ); + return syncTransactionReceipts(block.hash, blockTransactionHashes); + }), + ).then((receipts) => receipts.flat()); if (isKilled) return; @@ -521,6 +631,7 @@ export const createHistoricalSync = async ( ? await syncAddressFactory(filter.toAddress, interval) : undefined; + const requiredBlocks: Set = new Set(); const traces = await Promise.all( intervalRange(interval).map(async (number) => { let traces = await syncTrace(number); @@ -555,6 +666,7 @@ export const createHistoricalSync = async ( if (traces.length === 0) return []; const block = await syncBlock(number); + requiredBlocks.add(block.hash); return traces.map((trace) => { const transaction = block.transactions.find( @@ -576,10 +688,6 @@ export const createHistoricalSync = async ( if (isKilled) return; - const transactionHashes = new Set( - traces.map(({ transaction }) => transaction.hash), - ); - await args.syncStore.insertTraces({ traces, chainId: args.network.chainId, @@ -589,10 +697,15 @@ export const createHistoricalSync = async ( if (shouldGetTransactionReceipt(filter)) { const transactionReceipts = await Promise.all( - Array.from(transactionHashes).map((hash) => - _eth_getTransactionReceipt(args.requestQueue, { hash }), - ), - ); + Array.from(requiredBlocks).map((blockHash) => { + const blockTransactionHashes = new Set( + traces + .filter((t) => t.block.hash === blockHash) + .map((t) => t.transaction.hash), + ); + return syncTransactionReceipts(blockHash, blockTransactionHashes); + }), + ).then((receipts) => receipts.flat()); if (isKilled) return; @@ -722,6 +835,8 @@ export const createHistoricalSync = async ( blockCache.clear(); traceCache.clear(); transactionsCache.clear(); + blockReceiptsCache.clear(); + transactionReceiptsCache.clear(); return latestBlock; }, diff --git a/packages/core/src/sync-realtime/index.ts b/packages/core/src/sync-realtime/index.ts index b5b7e37bf..c4492b55d 100644 --- a/packages/core/src/sync-realtime/index.ts +++ b/packages/core/src/sync-realtime/index.ts @@ -28,6 +28,7 @@ import { _debug_traceBlockByHash, _eth_getBlockByHash, _eth_getBlockByNumber, + _eth_getBlockReceipts, _eth_getLogs, _eth_getTransactionReceipt, } from "@/utils/rpc.js"; @@ -100,6 +101,7 @@ export const createRealtimeSync = ( // state //////// let isKilled = false; + let isBlockReceipts = true; let finalizedBlock: LightBlock; let finalizedChildAddresses: Map>; const unfinalizedChildAddresses = new Map>(); @@ -575,6 +577,57 @@ export const createRealtimeSync = ( } }; + const syncTransactionReceipts = async ( + blockHash: Hash, + transactionHashes: Set, + ): Promise => { + if (isBlockReceipts === false) { + const transactionReceipts = await Promise.all( + Array.from(transactionHashes).map(async (hash) => + _eth_getTransactionReceipt(args.requestQueue, { hash }), + ), + ); + + return transactionReceipts; + } + + let blockReceipts: SyncTransactionReceipt[]; + try { + blockReceipts = await _eth_getBlockReceipts(args.requestQueue, { + blockHash, + }); + } catch (_error) { + const error = _error as Error; + args.common.logger.warn({ + service: "realtime", + msg: `Caught eth_getBlockReceipts error on '${ + args.network.name + }', switching to eth_getTransactionReceipt method.`, + error, + }); + + isBlockReceipts = false; + return syncTransactionReceipts(blockHash, transactionHashes); + } + + const blockReceiptsTransactionHashes = new Set( + blockReceipts.map((r) => r.transactionHash), + ); + // Validate that block transaction receipts include all required transactions + for (const hash of Array.from(transactionHashes)) { + if (blockReceiptsTransactionHashes.has(hash) === false) { + throw new Error( + `Detected inconsistent RPC responses. Transaction receipt with transactionHash ${hash} is missing in \`blockReceipts\`.`, + ); + } + } + const transactionReceipts = blockReceipts.filter((receipt) => + transactionHashes.has(receipt.transactionHash), + ); + + return transactionReceipts; + }; + /** * Fetch all data (logs, traces, receipts) for the specified block required by `args.sources` * @@ -769,7 +822,7 @@ export const createRealtimeSync = ( for (const hash of Array.from(requiredTransactions)) { if (blockTransactionsHashes.has(hash) === false) { throw new Error( - `Detected inconsistent RPC responses. Transaction with hash ${hash} is missing in \`block.transactions\`.`, + `Detected inconsistent RPC responses. 'transaction.hash' ${hash} not found in eth_getBlockReceipts response for block '${block.hash}'.`, ); } } @@ -778,12 +831,9 @@ export const createRealtimeSync = ( // Transaction Receipts //////// - const transactionReceipts = await Promise.all( - block.transactions - .filter(({ hash }) => requiredTransactionReceipts.has(hash)) - .map(({ hash }) => - _eth_getTransactionReceipt(args.requestQueue, { hash }), - ), + const transactionReceipts = await syncTransactionReceipts( + block.hash, + requiredTransactionReceipts, ); return { diff --git a/packages/core/src/utils/rpc.ts b/packages/core/src/utils/rpc.ts index 6150d30a7..5d3d31cec 100644 --- a/packages/core/src/utils/rpc.ts +++ b/packages/core/src/utils/rpc.ts @@ -146,6 +146,20 @@ export const _eth_getTransactionReceipt = ( return receipt as SyncTransactionReceipt; }); +/** + * Helper function for "eth_getBlockReceipts" request. + */ +export const _eth_getBlockReceipts = ( + requestQueue: RequestQueue, + { blockHash }: { blockHash: Hash }, +): Promise => + requestQueue + .request({ + method: "eth_getBlockReceipts", + params: [blockHash], + } as any) + .then((receipts) => receipts as unknown as SyncTransactionReceipt[]); + /** * Helper function for "debug_traceBlockByNumber" request. */