diff --git a/src/internal/pubsub-client.ts b/src/internal/pubsub-client.ts index 625bfcce0..ec231e79d 100644 --- a/src/internal/pubsub-client.ts +++ b/src/internal/pubsub-client.ts @@ -24,6 +24,7 @@ import {TopicClientProps} from '../topic-client-props'; import {middlewaresInterceptor} from './grpc/middlewares-interceptor'; import {truncateString} from './utils/display'; import {SubscribeCallOptions} from '../utils/topic-call-options'; +import {SubscriptionState} from './subscription-state'; export class PubsubClient { private readonly clientWrapper: GrpcClientWrapper; @@ -109,7 +110,7 @@ export class PubsubClient { validateCacheName(cacheName); // todo: validate topic name } catch (err) { - throw normalizeSdkError(err as Error); + return new TopicPublish.Error(normalizeSdkError(err as Error)); } this.logger.trace( 'Issuing publish request; topic: %s, message length: %s', @@ -159,12 +160,12 @@ export class PubsubClient { cacheName: string, topicName: string, options: SubscribeCallOptions - ): Promise { + ): Promise { try { validateCacheName(cacheName); // TODO: validate topic name } catch (err) { - throw normalizeSdkError(err as Error); + return new TopicSubscribe.Error(normalizeSdkError(err as Error)); } this.logger.trace( 'Issuing subscribe request; topic: %s', @@ -172,41 +173,72 @@ export class PubsubClient { ); return await new Promise(resolve => { - this.sendSubscribe(cacheName, topicName, options, 0); - resolve(); + const subscriptionState: SubscriptionState = new SubscriptionState(); + const subscription = new TopicSubscribe.Subscription(subscriptionState); + this.sendSubscribe( + cacheName, + topicName, + options, + subscriptionState, + subscription + ); + resolve(subscription); }); } + /** + * + * + * @private + * @param {string} cacheName + * @param {string} topicName + * @param {SubscribeCallOptions} options + * @param {SubscriptionState} subscriptionState + * @param {TopicSubscribe.Subscription} [subscription] + * @return {*} {void} + * @memberof PubsubClient + * + * @remark This method is responsible for reconnecting the stream if it ends unexpectedly. + * Since we return a single subscription object to the user, we need to update it with the + * unsubscribe function should we restart the stream. This is why we pass the subscription + * state and subscription object to this method. + */ private sendSubscribe( cacheName: string, topicName: string, options: SubscribeCallOptions, - resumeAtTopicSequenceNumber = 0 + subscriptionState: SubscriptionState, + subscription: TopicSubscribe.Subscription ): void { const request = new grpcPubsub._SubscriptionRequest({ cache_name: cacheName, topic: topicName, - resume_at_topic_sequence_number: resumeAtTopicSequenceNumber, + resume_at_topic_sequence_number: + subscriptionState.resumeAtTopicSequenceNumber, }); const call = this.clientWrapper.getClient().Subscribe(request, { interceptors: this.streamingInterceptors, }); + subscriptionState.setSubscribed(); - // The following are the outer handlers for the stream. - // They are responsible for reconnecting the stream if it ends unexpectedly, and for - // building the API facing response objects. - - // The last topic sequence number we received. This is used to resume the stream. - // If resumeAtTopicSequenceNumber is 0, then we reconnect from the beginning again. - // Otherwise we resume starting from the next sequence number. - let lastTopicSequenceNumber = - resumeAtTopicSequenceNumber === 0 ? -1 : resumeAtTopicSequenceNumber; + // Whether the stream was restarted due to an error. If so, we short circuit the end stream handler + // as the error handler will restart the stream. let restartedDueToError = false; + + // Allow the caller to cancel the stream. + // Note that because we restart the stream on error or stream end, + // we need to ensure we keep the same subscription object. That way + // stream restarts are transparent to the caller. + subscriptionState.unsubscribeFn = () => { + call.cancel(); + }; + call .on('data', (resp: grpcPubsub._SubscriptionItem) => { if (resp?.item) { - lastTopicSequenceNumber = resp.item.topic_sequence_number; + subscriptionState.lastTopicSequenceNumber = + resp.item.topic_sequence_number; if (resp.item.value.text) { options.onItem(new TopicSubscribe.Item(resp.item.value.text)); } else if (resp.item.value.binary) { @@ -225,8 +257,13 @@ export class PubsubClient { } }) .on('error', (err: Error) => { + // When the caller unsubscribes, we may get a follow on error, which we ignore. + if (!subscriptionState.isSubscribed) { + return; + } + const serviceError = err as unknown as ServiceError; - // The service cuts the the stream after ~1 minute. Hence we reconnect. + // The service cuts the the stream after a period of time. Hence we reconnect. if ( serviceError.code === Status.INTERNAL && serviceError.details === 'Received RST_STREAM with code 0' @@ -238,7 +275,8 @@ export class PubsubClient { cacheName, topicName, options, - lastTopicSequenceNumber + 1 + subscriptionState, + subscription ); restartedDueToError = true; return; @@ -246,17 +284,26 @@ export class PubsubClient { // Otherwise we propagate the error to the caller. options.onError( - new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError)) + new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError)), + subscription ); }) .on('end', () => { - // The stream could have already been restarted due to an error. + // We want to reconnect on stream end, except if: + // 1. The stream was cancelled by the caller. + // 2. The stream was restarted due to an error. if (restartedDueToError) { this.logger.trace( 'Stream ended after error but was restarted on topic: %s', topicName ); return; + } else if (!subscriptionState.isSubscribed) { + this.logger.trace( + 'Stream ended after unsubscribe on topic: %s', + topicName + ); + return; } this.logger.trace('Stream ended on topic: %s; restarting.', topicName); @@ -264,7 +311,8 @@ export class PubsubClient { cacheName, topicName, options, - lastTopicSequenceNumber + 1 + subscriptionState, + subscription ); }); } diff --git a/src/internal/subscription-state.ts b/src/internal/subscription-state.ts new file mode 100644 index 000000000..56a5c6baf --- /dev/null +++ b/src/internal/subscription-state.ts @@ -0,0 +1,41 @@ +/** + * Encapsulates a topic subscription stream state. + */ +export class SubscriptionState { + private _unsubscribeFn: () => void; + public lastTopicSequenceNumber?: number; + private _isSubscribed: boolean; + constructor() { + this._unsubscribeFn = () => { + return; + }; + this._isSubscribed = false; + } + + public get resumeAtTopicSequenceNumber(): number { + return (this.lastTopicSequenceNumber ?? -1) + 1; + } + + public setSubscribed(): void { + this._isSubscribed = true; + } + + public setUnsubscribed(): void { + this._isSubscribed = false; + } + + public get isSubscribed(): boolean { + return this._isSubscribed; + } + + public set unsubscribeFn(unsubscribeFn: () => void) { + this._unsubscribeFn = unsubscribeFn; + } + + public unsubscribe(): void { + if (this.isSubscribed) { + this._unsubscribeFn(); + this.setUnsubscribed(); + } + } +} diff --git a/src/messages/responses/topic-subscribe.ts b/src/messages/responses/topic-subscribe.ts index 475e036f3..cf4f06033 100644 --- a/src/messages/responses/topic-subscribe.ts +++ b/src/messages/responses/topic-subscribe.ts @@ -2,21 +2,22 @@ import {SdkError} from '../../errors/errors'; import {ResponseBase, ResponseError} from './response-base'; import {truncateString} from '../../internal/utils/display'; +import {SubscriptionState} from '../../internal/subscription-state'; /** * Parent response type for a cache get request. The * response object is resolved to a type-safe object of one of * the following subtypes: * - * - {Hit} - * - {Miss} + * - {Subscription} + * - {Item} * - {Error} * * `instanceof` type guards can be used to operate on the appropriate subtype. * @example * For example: * ``` - * if (response instanceof CacheGet.Error) { + * if (response instanceof TopicSubscribe.Error) { * // Handle error as appropriate. The compiler will smart-cast `response` to type * // `CacheGet.Error` in this block, so you will have access to the properties * // of the Error class; e.g. `response.errorCode()`. @@ -45,6 +46,31 @@ export class Item extends Response { } } +/** + * Encapsulates a topic subscription. + * + * @remarks Currently allows unsubscribing from the topic. + * In the future, this may be extended to include additional + * statistics about the subscription. + */ +export class Subscription extends Response { + private subscriptionState: SubscriptionState; + + constructor(subscriptionState: SubscriptionState) { + super(); + this.subscriptionState = subscriptionState; + } + + /** + * Unsubscribes from the topic. + * + * @returns void + */ + public unsubscribe(): void { + this.subscriptionState.unsubscribe(); + } +} + class _Error extends Response { constructor(protected _innerException: SdkError) { super(); @@ -52,7 +78,7 @@ class _Error extends Response { } /** - * Indicates that an error occurred during the cache get request. + * Indicates that an error occurred during the topic subscribe request. * * This response object includes the following fields that you can use to determine * how you would like to handle the error: diff --git a/src/topic-client.ts b/src/topic-client.ts index 1f8d2104c..2d332c187 100644 --- a/src/topic-client.ts +++ b/src/topic-client.ts @@ -1,5 +1,5 @@ import {PubsubClient} from './internal/pubsub-client'; -import {TopicPublish, MomentoLogger} from '.'; +import {TopicPublish, MomentoLogger, TopicSubscribe} from '.'; import {TopicClientProps} from './topic-client-props'; import {SubscribeCallOptions} from './utils/topic-call-options'; @@ -48,13 +48,15 @@ export class TopicClient { * @param {SubscribeCallOptions} options - The options for the subscription. * @param {function} options.onItem - The callback to invoke when data is received. * @param {function} options.onError - The callback to invoke when an error is received. - * @returns + * @returns {Promise} - + * {@link TopicSubscribe.Subscription} on success. + * {@link TopicSubscribe.Error} on failure. */ public async subscribe( cacheName: string, topicName: string, options: SubscribeCallOptions - ): Promise { + ): Promise { return await this.client.subscribe(cacheName, topicName, options); } } diff --git a/src/utils/topic-call-options.ts b/src/utils/topic-call-options.ts index c8f7849f7..f20d25163 100644 --- a/src/utils/topic-call-options.ts +++ b/src/utils/topic-call-options.ts @@ -15,6 +15,10 @@ export interface SubscribeCallOptions { * The callback to invoke when an error is received from the topic subscription. * * @param error The error received from the topic subscription. + * @param subscription The subscription that received the error. */ - onError(error: TopicSubscribe.Error): void; + onError( + error: TopicSubscribe.Error, + subscription: TopicSubscribe.Subscription + ): void; }