diff --git a/packages/store-sync/src/wiresaw.ts b/packages/store-sync/src/wiresaw.ts index cbbb432d1a..fcb9e4da1e 100644 --- a/packages/store-sync/src/wiresaw.ts +++ b/packages/store-sync/src/wiresaw.ts @@ -31,113 +31,113 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog let client: SocketRpcClient; - function setupClient(): void { + async function setupClient(): Promise { debug("setupClient called"); // Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed let caughtUp = false; const logBuffer: StoreEventsLog[] = []; - getWebSocketRpcClient(url, { + client = await getWebSocketRpcClient(url, { keepAlive: false, // keepAlive is handled below - }).then(async (_client) => { - debug("got websocket rpc client"); - client = _client; - - // Keep websocket alive and reconnect if it's not alive anymore - keepAliveInterval = setInterval(async () => { - if (client.socket.readyState !== client.socket.OPEN) { - debug("wanted to keep socket alive, but socket not open", client.socket.readyState); - return; - } + }); + debug("got websocket rpc client"); - try { - debug("keeping socket alive"); - client.requestAsync({ body: { method: "net_version" }, timeout: 2000 }); - } catch (error) { - debug("no response to keep alive, closing...", error); - clearInterval(keepAliveInterval); - client.close(); - } - }, 3000); + // Keep websocket alive and reconnect if it's not alive anymore + keepAliveInterval = setInterval(async () => { + if (client.socket.readyState !== client.socket.OPEN) { + debug("wanted to keep socket alive, but socket not open", client.socket.readyState); + return; + } - client.socket.addEventListener("error", (error) => { - debug("socket error", error); + try { + debug("keeping socket alive"); + client.requestAsync({ body: { method: "net_version" }, timeout: 2000 }); + } catch (error) { + debug("no response to keep alive, closing...", error); clearInterval(keepAliveInterval); - subscriber.error({ code: -32603, message: "WebSocket error", data: error }); - }); + client.close(); + } + }, 3000); - client.socket.addEventListener("close", () => { - debug("socket closed, trying to setup again..."); - clearInterval(keepAliveInterval); - setupClient(); + client.socket.addEventListener("error", (error) => { + debug("socket error", error); + clearInterval(keepAliveInterval); + subscriber.error({ code: -32603, message: "WebSocket error", data: error }); + }); + + client.socket.addEventListener("close", async () => { + debug("socket closed, trying to setup again..."); + clearInterval(keepAliveInterval); + setupClient().catch((error) => { + debug("error trying to setup new client", error); + subscriber.error(error); }); + }); - // Start watching pending logs - const subscriptionId: Hex = ( - await client.requestAsync({ - body: { - method: "wiresaw_watchLogs", - params: [{ address, topics }], - }, - }) - ).result; - debug("got watchLogs subscription", subscriptionId); - - // Listen for wiresaw_watchLogs subscription - // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. - // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) - client.socket.addEventListener("message", (message) => { - debug("got socket message", message); - - const response = JSON.parse(message.data); - if ("error" in response) { - debug("was error, returning error to subscriber"); - // Return JSON-RPC errors to the subscriber - subscriber.error(response.error); - return; - } + // Start watching pending logs + const subscriptionId: Hex = ( + await client.requestAsync({ + body: { + method: "wiresaw_watchLogs", + params: [{ address, topics }], + }, + }) + ).result; + debug("got watchLogs subscription", subscriptionId); + + // Listen for wiresaw_watchLogs subscription + // Need to use low level methods since viem's socekt client only handles `eth_subscription` messages. + // (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178) + client.socket.addEventListener("message", (message) => { + const response = JSON.parse(message.data); + if ("error" in response) { + debug("was error, returning error to subscriber"); + // Return JSON-RPC errors to the subscriber + subscriber.error(response.error); + return; + } - // Parse the logs from wiresaw_watchLogs - if ("params" in response && response.params.subscription === subscriptionId) { - debug("parsing logs"); - const logs: RpcLog[] = response.params.result; - const formattedLogs = logs.map((log) => formatLog(log)); - const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); - debug("got logs", parsedLogs); - if (caughtUp) { - debug("handing off logs to subscriber"); - const blockNumber = parsedLogs[0].blockNumber; - subscriber.next({ blockNumber, logs: parsedLogs }); - resumeBlock = blockNumber + 1n; - } else { - debug("buffering logs"); - logBuffer.push(...parsedLogs); - } - return; + // Parse the logs from wiresaw_watchLogs + if ("params" in response && response.params.subscription === subscriptionId) { + debug("parsing logs"); + const logs: RpcLog[] = response.params.result; + const formattedLogs = logs.map((log) => formatLog(log)); + const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs }); + debug("got logs", parsedLogs); + if (caughtUp) { + debug("handing off logs to subscriber"); + const blockNumber = parsedLogs[0].blockNumber; + subscriber.next({ blockNumber, logs: parsedLogs }); + resumeBlock = blockNumber + 1n; + } else { + debug("buffering logs"); + logBuffer.push(...parsedLogs); } - - debug("unknown message, skipping"); - }); - - // Catch up to the pending logs - try { - debug("fetching initial logs"); - const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); - debug("got logs", initialLogs); - const logs = [...initialLogs, ...logBuffer].sort(logSort); - const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; - subscriber.next({ blockNumber, logs: initialLogs }); - resumeBlock = blockNumber + 1n; - caughtUp = true; - } catch (error) { - debug("could not get initial logs", error); - subscriber.error("Could not fetch initial wiresaw logs"); + return; } }); + + // Catch up to the pending logs + try { + debug("fetching initial logs"); + const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics }); + debug("got logs", initialLogs); + const logs = [...initialLogs, ...logBuffer].sort(logSort); + const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock; + subscriber.next({ blockNumber, logs: initialLogs }); + resumeBlock = blockNumber + 1n; + caughtUp = true; + } catch (error) { + debug("could not get initial logs", error); + subscriber.error("Could not fetch initial wiresaw logs"); + } } - setupClient(); + setupClient().catch((error) => { + debug("error setting up initial client", error); + subscriber.error(error); + }); return () => { debug("logs$ subscription closed");