From e6672e84392f3e9b5989bd439b8ccddb9aa926fa Mon Sep 17 00:00:00 2001 From: steveluscher Date: Thu, 20 Oct 2022 15:44:09 -0700 Subject: [PATCH] chore: create internal method for subscribing to subscription state changes --- web3.js/src/connection.ts | 95 +++++++++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 14 deletions(-) diff --git a/web3.js/src/connection.ts b/web3.js/src/connection.ts index 7794cbf3e65225..391a65288572f5 100644 --- a/web3.js/src/connection.ts +++ b/web3.js/src/connection.ts @@ -88,6 +88,10 @@ type ClientSubscriptionId = number; /** @internal */ type ServerSubscriptionId = number; /** @internal */ type SubscriptionConfigHash = string; /** @internal */ type SubscriptionDisposeFn = () => Promise; +/** @internal */ type SubscriptionStateChangeCallback = ( + nextState: StatefulSubscription['state'], +) => void; +/** @internal */ type SubscriptionStateChangeDisposeFn = () => void; /** * @internal * Every subscription contains the args used to open the subscription with @@ -2715,6 +2719,16 @@ export class Connection { | SubscriptionDisposeFn | undefined; } = {}; + /** @internal */ private _subscriptionHashByClientSubscriptionId: { + [clientSubscriptionId: ClientSubscriptionId]: + | SubscriptionConfigHash + | undefined; + } = {}; + /** @internal */ private _subscriptionStateChangeCallbacksByHash: { + [hash: SubscriptionConfigHash]: + | Set + | undefined; + } = {}; /** @internal */ private _subscriptionCallbacksByServerSubscriptionId: { [serverSubscriptionId: ServerSubscriptionId]: | Set @@ -5106,13 +5120,60 @@ export class Connection { Object.entries( this._subscriptionsByHash as Record, ).forEach(([hash, subscription]) => { - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, state: 'pending', - }; + }); }); } + /** + * @internal + */ + private _setSubscription( + hash: SubscriptionConfigHash, + nextSubscription: Subscription, + ) { + const prevState = this._subscriptionsByHash[hash]?.state; + this._subscriptionsByHash[hash] = nextSubscription; + if (prevState !== nextSubscription.state) { + const stateChangeCallbacks = + this._subscriptionStateChangeCallbacksByHash[hash]; + if (stateChangeCallbacks) { + stateChangeCallbacks.forEach(cb => { + try { + cb(nextSubscription.state); + // eslint-disable-next-line no-empty + } catch {} + }); + } + } + } + + /** + * @internal + */ // @ts-ignore + private _onSubscriptionStateChange( + clientSubscriptionId: ClientSubscriptionId, + callback: SubscriptionStateChangeCallback, + ): SubscriptionStateChangeDisposeFn { + const hash = + this._subscriptionHashByClientSubscriptionId[clientSubscriptionId]; + if (hash == null) { + return () => {}; + } + const stateChangeCallbacks = (this._subscriptionStateChangeCallbacksByHash[ + hash + ] ||= new Set()); + stateChangeCallbacks.add(callback); + return () => { + stateChangeCallbacks.delete(callback); + if (stateChangeCallbacks.size === 0) { + delete this._subscriptionStateChangeCallbacksByHash[hash]; + } + }; + } + /** * @internal */ @@ -5193,17 +5254,17 @@ export class Connection { await (async () => { const {args, method} = subscription; try { - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, state: 'subscribing', - }; + }); const serverSubscriptionId: ServerSubscriptionId = (await this._rpcWebSocket.call(method, args)) as number; - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, serverSubscriptionId, state: 'subscribed', - }; + }); this._subscriptionCallbacksByServerSubscriptionId[ serverSubscriptionId ] = subscription.callbacks; @@ -5220,10 +5281,10 @@ export class Connection { return; } // TODO: Maybe add an 'errored' state or a retry limit? - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, state: 'pending', - }; + }); await this._updateSubscriptions(); } })(); @@ -5251,10 +5312,14 @@ export class Connection { serverSubscriptionId, ); } else { - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { + ...subscription, + state: 'unsubscribing', + }); + this._setSubscription(hash, { ...subscription, state: 'unsubscribing', - }; + }); try { await this._rpcWebSocket.call(unsubscribeMethod, [ serverSubscriptionId, @@ -5267,18 +5332,18 @@ export class Connection { return; } // TODO: Maybe add an 'errored' state or a retry limit? - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, state: 'subscribed', - }; + }); await this._updateSubscriptions(); return; } } - this._subscriptionsByHash[hash] = { + this._setSubscription(hash, { ...subscription, state: 'unsubscribed', - }; + }); await this._updateSubscriptions(); })(); } @@ -5381,12 +5446,14 @@ export class Connection { } else { existingSubscription.callbacks.add(subscriptionConfig.callback); } + this._subscriptionHashByClientSubscriptionId[clientSubscriptionId] = hash; this._subscriptionDisposeFunctionsByClientSubscriptionId[ clientSubscriptionId ] = async () => { delete this._subscriptionDisposeFunctionsByClientSubscriptionId[ clientSubscriptionId ]; + delete this._subscriptionHashByClientSubscriptionId[clientSubscriptionId]; const subscription = this._subscriptionsByHash[hash]; assert( subscription !== undefined,