Skip to content

Commit

Permalink
chore: create internal method for subscribing to subscription state c…
Browse files Browse the repository at this point in the history
…hanges
  • Loading branch information
steveluscher committed Oct 20, 2022
1 parent f207af7 commit e6672e8
Showing 1 changed file with 81 additions and 14 deletions.
95 changes: 81 additions & 14 deletions web3.js/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type ClientSubscriptionId = number;
/** @internal */ type ServerSubscriptionId = number;
/** @internal */ type SubscriptionConfigHash = string;
/** @internal */ type SubscriptionDisposeFn = () => Promise<void>;
/** @internal */ type SubscriptionStateChangeCallback = (
nextState: StatefulSubscription['state'],
) => void;
/** @internal */ type SubscriptionStateChangeDisposeFn = () => void;
/**
* @internal
* Every subscription contains the args used to open the subscription with
Expand Down Expand Up @@ -2715,6 +2719,16 @@ export class Connection {
| SubscriptionDisposeFn
| undefined;
} = {};
/** @internal */ private _subscriptionHashByClientSubscriptionId: {
[clientSubscriptionId: ClientSubscriptionId]:
| SubscriptionConfigHash
| undefined;
} = {};
/** @internal */ private _subscriptionStateChangeCallbacksByHash: {
[hash: SubscriptionConfigHash]:
| Set<SubscriptionStateChangeCallback>
| undefined;
} = {};
/** @internal */ private _subscriptionCallbacksByServerSubscriptionId: {
[serverSubscriptionId: ServerSubscriptionId]:
| Set<SubscriptionConfig['callback']>
Expand Down Expand Up @@ -5106,13 +5120,60 @@ export class Connection {
Object.entries(
this._subscriptionsByHash as Record<SubscriptionConfigHash, Subscription>,
).forEach(([hash, subscription]) => {
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'pending',
};
});
});
}

/**
* @internal
*/
private _setSubscription(
hash: SubscriptionConfigHash,
nextSubscription: Subscription,
) {
const prevState = this._subscriptionsByHash[hash]?.state;
this._subscriptionsByHash[hash] = nextSubscription;
if (prevState !== nextSubscription.state) {
const stateChangeCallbacks =
this._subscriptionStateChangeCallbacksByHash[hash];
if (stateChangeCallbacks) {
stateChangeCallbacks.forEach(cb => {
try {
cb(nextSubscription.state);
// eslint-disable-next-line no-empty
} catch {}
});
}
}
}

/**
* @internal
*/ // @ts-ignore
private _onSubscriptionStateChange(
clientSubscriptionId: ClientSubscriptionId,
callback: SubscriptionStateChangeCallback,
): SubscriptionStateChangeDisposeFn {
const hash =
this._subscriptionHashByClientSubscriptionId[clientSubscriptionId];
if (hash == null) {
return () => {};
}
const stateChangeCallbacks = (this._subscriptionStateChangeCallbacksByHash[
hash
] ||= new Set());
stateChangeCallbacks.add(callback);
return () => {
stateChangeCallbacks.delete(callback);
if (stateChangeCallbacks.size === 0) {
delete this._subscriptionStateChangeCallbacksByHash[hash];
}
};
}

/**
* @internal
*/
Expand Down Expand Up @@ -5193,17 +5254,17 @@ export class Connection {
await (async () => {
const {args, method} = subscription;
try {
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'subscribing',
};
});
const serverSubscriptionId: ServerSubscriptionId =
(await this._rpcWebSocket.call(method, args)) as number;
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
serverSubscriptionId,
state: 'subscribed',
};
});
this._subscriptionCallbacksByServerSubscriptionId[
serverSubscriptionId
] = subscription.callbacks;
Expand All @@ -5220,10 +5281,10 @@ export class Connection {
return;
}
// TODO: Maybe add an 'errored' state or a retry limit?
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'pending',
};
});
await this._updateSubscriptions();
}
})();
Expand Down Expand Up @@ -5251,10 +5312,14 @@ export class Connection {
serverSubscriptionId,
);
} else {
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'unsubscribing',
});
this._setSubscription(hash, {
...subscription,
state: 'unsubscribing',
};
});
try {
await this._rpcWebSocket.call(unsubscribeMethod, [
serverSubscriptionId,
Expand All @@ -5267,18 +5332,18 @@ export class Connection {
return;
}
// TODO: Maybe add an 'errored' state or a retry limit?
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'subscribed',
};
});
await this._updateSubscriptions();
return;
}
}
this._subscriptionsByHash[hash] = {
this._setSubscription(hash, {
...subscription,
state: 'unsubscribed',
};
});
await this._updateSubscriptions();
})();
}
Expand Down Expand Up @@ -5381,12 +5446,14 @@ export class Connection {
} else {
existingSubscription.callbacks.add(subscriptionConfig.callback);
}
this._subscriptionHashByClientSubscriptionId[clientSubscriptionId] = hash;
this._subscriptionDisposeFunctionsByClientSubscriptionId[
clientSubscriptionId
] = async () => {
delete this._subscriptionDisposeFunctionsByClientSubscriptionId[
clientSubscriptionId
];
delete this._subscriptionHashByClientSubscriptionId[clientSubscriptionId];
const subscription = this._subscriptionsByHash[hash];
assert(
subscription !== undefined,
Expand Down

0 comments on commit e6672e8

Please sign in to comment.