Commit

initial
kyscott18 committed Dec 30, 2024
1 parent 608da6c commit 9decd01
Showing 4 changed files with 108 additions and 65 deletions.
138 changes: 79 additions & 59 deletions packages/core/src/sync-historical/index.ts
@@ -6,6 +6,7 @@ import {
isTransferFilterMatched,
} from "@/sync-realtime/filter.js";
import type { SyncStore } from "@/sync-store/index.js";
import { type FragmentId, recoverFilter } from "@/sync/fragments.js";
import {
type BlockFilter,
type Factory,
@@ -22,10 +23,10 @@ import type { SyncBlock, SyncLog, SyncTrace } from "@/types/sync.js";
import {
type Interval,
getChunks,
intervalBounds,
intervalDifference,
intervalRange,
} from "@/utils/interval.js";
import { never } from "@/utils/never.js";
import type { RequestQueue } from "@/utils/requestQueue.js";
import {
_debug_traceBlockByNumber,
@@ -44,7 +45,7 @@ import {
} from "viem";

export type HistoricalSync = {
intervalsCache: Map<Filter, Interval[]>;
intervalsCache: Map<Filter, [FragmentId, Interval[]][]>;
/**
* Extract raw data for `interval` and return the closest-to-tip block
* that is synced.
@@ -100,7 +101,7 @@ export const createHistoricalSync = async (
*
* Note: `intervalsCache` is not updated after a new interval is synced.
*/
let intervalsCache: Map<Filter, Interval[]>;
let intervalsCache: Map<Filter, [FragmentId, Interval[]][]>;
if (args.network.disableCache) {
intervalsCache = new Map();
for (const { filter } of args.sources) {
@@ -592,71 +593,90 @@ export const createHistoricalSync = async (
return {
intervalsCache,
async sync(_interval) {
const syncedIntervals: { filter: Filter; interval: Interval }[] = [];
const syncedIntervals: {
interval: Interval;
filter: Omit<Filter, "fromBlock" | "toBlock">;
}[] = [];

await Promise.all(
args.sources.map(async (source) => {
const filter = source.filter;

// Compute the required interval to sync, accounting for cached
// intervals and start + end block.

// Skip sync if the interval is after the `toBlock` or before
// the `fromBlock`.
if (
(filter.fromBlock !== undefined &&
filter.fromBlock > _interval[1]) ||
(filter.toBlock !== undefined && filter.toBlock < _interval[0])
) {
return;
}
const interval: Interval = [
Math.max(filter.fromBlock ?? 0, _interval[0]),
Math.min(filter.toBlock ?? Number.POSITIVE_INFINITY, _interval[1]),
];
const completedIntervals = intervalsCache.get(filter)!;
const requiredIntervals = intervalDifference(
// Determine the requests that need to be made, and which intervals need to be inserted.
// Fragments are used to create a minimal filter, to avoid refetching data even if a filter
// is only partially synced.

for (const { filter } of args.sources) {
if (
(filter.fromBlock !== undefined && filter.fromBlock > _interval[1]) ||
(filter.toBlock !== undefined && filter.toBlock < _interval[0])
) {
continue;
}

const interval: Interval = [
Math.max(filter.fromBlock ?? 0, _interval[0]),
Math.min(filter.toBlock ?? Number.POSITIVE_INFINITY, _interval[1]),
];

const completedIntervals = intervalsCache.get(filter)!;
const requiredIntervals: [FragmentId, Interval[]][] = [];

for (const [fragmentId, fragmentInterval] of completedIntervals) {
const requiredFragmentIntervals = intervalDifference(
[interval],
completedIntervals,
fragmentInterval,
);

// Skip sync if the interval is already complete.
if (requiredIntervals.length === 0) return;
if (requiredFragmentIntervals.length > 0) {
requiredIntervals.push([fragmentId, requiredFragmentIntervals]);
}
}

const requiredInterval = intervalBounds(
requiredIntervals.flatMap(([_, interval]) => interval),
);

const requiredFilter = recoverFilter(
requiredIntervals.map(([fragmentId]) => fragmentId),
);

syncedIntervals.push({
filter: requiredFilter,
interval: requiredInterval,
});
}

await Promise.all(
syncedIntervals.map(async ({ filter, interval }) => {
// Request last block of interval
const blockPromise = syncBlock(interval[1]);

try {
// sync required intervals, account for chunk sizes
await Promise.all(
requiredIntervals.map(async (interval) => {
switch (filter.type) {
case "log": {
await syncLogFilter(filter, interval);
break;
}

case "block": {
await syncBlockFilter(filter, interval);
break;
}

case "transaction": {
await syncTransactionFilter(filter, interval);
break;
}

case "trace":
case "transfer": {
await syncTraceOrTransferFilter(filter, interval);
break;
}

default:
never(filter);
}
}),
);
switch (filter.type) {
case "log": {
await syncLogFilter(filter as LogFilter, interval);
break;
}

case "block": {
await syncBlockFilter(filter as BlockFilter, interval);
break;
}

case "transaction": {
await syncTransactionFilter(
filter as TransactionFilter,
interval,
);
break;
}

case "trace":
case "transfer": {
await syncTraceOrTransferFilter(
filter as TraceFilter | TransferFilter,
interval,
);
break;
}
}
} catch (_error) {
const error = _error as Error;

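The hunk above replaces the per-filter cache check with per-fragment bookkeeping: for each cached fragment, `intervalDifference` yields the blocks still missing, `intervalBounds` collapses those gaps into a single request range, and `recoverFilter` is meant to rebuild a minimal filter from the fragment ids that still need data. A minimal sketch of that flow, using simplified types, hypothetical fragment ids, and a one-interval `subtract` helper standing in for `intervalDifference`:

type Interval = [number, number];
type FragmentId = string;

// Subtract a single cached interval from a single requested interval
// (a simplification of intervalDifference, which handles lists of intervals).
const subtract = (requested: Interval, cached: Interval): Interval[] => {
  const result: Interval[] = [];
  if (requested[0] < cached[0])
    result.push([requested[0], Math.min(requested[1], cached[0] - 1)]);
  if (requested[1] > cached[1])
    result.push([Math.max(requested[0], cached[1] + 1), requested[1]]);
  return result.filter(([start, end]) => start <= end);
};

// intervalBounds as added in utils/interval.ts below.
const intervalBounds = (intervals: Interval[]): Interval => [
  Math.min(...intervals.map((i) => i[0])),
  Math.max(...intervals.map((i) => i[1])),
];

// Two hypothetical fragments of one log filter with different cached progress.
const completed: [FragmentId, Interval][] = [
  ["log_1_0xaaa", [0, 500]], // fragment A is cached through block 500
  ["log_1_0xbbb", [0, 800]], // fragment B is cached through block 800
];

const requested: Interval = [0, 1000];

// Collect what is still missing, per fragment.
const required: [FragmentId, Interval[]][] = [];
for (const [fragmentId, cached] of completed) {
  const missing = subtract(requested, cached);
  if (missing.length > 0) required.push([fragmentId, missing]);
}
// required -> [["log_1_0xaaa", [[501, 1000]]], ["log_1_0xbbb", [[801, 1000]]]]

// One request range covers everything that is still missing across fragments.
const requestRange = intervalBounds(required.flatMap(([, intervals]) => intervals));
// requestRange -> [501, 1000]

Under this scheme, a filter that is only partially cached reuses whatever its fragments already cover instead of refetching the whole interval.
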
16 changes: 11 additions & 5 deletions packages/core/src/sync-store/index.ts
@@ -24,7 +24,7 @@ import type {
SyncTransactionReceipt,
} from "@/types/sync.js";
import type { NonNull } from "@/types/utils.js";
import { type Interval, intervalIntersectionMany } from "@/utils/interval.js";
import type { Interval } from "@/utils/interval.js";
import { type Kysely, type SelectQueryBuilder, sql as ksql } from "kysely";
import type { InsertObject } from "kysely";
import {
@@ -47,14 +47,14 @@ import {
export type SyncStore = {
insertIntervals(args: {
intervals: {
filter: Filter;
filter: Omit<Filter, "fromBlock" | "toBlock">;
interval: Interval;
}[];
chainId: number;
}): Promise<void>;
getIntervals(args: {
filters: Filter[];
}): Promise<Map<Filter, Interval[]>>;
}): Promise<Map<Filter, [FragmentId, Interval[]][]>>;
getChildAddresses(args: {
filter: Factory;
limit?: number;
@@ -239,7 +239,7 @@ export const createSyncStore = ({

const rows = await query!.execute();

const result: Map<Filter, Interval[]> = new Map();
const result: Map<Filter, [FragmentId, Interval[]][]> = new Map();

// intervals use "union" for the same fragment, and
// "intersection" for the same filter
@@ -260,7 +260,13 @@ export const createSyncStore = ({
).map((interval) => [interval[0], interval[1] - 1] as Interval),
);

result.set(filter, intervalIntersectionMany(intervals));
// result.set(filt)

// for (const fragment of getFragmentIds(filter)) {
// result.
// }

// result.set(filter, intervalIntersectionMany(intervals));
}

return result;
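The comment in the hunk above describes the caching rule: rows for the same fragment are unioned, while a filter is only complete where all of its fragments are. This commit changes `getIntervals` to surface the per-fragment intervals rather than collapsing them with `intervalIntersectionMany`, so callers can see exactly which fragments are behind. A sketch of the old and new shapes, with hypothetical fragment ids and values:

type Interval = [number, number];
type FragmentId = string;

// Before: one merged interval list per filter
// (union of rows per fragment, then intersection across fragments).
const before = new Map<string, Interval[]>([["logFilter", [[0, 500]]]]);

// After: one entry per fragment, so a partially synced filter can still
// reuse the fragments that are already complete.
const after = new Map<string, [FragmentId, Interval[]][]>([
  [
    "logFilter",
    [
      ["log_1_0xaaa", [[0, 500]]], // fragment A cached through block 500
      ["log_1_0xbbb", [[0, 800]]], // fragment B cached further, through 800
    ],
  ],
]);
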
6 changes: 5 additions & 1 deletion packages/core/src/sync/fragments.ts
@@ -30,7 +30,7 @@ export type FragmentId =
| `transfer_${number}_${FragmentAddress}_${FragmentAddress}_${0 | 1}`;

export const getFragmentIds = (
filter: Omit<Filter, "startBlock" | "endBlock">,
filter: Omit<Filter, "fromBlock" | "toBlock">,
): FragmentReturnType => {
switch (filter.type) {
case "block":
@@ -273,3 +273,7 @@ export const getTransferFilterFragmentIds = ({

return fragments;
};

export const recoverFilter = (
fragmentIds: FragmentId[],
): Omit<Filter, "fromBlock" | "toBlock"> => {};
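`recoverFilter` is declared here but left unimplemented. A hypothetical sketch of the intent for the transfer case, based only on the fragment id shape shown above — the parsing and the field names (`fromAddress`, `toAddress`, `includeTransactionReceipts`) are assumptions, not the eventual implementation:

type TransferFragmentId = `transfer_${number}_${string}_${string}_${0 | 1}`;

const recoverTransferFilterSketch = (fragmentIds: TransferFragmentId[]) => {
  const parsed = fragmentIds.map((id) => id.split("_"));
  return {
    type: "transfer" as const,
    chainId: Number(parsed[0]![1]),
    // Keep only the addresses present in the un-synced fragments, so the
    // recovered filter requests no more data than is actually missing.
    fromAddress: parsed.map((parts) => parts[2]!),
    toAddress: parsed.map((parts) => parts[3]!),
    includeTransactionReceipts: parsed[0]![4] === "1",
  };
};
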
13 changes: 13 additions & 0 deletions packages/core/src/utils/interval.ts
@@ -166,6 +166,19 @@ export function intervalDifference(
return result;
}

/**
* Return an interval that encompasses all the intervals in the list.
*
* @param intervals List of numeric intervals to find the bounds of.
* @returns Bounds of the intervals.
*/
export function intervalBounds(intervals: Interval[]): Interval {
const start = Math.min(...intervals.map((interval) => interval[0]));
const end = Math.max(...intervals.map((interval) => interval[1]));

return [start, end];
}

export function sortIntervals(intervals: Interval[]) {
return intervals.sort((a, b) => (a[0] < b[0] ? -1 : 1));
}
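A brief usage sketch for `intervalBounds`. Note that an empty input would yield `[Infinity, -Infinity]`, since `Math.min()` and `Math.max()` return `Infinity` and `-Infinity` when called with no arguments, so callers are expected to pass at least one interval:

intervalBounds([
  [3, 5],
  [10, 12],
  [1, 2],
]); // -> [1, 12]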
