Skip to content

Commit

Permalink
feat: allow user to unsubscribe when receiving an error (#351)
Browse files Browse the repository at this point in the history
In the prior PR, the user had no mechanism to unsubscribe / cancel the
stream. In this PR we allow the user to unsubscribe via the subscription response.
We return the response from subscribe and also pass it as an argument to the error handler.
  • Loading branch information
malandis authored Mar 17, 2023
1 parent bc6d74b commit 8e9544b
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 30 deletions.
92 changes: 70 additions & 22 deletions src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {TopicClientProps} from '../topic-client-props';
import {middlewaresInterceptor} from './grpc/middlewares-interceptor';
import {truncateString} from './utils/display';
import {SubscribeCallOptions} from '../utils/topic-call-options';
import {SubscriptionState} from './subscription-state';

export class PubsubClient {
private readonly clientWrapper: GrpcClientWrapper<grpcPubsub.PubsubClient>;
Expand Down Expand Up @@ -109,7 +110,7 @@ export class PubsubClient {
validateCacheName(cacheName);
// todo: validate topic name
} catch (err) {
throw normalizeSdkError(err as Error);
return new TopicPublish.Error(normalizeSdkError(err as Error));
}
this.logger.trace(
'Issuing publish request; topic: %s, message length: %s',
Expand Down Expand Up @@ -159,54 +160,85 @@ export class PubsubClient {
cacheName: string,
topicName: string,
options: SubscribeCallOptions
): Promise<void> {
): Promise<TopicSubscribe.Response> {
try {
validateCacheName(cacheName);
// TODO: validate topic name
} catch (err) {
throw normalizeSdkError(err as Error);
return new TopicSubscribe.Error(normalizeSdkError(err as Error));
}
this.logger.trace(
'Issuing subscribe request; topic: %s',
truncateString(topicName)
);

return await new Promise(resolve => {
this.sendSubscribe(cacheName, topicName, options, 0);
resolve();
const subscriptionState: SubscriptionState = new SubscriptionState();
const subscription = new TopicSubscribe.Subscription(subscriptionState);
this.sendSubscribe(
cacheName,
topicName,
options,
subscriptionState,
subscription
);
resolve(subscription);
});
}

/**
*
*
* @private
* @param {string} cacheName
* @param {string} topicName
* @param {SubscribeCallOptions} options
* @param {SubscriptionState} subscriptionState
* @param {TopicSubscribe.Subscription} [subscription]
* @return {*} {void}
* @memberof PubsubClient
*
* @remark This method is responsible for reconnecting the stream if it ends unexpectedly.
* Since we return a single subscription object to the user, we need to update it with the
* unsubscribe function should we restart the stream. This is why we pass the subscription
* state and subscription object to this method.
*/
private sendSubscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions,
resumeAtTopicSequenceNumber = 0
subscriptionState: SubscriptionState,
subscription: TopicSubscribe.Subscription
): void {
const request = new grpcPubsub._SubscriptionRequest({
cache_name: cacheName,
topic: topicName,
resume_at_topic_sequence_number: resumeAtTopicSequenceNumber,
resume_at_topic_sequence_number:
subscriptionState.resumeAtTopicSequenceNumber,
});

const call = this.clientWrapper.getClient().Subscribe(request, {
interceptors: this.streamingInterceptors,
});
subscriptionState.setSubscribed();

// The following are the outer handlers for the stream.
// They are responsible for reconnecting the stream if it ends unexpectedly, and for
// building the API facing response objects.

// The last topic sequence number we received. This is used to resume the stream.
// If resumeAtTopicSequenceNumber is 0, then we reconnect from the beginning again.
// Otherwise we resume starting from the next sequence number.
let lastTopicSequenceNumber =
resumeAtTopicSequenceNumber === 0 ? -1 : resumeAtTopicSequenceNumber;
// Whether the stream was restarted due to an error. If so, we short circuit the end stream handler
// as the error handler will restart the stream.
let restartedDueToError = false;

// Allow the caller to cancel the stream.
// Note that because we restart the stream on error or stream end,
// we need to ensure we keep the same subscription object. That way
// stream restarts are transparent to the caller.
subscriptionState.unsubscribeFn = () => {
call.cancel();
};

call
.on('data', (resp: grpcPubsub._SubscriptionItem) => {
if (resp?.item) {
lastTopicSequenceNumber = resp.item.topic_sequence_number;
subscriptionState.lastTopicSequenceNumber =
resp.item.topic_sequence_number;
if (resp.item.value.text) {
options.onItem(new TopicSubscribe.Item(resp.item.value.text));
} else if (resp.item.value.binary) {
Expand All @@ -225,8 +257,13 @@ export class PubsubClient {
}
})
.on('error', (err: Error) => {
// When the caller unsubscribes, we may get a follow on error, which we ignore.
if (!subscriptionState.isSubscribed) {
return;
}

const serviceError = err as unknown as ServiceError;
// The service cuts the the stream after ~1 minute. Hence we reconnect.
// The service cuts the the stream after a period of time. Hence we reconnect.
if (
serviceError.code === Status.INTERNAL &&
serviceError.details === 'Received RST_STREAM with code 0'
Expand All @@ -238,33 +275,44 @@ export class PubsubClient {
cacheName,
topicName,
options,
lastTopicSequenceNumber + 1
subscriptionState,
subscription
);
restartedDueToError = true;
return;
}

// Otherwise we propagate the error to the caller.
options.onError(
new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError))
new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError)),
subscription
);
})
.on('end', () => {
// The stream could have already been restarted due to an error.
// We want to reconnect on stream end, except if:
// 1. The stream was cancelled by the caller.
// 2. The stream was restarted due to an error.
if (restartedDueToError) {
this.logger.trace(
'Stream ended after error but was restarted on topic: %s',
topicName
);
return;
} else if (!subscriptionState.isSubscribed) {
this.logger.trace(
'Stream ended after unsubscribe on topic: %s',
topicName
);
return;
}

this.logger.trace('Stream ended on topic: %s; restarting.', topicName);
this.sendSubscribe(
cacheName,
topicName,
options,
lastTopicSequenceNumber + 1
subscriptionState,
subscription
);
});
}
Expand Down
41 changes: 41 additions & 0 deletions src/internal/subscription-state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Encapsulates a topic subscription stream state.
*/
export class SubscriptionState {
private _unsubscribeFn: () => void;
public lastTopicSequenceNumber?: number;
private _isSubscribed: boolean;
constructor() {
this._unsubscribeFn = () => {
return;
};
this._isSubscribed = false;
}

public get resumeAtTopicSequenceNumber(): number {
return (this.lastTopicSequenceNumber ?? -1) + 1;
}

public setSubscribed(): void {
this._isSubscribed = true;
}

public setUnsubscribed(): void {
this._isSubscribed = false;
}

public get isSubscribed(): boolean {
return this._isSubscribed;
}

public set unsubscribeFn(unsubscribeFn: () => void) {
this._unsubscribeFn = unsubscribeFn;
}

public unsubscribe(): void {
if (this.isSubscribed) {
this._unsubscribeFn();
this.setUnsubscribed();
}
}
}
34 changes: 30 additions & 4 deletions src/messages/responses/topic-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@
import {SdkError} from '../../errors/errors';
import {ResponseBase, ResponseError} from './response-base';
import {truncateString} from '../../internal/utils/display';
import {SubscriptionState} from '../../internal/subscription-state';

/**
* Parent response type for a cache get request. The
* response object is resolved to a type-safe object of one of
* the following subtypes:
*
* - {Hit}
* - {Miss}
* - {Subscription}
* - {Item}
* - {Error}
*
* `instanceof` type guards can be used to operate on the appropriate subtype.
* @example
* For example:
* ```
* if (response instanceof CacheGet.Error) {
* if (response instanceof TopicSubscribe.Error) {
* // Handle error as appropriate. The compiler will smart-cast `response` to type
* // `CacheGet.Error` in this block, so you will have access to the properties
* // of the Error class; e.g. `response.errorCode()`.
Expand Down Expand Up @@ -45,14 +46,39 @@ export class Item extends Response {
}
}

/**
* Encapsulates a topic subscription.
*
* @remarks Currently allows unsubscribing from the topic.
* In the future, this may be extended to include additional
* statistics about the subscription.
*/
export class Subscription extends Response {
private subscriptionState: SubscriptionState;

constructor(subscriptionState: SubscriptionState) {
super();
this.subscriptionState = subscriptionState;
}

/**
* Unsubscribes from the topic.
*
* @returns void
*/
public unsubscribe(): void {
this.subscriptionState.unsubscribe();
}
}

class _Error extends Response {
constructor(protected _innerException: SdkError) {
super();
}
}

/**
* Indicates that an error occurred during the cache get request.
* Indicates that an error occurred during the topic subscribe request.
*
* This response object includes the following fields that you can use to determine
* how you would like to handle the error:
Expand Down
8 changes: 5 additions & 3 deletions src/topic-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {PubsubClient} from './internal/pubsub-client';
import {TopicPublish, MomentoLogger} from '.';
import {TopicPublish, MomentoLogger, TopicSubscribe} from '.';
import {TopicClientProps} from './topic-client-props';
import {SubscribeCallOptions} from './utils/topic-call-options';

Expand Down Expand Up @@ -48,13 +48,15 @@ export class TopicClient {
* @param {SubscribeCallOptions} options - The options for the subscription.
* @param {function} options.onItem - The callback to invoke when data is received.
* @param {function} options.onError - The callback to invoke when an error is received.
* @returns
* @returns {Promise<TopicSubscribe.Response>} -
* {@link TopicSubscribe.Subscription} on success.
* {@link TopicSubscribe.Error} on failure.
*/
public async subscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions
): Promise<void> {
): Promise<TopicSubscribe.Response> {
return await this.client.subscribe(cacheName, topicName, options);
}
}
6 changes: 5 additions & 1 deletion src/utils/topic-call-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export interface SubscribeCallOptions {
* The callback to invoke when an error is received from the topic subscription.
*
* @param error The error received from the topic subscription.
* @param subscription The subscription that received the error.
*/
onError(error: TopicSubscribe.Error): void;
onError(
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
): void;
}

0 comments on commit 8e9544b

Please sign in to comment.