From 787a8dc1d92b08ed85ee9762ef41fd0918f163ef Mon Sep 17 00:00:00 2001 From: kyscott18 <43524469+kyscott18@users.noreply.github.com> Date: Sun, 5 Jan 2025 10:09:58 -0500 Subject: [PATCH] fix: duplicate logs (#1397) --- .changeset/stale-timers-remain.md | 5 +++ packages/common/src/dedupe.ts | 5 ++- .../src/build/configAndIndexingFunctions.ts | 11 ++--- packages/core/src/build/factory.ts | 7 +-- packages/core/src/sync-historical/index.ts | 1 + packages/core/src/sync-store/index.test.ts | 44 +++++++++++++++++++ packages/core/src/sync-store/index.ts | 2 +- 7 files changed, 65 insertions(+), 10 deletions(-) create mode 100644 .changeset/stale-timers-remain.md diff --git a/.changeset/stale-timers-remain.md b/.changeset/stale-timers-remain.md new file mode 100644 index 000000000..39c6070b1 --- /dev/null +++ b/.changeset/stale-timers-remain.md @@ -0,0 +1,5 @@ +--- +"ponder": patch +--- + +Fixed a bug resulting in `error: ON CONFLICT DO UPDATE command cannot affect row a second time`. diff --git a/packages/common/src/dedupe.ts b/packages/common/src/dedupe.ts index b69310ad4..32fc2a5a0 100644 --- a/packages/common/src/dedupe.ts +++ b/packages/common/src/dedupe.ts @@ -16,7 +16,10 @@ * ) // [{a: 1, b: 2}, {a: 2, b: 2}] * */ -export function dedupe(arr: item[], getId?: (x: item) => id): item[] { +export function dedupe( + arr: item[] | readonly item[], + getId?: (x: item) => id, +): item[] { const seen = new Set(); return arr.filter((x) => { diff --git a/packages/core/src/build/configAndIndexingFunctions.ts b/packages/core/src/build/configAndIndexingFunctions.ts index 28c14824a..73ac983d6 100644 --- a/packages/core/src/build/configAndIndexingFunctions.ts +++ b/packages/core/src/build/configAndIndexingFunctions.ts @@ -21,7 +21,8 @@ import { } from "@/sync/source.js"; import { chains } from "@/utils/chains.js"; import { toLowerCase } from "@/utils/lowercase.js"; -import type { Address, Hex, LogTopic } from "viem"; +import { dedupe } from "@ponder/common"; +import type { Hex, LogTopic } from "viem"; import { buildLogFactory } from "./factory.js"; export type RawIndexingFunctions = { @@ -488,9 +489,9 @@ export async function buildConfigAndIndexingFunctions({ } const validatedAddress = Array.isArray(resolvedAddress) - ? (resolvedAddress.map((r) => toLowerCase(r)) as Address[]) + ? dedupe(resolvedAddress).map((r) => toLowerCase(r)) : resolvedAddress !== undefined - ? (toLowerCase(resolvedAddress) as Address) + ? toLowerCase(resolvedAddress) : undefined; const logSource = { @@ -680,9 +681,9 @@ export async function buildConfigAndIndexingFunctions({ } const validatedAddress = Array.isArray(resolvedAddress) - ? (resolvedAddress.map((r) => toLowerCase(r)) as Address[]) + ? dedupe(resolvedAddress).map((r) => toLowerCase(r)) : resolvedAddress !== undefined - ? (toLowerCase(resolvedAddress) as Address) + ? toLowerCase(resolvedAddress) : undefined; return [ diff --git a/packages/core/src/build/factory.ts b/packages/core/src/build/factory.ts index c82534001..000e38c17 100644 --- a/packages/core/src/build/factory.ts +++ b/packages/core/src/build/factory.ts @@ -1,8 +1,9 @@ import type { LogFactory } from "@/sync/source.js"; import { toLowerCase } from "@/utils/lowercase.js"; import { getBytesConsumedByParam } from "@/utils/offset.js"; +import { dedupe } from "@ponder/common"; import type { AbiEvent } from "abitype"; -import { type Address, getEventSelector } from "viem"; +import { type Address, toEventSelector } from "viem"; export function buildLogFactory({ address: _address, @@ -16,9 +17,9 @@ export function buildLogFactory({ chainId: number; }): LogFactory { const address = Array.isArray(_address) - ? _address.map(toLowerCase) + ? dedupe(_address).map(toLowerCase) : toLowerCase(_address); - const eventSelector = getEventSelector(event); + const eventSelector = toEventSelector(event); // Check if the provided parameter is present in the list of indexed inputs. const indexedInputPosition = event.inputs diff --git a/packages/core/src/sync-historical/index.ts b/packages/core/src/sync-historical/index.ts index 6db986cd1..e1b97d5fd 100644 --- a/packages/core/src/sync-historical/index.ts +++ b/packages/core/src/sync-historical/index.ts @@ -184,6 +184,7 @@ export const createHistoricalSync = async ( return []; } else { // many addresses + // Note: it is assumed that `address` is deduplicated addressBatches = []; for (let i = 0; i < address.length; i += 50) { addressBatches.push(address.slice(i, i + 50)); diff --git a/packages/core/src/sync-store/index.test.ts b/packages/core/src/sync-store/index.test.ts index 1feb27b5f..c27088746 100644 --- a/packages/core/src/sync-store/index.test.ts +++ b/packages/core/src/sync-store/index.test.ts @@ -331,6 +331,50 @@ test("getChildAddresses() empty", async (context) => { await cleanup(); }); +test("getChildAddresses() distinct", async (context) => { + const { cleanup, syncStore } = await setupDatabaseServices(context); + + const network = getNetwork(); + const requestQueue = createRequestQueue({ + network, + common: context.common, + }); + + const { address } = await deployFactory({ sender: ALICE }); + const { result } = await createPair({ factory: address, sender: ALICE }); + const rpcLogs = await _eth_getLogs(requestQueue, { + fromBlock: 2, + toBlock: 2, + }); + + const { config, rawIndexingFunctions } = + getPairWithFactoryConfigAndIndexingFunctions({ + address, + }); + const { sources } = await buildConfigAndIndexingFunctions({ + config, + rawIndexingFunctions, + }); + + await syncStore.insertLogs({ + logs: [{ log: rpcLogs[0]! }, { log: { ...rpcLogs[0]!, logIndex: "0x1" } }], + shouldUpdateCheckpoint: false, + chainId: 1, + }); + + const filter = sources[0]!.filter as LogFilter; + + const addresses = await syncStore.getChildAddresses({ + filter: filter.address, + limit: 10, + }); + + expect(addresses).toHaveLength(1); + expect(addresses[0]).toBe(result); + + await cleanup(); +}); + test("filterChildAddresses()", async (context) => { const { cleanup, syncStore } = await setupDatabaseServices(context); diff --git a/packages/core/src/sync-store/index.ts b/packages/core/src/sync-store/index.ts index e646b354a..7485e6704 100644 --- a/packages/core/src/sync-store/index.ts +++ b/packages/core/src/sync-store/index.ts @@ -141,6 +141,7 @@ const logFactorySQL = ( } })().as("childAddress"), ) + .distinct() .$call((qb) => { if (Array.isArray(factory.address)) { return qb.where("address", "in", factory.address); @@ -270,7 +271,6 @@ export const createSyncStore = ({ return await db .selectFrom("logs") .$call((qb) => logFactorySQL(qb, filter)) - .orderBy("id asc") .$if(limit !== undefined, (qb) => qb.limit(limit!)) .execute() .then((addresses) => addresses.map(({ childAddress }) => childAddress));