Skip to content

Commit

Permalink
fix: duplicate logs (#1397)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyscott18 authored Jan 5, 2025
1 parent 34894bd commit 787a8dc
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 10 deletions.
5 changes: 5 additions & 0 deletions .changeset/stale-timers-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ponder": patch
---

Fixed a bug resulting in `error: ON CONFLICT DO UPDATE command cannot affect row a second time`.
5 changes: 4 additions & 1 deletion packages/common/src/dedupe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
* ) // [{a: 1, b: 2}, {a: 2, b: 2}]
*
*/
export function dedupe<item, id>(arr: item[], getId?: (x: item) => id): item[] {
export function dedupe<item, id>(
arr: item[] | readonly item[],
getId?: (x: item) => id,
): item[] {
const seen = new Set<id | item>();

return arr.filter((x) => {
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/build/configAndIndexingFunctions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
} from "@/sync/source.js";
import { chains } from "@/utils/chains.js";
import { toLowerCase } from "@/utils/lowercase.js";
import type { Address, Hex, LogTopic } from "viem";
import { dedupe } from "@ponder/common";
import type { Hex, LogTopic } from "viem";
import { buildLogFactory } from "./factory.js";

export type RawIndexingFunctions = {
Expand Down Expand Up @@ -488,9 +489,9 @@ export async function buildConfigAndIndexingFunctions({
}

const validatedAddress = Array.isArray(resolvedAddress)
? (resolvedAddress.map((r) => toLowerCase(r)) as Address[])
? dedupe(resolvedAddress).map((r) => toLowerCase(r))
: resolvedAddress !== undefined
? (toLowerCase(resolvedAddress) as Address)
? toLowerCase(resolvedAddress)
: undefined;

const logSource = {
Expand Down Expand Up @@ -680,9 +681,9 @@ export async function buildConfigAndIndexingFunctions({
}

const validatedAddress = Array.isArray(resolvedAddress)
? (resolvedAddress.map((r) => toLowerCase(r)) as Address[])
? dedupe(resolvedAddress).map((r) => toLowerCase(r))
: resolvedAddress !== undefined
? (toLowerCase(resolvedAddress) as Address)
? toLowerCase(resolvedAddress)
: undefined;

return [
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/build/factory.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { LogFactory } from "@/sync/source.js";
import { toLowerCase } from "@/utils/lowercase.js";
import { getBytesConsumedByParam } from "@/utils/offset.js";
import { dedupe } from "@ponder/common";
import type { AbiEvent } from "abitype";
import { type Address, getEventSelector } from "viem";
import { type Address, toEventSelector } from "viem";

export function buildLogFactory({
address: _address,
Expand All @@ -16,9 +17,9 @@ export function buildLogFactory({
chainId: number;
}): LogFactory {
const address = Array.isArray(_address)
? _address.map(toLowerCase)
? dedupe(_address).map(toLowerCase)
: toLowerCase(_address);
const eventSelector = getEventSelector(event);
const eventSelector = toEventSelector(event);

// Check if the provided parameter is present in the list of indexed inputs.
const indexedInputPosition = event.inputs
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/sync-historical/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export const createHistoricalSync = async (
return [];
} else {
// many addresses
// Note: it is assumed that `address` is deduplicated
addressBatches = [];
for (let i = 0; i < address.length; i += 50) {
addressBatches.push(address.slice(i, i + 50));
Expand Down
44 changes: 44 additions & 0 deletions packages/core/src/sync-store/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,50 @@ test("getChildAddresses() empty", async (context) => {
await cleanup();
});

test("getChildAddresses() distinct", async (context) => {
const { cleanup, syncStore } = await setupDatabaseServices(context);

const network = getNetwork();
const requestQueue = createRequestQueue({
network,
common: context.common,
});

const { address } = await deployFactory({ sender: ALICE });
const { result } = await createPair({ factory: address, sender: ALICE });
const rpcLogs = await _eth_getLogs(requestQueue, {
fromBlock: 2,
toBlock: 2,
});

const { config, rawIndexingFunctions } =
getPairWithFactoryConfigAndIndexingFunctions({
address,
});
const { sources } = await buildConfigAndIndexingFunctions({
config,
rawIndexingFunctions,
});

await syncStore.insertLogs({
logs: [{ log: rpcLogs[0]! }, { log: { ...rpcLogs[0]!, logIndex: "0x1" } }],
shouldUpdateCheckpoint: false,
chainId: 1,
});

const filter = sources[0]!.filter as LogFilter<Factory>;

const addresses = await syncStore.getChildAddresses({
filter: filter.address,
limit: 10,
});

expect(addresses).toHaveLength(1);
expect(addresses[0]).toBe(result);

await cleanup();
});

test("filterChildAddresses()", async (context) => {
const { cleanup, syncStore } = await setupDatabaseServices(context);

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/sync-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ const logFactorySQL = (
}
})().as("childAddress"),
)
.distinct()
.$call((qb) => {
if (Array.isArray(factory.address)) {
return qb.where("address", "in", factory.address);
Expand Down Expand Up @@ -270,7 +271,6 @@ export const createSyncStore = ({
return await db
.selectFrom("logs")
.$call((qb) => logFactorySQL(qb, filter))
.orderBy("id asc")
.$if(limit !== undefined, (qb) => qb.limit(limit!))
.execute()
.then((addresses) => addresses.map(({ childAddress }) => childAddress));
Expand Down

0 comments on commit 787a8dc

Please sign in to comment.