Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add more comments to the token refreshing topic client example #1438

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions examples/web/cache/refresh-disposable-tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
return disposableToken;
}

async function tokenVendingMachine(): Promise<{token: string; expiresAt: ExpiresAt}> {

Check warning on line 42 in examples/web/cache/refresh-disposable-tokens.ts

View workflow job for this annotation

GitHub Actions / Test web examples on node 16

'tokenVendingMachine' is defined but never used

Check warning on line 42 in examples/web/cache/refresh-disposable-tokens.ts

View workflow job for this annotation

GitHub Actions / Test web examples on node 18

'tokenVendingMachine' is defined but never used
const resp = await fetch(process.env.TVM_ENDPOINT as string);
const respJson = (await resp.json()) as {authToken: string; expiresAt: number};
const disposableToken = {
Expand All @@ -54,6 +54,11 @@
getDisposableToken: () => Promise<{token: string; expiresAt: ExpiresAt}>;
}

// This wrapper class makes it easy to use disposable auth tokens with the TopicClient.
// At some user-specified time before the token expires (refreshBeforeExpiryMs), a new
// disposable token will be fetched via the user-specified getDisposableToken function
// and the new token is used to create a new TopicClient instance. All active subscriptions
// are transferred to the new client, then the old client is replaced by the new one.
class TokenRefreshingTopicClient {
topicClient: TopicClient;
refreshBeforeExpiryMs: number;
Expand All @@ -75,6 +80,10 @@
this.getDisposableToken = props.getDisposableToken;
}

// The wrapper class requires an async initialization function to set up the
// first TopicClient instance since the constructor cannot be async and the
// getDisposableToken function is async.
// A new TopicClient requires a new CredentialProvider with the new disposable token.
private async initialize() {
const disposableToken = await this.getDisposableToken();
this.topicClient = new TopicClient({
Expand All @@ -87,12 +96,14 @@
console.log('Initialized topic client and set first timeout');
}

// create() is a factory method that creates a new instance of the wrapper class.
static async create(props: TokenRefreshingTopicClientProps) {
const client = new TokenRefreshingTopicClient(props);
await client.initialize();
return client;
}

// refreshToken() is called when the disposable token is about to expire.
private async refreshToken() {
console.log('Disposable token expiring soon, refreshing topic client with new token');
const disposableToken = await this.getDisposableToken();
Expand All @@ -104,7 +115,8 @@
await this.refreshToken();
}, getRefreshAfterMs(disposableToken.expiresAt, this.refreshBeforeExpiryMs));

// for each active subscription, unsubscribe from the old topic client and subscribe to the new one
// For each active subscription, make sure to start the same subscription on the new client,
// transfer over the existing onItem and onError callbacks, then unsubscribe from the old client.
for (const key in this.activeSubscriptions) {
const value = this.activeSubscriptions[key];
const newSubscription = await newTopicClient.subscribe(value.cacheName, value.topicName, {
Expand All @@ -113,6 +125,8 @@
});
value.unsubscribe();

// Once the new subscription is established, update the stored unsubscribe function
// in the activeSubscriptions record.
if (newSubscription.type === TopicSubscribeResponse.Error) {
console.error(`Error subscribing to topic: ${newSubscription.toString()}`);
} else {
Expand All @@ -121,9 +135,13 @@
};
}
}

// Once all subscriptions are transferred, replace the old client with the new one.
this.topicClient = newTopicClient;
}

// Simply passes a publish request to the underlying TopicClient instance.
// Calls the onError callback if the publish request fails.
async publish(cacheName: string, topicName: string, message: string, onError?: (resp: TopicPublish.Error) => void) {
const resp = await this.topicClient.publish(cacheName, topicName, message);
if (resp.type === TopicPublishResponse.Error) {
Expand All @@ -134,6 +152,11 @@
}
}

// Subscribes to a topic and stores the subscription and callbacks in the
// activeSubscriptions record. The wrappedOnItem callback is a wrapper around
// the user-provided onItem callback. The wrapper ensures that duplicate messages
// are not processed by the user code since there could be some overlap in message
// delivery between the old and new TopicClient instances when refreshing the client.
async subscribe(
cacheName: string,
topicName: string,
Expand All @@ -146,7 +169,7 @@

const wrappedOnItem = (item: TopicItem) => {
const currentSubscription = this.activeSubscriptions[`${cacheName}:${topicName}`];
// pass through to user-provided onItem only if message hasn't been processed before
// Pass item through to user-provided onItem only if message hasn't been processed before
if (item.sequenceNumber() > currentSubscription.lastSequenceNumber) {
options.onItem(item);
currentSubscription.lastSequenceNumber = item.sequenceNumber();
Expand All @@ -164,8 +187,8 @@
console.log(`Subscribed to ${cacheName}:${topicName}`);
const key = `${cacheName}:${topicName}`;

// if key already exists, update the existing subscription
// otherwise make new record
// If the subscription already exists, update the existing subscription to include the
// unsubscribe function. Otherwise, make a new record with all necessary info.
if (key in this.activeSubscriptions) {
this.activeSubscriptions[key].unsubscribe = () => {
resp.unsubscribe();
Expand All @@ -187,6 +210,7 @@
}
}

// Helper function for setting the correct SetTimeout value for refreshing the token.
function getRefreshAfterMs(expiresAt: ExpiresAt, refreshBefore: number): number {
const refreshingIn = expiresAt.epoch() * 1000 - Date.now() - refreshBefore;
console.log(`Refreshing in ${refreshingIn} ms`);
Expand Down Expand Up @@ -214,7 +238,13 @@

const wrappedTopicClient = await TokenRefreshingTopicClient.create({
refreshBeforeExpiryMs: 10_000, // 10 seconds before token expires, refresh it.
/******************************************************************
* By default, this demo uses the localTokenVendingMachine() function.
* Update this line and update the tokenVendingMachine() function as needed
* to use your own deployed token vending machine.
******************************************************************/
getDisposableToken: localTokenVendingMachine,
/*****************************************************************/
});

await wrappedTopicClient.subscribe('my-cache', 'topic-1', {
Expand Down
Loading