Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc refactoring #1296

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
20 changes: 11 additions & 9 deletions examples/feature-multichain/ponder.config.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,43 @@
import { createConfig } from "ponder";
import { http, createPublicClient } from "viem";
import { createPublicClient, webSocket } from "viem";

import { weth9Abi } from "./abis/weth9Abi";

const latestBlockMainnet = await createPublicClient({
transport: http(process.env.PONDER_RPC_URL_1),
transport: webSocket(process.env.PONDER_RPC_URL_1),
}).getBlock();
const latestBlockBase = await createPublicClient({
transport: http(process.env.PONDER_RPC_URL_8453),
transport: webSocket(process.env.PONDER_RPC_URL_8453),
}).getBlock();
const latestBlockOptimism = await createPublicClient({
transport: http(process.env.PONDER_RPC_URL_10),
transport: webSocket(process.env.PONDER_RPC_URL_10),
}).getBlock();
const latestBlockPolygon = await createPublicClient({
transport: http(process.env.PONDER_RPC_URL_137),
transport: webSocket(process.env.PONDER_RPC_URL_137),
}).getBlock();

export default createConfig({
networks: {
mainnet: {
chainId: 1,
transport: http(process.env.PONDER_RPC_URL_1),
transport: webSocket(process.env.PONDER_RPC_URL_1),
pollingInterval: 15_000,
},
base: {
chainId: 8453,
transport: http(process.env.PONDER_RPC_URL_8453),
transport: webSocket(
"wss://base-mainnet.g.alchemy.com/v2/Ni1nSzvVMNzPF6w5yFvmUcHiOCmuAX60",
),
pollingInterval: 15_000,
},
optimism: {
chainId: 10,
transport: http(process.env.PONDER_RPC_URL_10),
transport: webSocket(process.env.PONDER_RPC_URL_10),
pollingInterval: 15_000,
},
polygon: {
chainId: 137,
transport: http(process.env.PONDER_RPC_URL_137),
transport: webSocket(process.env.PONDER_RPC_URL_137),
pollingInterval: 15_000,
},
},
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/rpc/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { setupAnvil, setupCommon } from "@/_test/setup.js";
import { getNetwork } from "@/_test/utils.js";
import { beforeEach, expect, test } from "vitest";
import { createRpc } from "./index.js";

beforeEach(setupCommon);
beforeEach(setupAnvil);

test("requests", async ({ common }) => {
const network = getNetwork();
network.maxRequestsPerSecond = 1;

const rpc = createRpc({ common, network });

const chainId = await rpc.request({ method: "eth_chainId" });

expect(chainId).toBe("0x1");
});
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { Common } from "@/common/common.js";
import { NonRetryableError } from "@/common/errors.js";
import type { Network } from "@/config/networks.js";
import type { DebugRpcSchema } from "@/utils/debug.js";
import { startClock } from "@/utils/timer.js";
import { wait } from "@/utils/wait.js";
import { type Queue, createQueue } from "@ponder/common";
import {
type GetLogsRetryHelperParameters,
Expand All @@ -14,42 +18,44 @@ import {
LimitExceededRpcError,
type PublicRpcSchema,
type RpcError,
type Transport,
type WebSocketTransport,
isHex,
} from "viem";
import type { DebugRpcSchema } from "./debug.js";
import { startClock } from "./timer.js";
import { wait } from "./wait.js";

type Schema = [...PublicRpcSchema, ...DebugRpcSchema];

type RequestReturnType<method extends EIP1193Parameters<Schema>["method"]> =
Extract<Schema[number], { Method: method }>["ReturnType"];

export type RequestQueue = Omit<
Queue<
RequestReturnType<EIP1193Parameters<Schema>["method"]>,
EIP1193Parameters<Schema>
>,
"add"
> & {
export type SubscribeParameters = Parameters<
NonNullable<ReturnType<WebSocketTransport>["value"]>["subscribe"]
>[0];

export type SubscribeReturnType = Awaited<
ReturnType<NonNullable<ReturnType<WebSocketTransport>["value"]>["subscribe"]>
>;

export type Rpc = {
request: <TParameters extends EIP1193Parameters<Schema>>(
parameters: TParameters,
) => Promise<RequestReturnType<TParameters["method"]>>;
subscribe: (params: SubscribeParameters) => Promise<SubscribeReturnType>;
supports: (
method: EIP1193Parameters<Schema>["method"] | "eth_subscribe",
) => boolean;
};

const RETRY_COUNT = 9;
const BASE_DURATION = 125;

/**
* Creates a queue built to manage rpc requests.
*/
export const createRequestQueue = ({
export const createRpc = ({
network,
common,
}: {
network: Network;
common: Common;
}): RequestQueue => {
}): Rpc => {
// @ts-ignore
const fetchRequest = async (request: EIP1193Parameters<PublicRpcSchema>) => {
for (let i = 0; i <= RETRY_COUNT; i++) {
Expand Down Expand Up @@ -106,7 +112,50 @@ export const createRequestQueue = ({
}
};

const requestQueue: Queue<
// @ts-ignore
const subscribe = async (request: SubscribeParameters) => {
for (let i = 0; i <= RETRY_COUNT; i++) {
try {
const stopClock = startClock();
const wsTransport = resolveWebsocketTransport(network.transport);

if (wsTransport?.value === undefined) {
throw new NonRetryableError(
`No webSocket transport found for ${network.transport.config.type} transport.`,
);
}

const response = await wsTransport.value.subscribe(request);

common.metrics.ponder_rpc_request_duration.observe(
{ method: "eth_subscribe", network: network.name },
stopClock(),
);
return response;
} catch (_error) {
const error = _error as Error;

if (i === RETRY_COUNT) {
common.logger.warn({
service: "sync",
msg: `Failed 'eth_subscribe' RPC request after ${i + 1} attempts`,
error,
});
throw error;
}

const duration = BASE_DURATION * 2 ** i;
common.logger.debug({
service: "sync",
msg: `Failed 'eth_subscribe' RPC request, retrying after ${duration} milliseconds`,
error,
});
await wait(duration);
}
}
};

const queue: Queue<
unknown,
{
request: EIP1193Parameters<PublicRpcSchema>;
Expand All @@ -117,29 +166,43 @@ export const createRequestQueue = ({
concurrency: Math.ceil(network.maxRequestsPerSecond / 4),
initialStart: true,
browser: false,
worker: async (task: {
worker: async ({
request,
stopClockLag,
}: {
request: EIP1193Parameters<PublicRpcSchema>;
stopClockLag: () => number;
}) => {
common.metrics.ponder_rpc_request_lag.observe(
{ method: task.request.method, network: network.name },
task.stopClockLag(),
{ method: request.method, network: network.name },
stopClockLag(),
);

return await fetchRequest(task.request);
return await fetchRequest(request);
},
});

return {
...requestQueue,
request: <TParameters extends EIP1193Parameters<PublicRpcSchema>>(
params: TParameters,
) => {
const stopClockLag = startClock();

return requestQueue.add({ request: params, stopClockLag });
return queue.add({ request: params, stopClockLag });
},
subscribe: async (params: SubscribeParameters) => {
return await subscribe(params);
},
} as RequestQueue;
supports: (
method: EIP1193Parameters<PublicRpcSchema>["method"] | "eth_subscribe",
) => {
if (method === "eth_subscribe") {
return resolveWebsocketTransport(network.transport) !== undefined;
}

return true;
},
} as Rpc;
};

/**
Expand Down Expand Up @@ -175,3 +238,11 @@ function shouldRetry(error: Error) {
}
return true;
}

function resolveWebsocketTransport(transport: ReturnType<Transport>) {
if (transport.config.type === "webSocket") {
return transport as NonNullable<Awaited<ReturnType<WebSocketTransport>>>;
}

return undefined;
}
Loading
Loading