Skip to content

Commit

Permalink
try more async
Browse files Browse the repository at this point in the history
  • Loading branch information
holic committed Oct 26, 2024
1 parent 9f0ee91 commit cb78e29
Showing 1 changed file with 88 additions and 88 deletions.
176 changes: 88 additions & 88 deletions packages/store-sync/src/wiresaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,113 +31,113 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog

let client: SocketRpcClient<WebSocket>;

function setupClient(): void {
async function setupClient(): Promise<void> {
debug("setupClient called");

// Buffer the live logs received until the gap from `startBlock` to `currentBlock` is closed
let caughtUp = false;
const logBuffer: StoreEventsLog[] = [];

getWebSocketRpcClient(url, {
client = await getWebSocketRpcClient(url, {
keepAlive: false, // keepAlive is handled below
}).then(async (_client) => {
debug("got websocket rpc client");
client = _client;

// Keep websocket alive and reconnect if it's not alive anymore
keepAliveInterval = setInterval(async () => {
if (client.socket.readyState !== client.socket.OPEN) {
debug("wanted to keep socket alive, but socket not open", client.socket.readyState);
return;
}
});
debug("got websocket rpc client");

try {
debug("keeping socket alive");
client.requestAsync({ body: { method: "net_version" }, timeout: 2000 });
} catch (error) {
debug("no response to keep alive, closing...", error);
clearInterval(keepAliveInterval);
client.close();
}
}, 3000);
// Keep websocket alive and reconnect if it's not alive anymore
keepAliveInterval = setInterval(async () => {
if (client.socket.readyState !== client.socket.OPEN) {
debug("wanted to keep socket alive, but socket not open", client.socket.readyState);
return;
}

client.socket.addEventListener("error", (error) => {
debug("socket error", error);
try {
debug("keeping socket alive");
client.requestAsync({ body: { method: "net_version" }, timeout: 2000 });
} catch (error) {
debug("no response to keep alive, closing...", error);
clearInterval(keepAliveInterval);
subscriber.error({ code: -32603, message: "WebSocket error", data: error });
});
client.close();
}
}, 3000);

client.socket.addEventListener("close", () => {
debug("socket closed, trying to setup again...");
clearInterval(keepAliveInterval);
setupClient();
client.socket.addEventListener("error", (error) => {
debug("socket error", error);
clearInterval(keepAliveInterval);
subscriber.error({ code: -32603, message: "WebSocket error", data: error });
});

client.socket.addEventListener("close", async () => {
debug("socket closed, trying to setup again...");
clearInterval(keepAliveInterval);
setupClient().catch((error) => {
debug("error trying to setup new client", error);
subscriber.error(error);
});
});

// Start watching pending logs
const subscriptionId: Hex = (
await client.requestAsync({
body: {
method: "wiresaw_watchLogs",
params: [{ address, topics }],
},
})
).result;
debug("got watchLogs subscription", subscriptionId);

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
debug("got socket message", message);

const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}
// Start watching pending logs
const subscriptionId: Hex = (
await client.requestAsync({
body: {
method: "wiresaw_watchLogs",
params: [{ address, topics }],
},
})
).result;
debug("got watchLogs subscription", subscriptionId);

// Listen for wiresaw_watchLogs subscription
// Need to use low level methods since viem's socekt client only handles `eth_subscription` messages.
// (https://github.com/wevm/viem/blob/f81d497f2afc11b9b81a79057d1f797694b69793/src/utils/rpc/socket.ts#L178)
client.socket.addEventListener("message", (message) => {
const response = JSON.parse(message.data);
if ("error" in response) {
debug("was error, returning error to subscriber");
// Return JSON-RPC errors to the subscriber
subscriber.error(response.error);
return;
}

// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
}
return;
// Parse the logs from wiresaw_watchLogs
if ("params" in response && response.params.subscription === subscriptionId) {
debug("parsing logs");
const logs: RpcLog[] = response.params.result;
const formattedLogs = logs.map((log) => formatLog(log));
const parsedLogs = parseEventLogs({ abi: storeEventsAbi, logs: formattedLogs });
debug("got logs", parsedLogs);
if (caughtUp) {
debug("handing off logs to subscriber");
const blockNumber = parsedLogs[0].blockNumber;
subscriber.next({ blockNumber, logs: parsedLogs });
resumeBlock = blockNumber + 1n;
} else {
debug("buffering logs");
logBuffer.push(...parsedLogs);
}

debug("unknown message, skipping");
});

// Catch up to the pending logs
try {
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (error) {
debug("could not get initial logs", error);
subscriber.error("Could not fetch initial wiresaw logs");
return;
}
});

// Catch up to the pending logs
try {
debug("fetching initial logs");
const initialLogs = await fetchInitialLogs({ client, address, fromBlock: resumeBlock, topics });
debug("got logs", initialLogs);
const logs = [...initialLogs, ...logBuffer].sort(logSort);
const blockNumber = logs.at(-1)?.blockNumber ?? resumeBlock;
subscriber.next({ blockNumber, logs: initialLogs });
resumeBlock = blockNumber + 1n;
caughtUp = true;
} catch (error) {
debug("could not get initial logs", error);
subscriber.error("Could not fetch initial wiresaw logs");
}
}

setupClient();
setupClient().catch((error) => {
debug("error setting up initial client", error);
subscriber.error(error);
});

return () => {
debug("logs$ subscription closed");
Expand Down

0 comments on commit cb78e29

Please sign in to comment.