Skip to content

Commit

Permalink
fix: batch socket initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jxom committed May 17, 2023
1 parent 65a0896 commit e8c512a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 63 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-moons-peel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"viem": patch
---

Batched websocket initialization.
31 changes: 24 additions & 7 deletions src/utils/promise/createBatchScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@ type PendingPromise<TReturnType extends readonly unknown[] = any> = {

type SchedulerItem = { args: unknown; pendingPromise: PendingPromise }

export type CreateBatchSchedulerArguments<
TParameters = unknown,
TReturnType extends readonly unknown[] = readonly unknown[],
> = {
fn: (args: TParameters[]) => Promise<TReturnType>
id: number | string
shouldSplitBatch?: (args: TParameters[]) => boolean
wait?: number
}
export type CreateBatchSchedulerReturnType<
TParameters = unknown,
TReturnType extends readonly unknown[] = readonly unknown[],
> = {
flush: () => void
schedule: TParameters extends undefined
? (args?: TParameters) => Promise<Resolved<TReturnType>>
: (args: TParameters) => Promise<Resolved<TReturnType>>
}

const schedulerCache = new Map<number | string, SchedulerItem[]>()

export function createBatchScheduler<
Expand All @@ -20,12 +39,10 @@ export function createBatchScheduler<
id,
shouldSplitBatch,
wait = 0,
}: {
fn: (args: TParameters[]) => Promise<TReturnType>
id: number | string
shouldSplitBatch?: (args: TParameters[]) => boolean
wait?: number
}) {
}: CreateBatchSchedulerArguments<
TParameters,
TReturnType
>): CreateBatchSchedulerReturnType<TParameters, TReturnType> {
const exec = async () => {
const scheduler = getScheduler()
flush()
Expand Down Expand Up @@ -78,5 +95,5 @@ export function createBatchScheduler<
setTimeout(exec, wait)
return promise
},
}
} as unknown as CreateBatchSchedulerReturnType<TParameters, TReturnType>
}
15 changes: 8 additions & 7 deletions src/utils/rpc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,16 @@ describe('getSocket', () => {
})

test('multiple invocations on a url only opens one socket', async () => {
const url = 'ws://127.0.0.1:8545/69420'
const [socket, socket2, socket3, socket4] = await Promise.all([
getSocket(localWsUrl),
getSocket(localWsUrl),
getSocket(localWsUrl),
getSocket(localWsUrl),
getSocket(url),
getSocket(url),
getSocket(url),
getSocket(url),
])
expect(socket).toBe(socket2)
expect(socket).toBe(socket3)
expect(socket).toBe(socket4)
expect(socket).toEqual(socket2)
expect(socket).toEqual(socket3)
expect(socket).toEqual(socket4)
})
})

Expand Down
108 changes: 59 additions & 49 deletions src/utils/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
WebSocketRequestError,
} from '../errors/request.js'

import { createBatchScheduler } from './promise/createBatchScheduler.js'
import { withTimeout } from './promise/withTimeout.js'
import { stringify } from './stringify.js'

Expand Down Expand Up @@ -146,61 +147,70 @@ export async function getSocket(url_: string) {
// If the socket already exists, return it.
if (socket) return socket

let WebSocket = await import('isomorphic-ws')
// Workaround for Vite.
// https://github.com/vitejs/vite/issues/9703
// TODO: Remove when issue is resolved.
if (
(WebSocket as unknown as { default?: typeof WebSocket }).default
?.constructor
)
WebSocket = (WebSocket as unknown as { default: typeof WebSocket }).default
else WebSocket = WebSocket.WebSocket

const webSocket = new WebSocket(url)

// Set up a cache for incoming "synchronous" requests.
const requests = new Map<Id, CallbackFn>()
const { schedule } = createBatchScheduler<undefined, [Socket]>({
id: urlKey,
fn: async () => {
let WebSocket = await import('isomorphic-ws')
// Workaround for Vite.
// https://github.com/vitejs/vite/issues/9703
// TODO: Remove when issue is resolved.
if (
(WebSocket as unknown as { default?: typeof WebSocket }).default
?.constructor
)
WebSocket = (WebSocket as unknown as { default: typeof WebSocket })
.default
else WebSocket = WebSocket.WebSocket

const webSocket = new WebSocket(url)

// Set up a cache for incoming "synchronous" requests.
const requests = new Map<Id, CallbackFn>()

// Set up a cache for subscriptions (eth_subscribe).
const subscriptions = new Map<Id, CallbackFn>()

const onMessage: (event: MessageEvent) => void = ({ data }) => {
const message: RpcResponse = JSON.parse(data as string)
const isSubscription = message.method === 'eth_subscription'
const id = isSubscription ? message.params.subscription : message.id
const cache = isSubscription ? subscriptions : requests
const callback = cache.get(id)
if (callback) callback({ data })
if (!isSubscription) cache.delete(id)
}
const onClose = () => {
sockets.delete(urlKey)
webSocket.removeEventListener('close', onClose)
webSocket.removeEventListener('message', onMessage)
}

// Set up a cache for subscriptions (eth_subscribe).
const subscriptions = new Map<Id, CallbackFn>()
// Setup event listeners for RPC & subscription responses.
webSocket.addEventListener('close', onClose)
webSocket.addEventListener('message', onMessage)

const onMessage: (event: MessageEvent) => void = ({ data }) => {
const message: RpcResponse = JSON.parse(data as string)
const isSubscription = message.method === 'eth_subscription'
const id = isSubscription ? message.params.subscription : message.id
const cache = isSubscription ? subscriptions : requests
const callback = cache.get(id)
if (callback) callback({ data })
if (!isSubscription) cache.delete(id)
}
const onClose = () => {
sockets.delete(urlKey)
webSocket.removeEventListener('close', onClose)
webSocket.removeEventListener('message', onMessage)
}

// Setup event listeners for RPC & subscription responses.
webSocket.addEventListener('close', onClose)
webSocket.addEventListener('message', onMessage)
// Wait for the socket to open.
if (webSocket.readyState === WebSocket.CONNECTING) {
await new Promise((resolve, reject) => {
if (!webSocket) return
webSocket.onopen = resolve
webSocket.onerror = reject
})
}

// Wait for the socket to open.
if (webSocket.readyState === WebSocket.CONNECTING) {
await new Promise((resolve, reject) => {
if (!webSocket) return
webSocket.onopen = resolve
webSocket.onerror = reject
})
}
// Create a new socket instance.
socket = Object.assign(webSocket, {
requests,
subscriptions,
})
sockets.set(urlKey, socket)

// Create a new socket instance.
socket = Object.assign(webSocket, {
requests,
subscriptions,
return [socket]
},
})
sockets.set(urlKey, socket)

return socket
const [_, [socket_]] = await schedule()
return socket_
}

function webSocket(
Expand Down

0 comments on commit e8c512a

Please sign in to comment.