From 85868c7bac97c50c4db66ad59394f646e5985d31 Mon Sep 17 00:00:00 2001 From: Anita Ruangrotsakun <138700973+anitarua@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:33:22 -0700 Subject: [PATCH] feat: add optional onHeartbeat topics subscription callback (#1426) --- packages/client-sdk-nodejs/src/index.ts | 2 ++ .../src/internal/pubsub-client.ts | 2 ++ .../client-sdk-web/src/internal/pubsub-client.ts | 2 ++ packages/core/src/index.ts | 2 ++ .../clients/pubsub/AbstractPubsubClient.ts | 8 ++++++++ .../src/messages/responses/topic-heartbeat.ts | 10 ++++++++++ packages/core/src/utils/topic-call-options.ts | 16 ++++++++++++++-- 7 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 packages/core/src/messages/responses/topic-heartbeat.ts diff --git a/packages/client-sdk-nodejs/src/index.ts b/packages/client-sdk-nodejs/src/index.ts index ae5415120..9ffdd1aaf 100644 --- a/packages/client-sdk-nodejs/src/index.ts +++ b/packages/client-sdk-nodejs/src/index.ts @@ -80,6 +80,7 @@ import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/t import * as TopicSubscribe from '@gomomento/sdk-core/dist/src/messages/responses/topic-subscribe'; import {TopicItem} from '@gomomento/sdk-core/dist/src/messages/responses/topic-item'; import {TopicDiscontinuity} from '@gomomento/sdk-core/dist/src/messages/responses/topic-discontinuity'; +import {TopicHeartbeat} from '@gomomento/sdk-core/dist/src/messages/responses/topic-heartbeat'; // Storage Response Types import { @@ -394,6 +395,7 @@ export { TopicClient, TopicClientProps, TopicDiscontinuity, + TopicHeartbeat, TopicItem, TopicPublish, TopicSubscribe, diff --git a/packages/client-sdk-nodejs/src/internal/pubsub-client.ts b/packages/client-sdk-nodejs/src/internal/pubsub-client.ts index e989a8545..97418bcaf 100644 --- a/packages/client-sdk-nodejs/src/internal/pubsub-client.ts +++ b/packages/client-sdk-nodejs/src/internal/pubsub-client.ts @@ -10,6 +10,7 @@ import { CredentialProvider, StaticGrpcConfiguration, TopicDiscontinuity, + TopicHeartbeat, TopicGrpcConfiguration, TopicItem, TopicPublish, @@ -244,6 +245,7 @@ export class PubsubClient extends AbstractPubsubClient { 'Received heartbeat from subscription stream; topic: %s', truncateString(options.topicName) ); + options.onHeartbeat(new TopicHeartbeat()); } else if (resp.discontinuity) { this.getLogger().trace( 'Received discontinuity from subscription stream; topic: %s', diff --git a/packages/client-sdk-web/src/internal/pubsub-client.ts b/packages/client-sdk-web/src/internal/pubsub-client.ts index 771924e54..61e52fdc8 100644 --- a/packages/client-sdk-web/src/internal/pubsub-client.ts +++ b/packages/client-sdk-web/src/internal/pubsub-client.ts @@ -3,6 +3,7 @@ import * as cachepubsub_pb from '@gomomento/generated-types-webtext/dist/cachepu import { CredentialProvider, TopicDiscontinuity, + TopicHeartbeat, TopicItem, UnknownError, } from '@gomomento/sdk-core'; @@ -207,6 +208,7 @@ export class PubsubClient< 'Received heartbeat from subscription stream; topic: %s', truncateString(options.topicName) ); + options.onHeartbeat(new TopicHeartbeat()); } else if (discontinuity) { this.getLogger().trace( 'Received discontinuity from subscription stream; topic: %s', diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c11ef88f2..ee654d975 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -69,6 +69,7 @@ import * as TopicPublish from './messages/responses/topic-publish'; import * as TopicSubscribe from './messages/responses/topic-subscribe'; import {TopicItem} from './messages/responses/topic-item'; import {TopicDiscontinuity} from './messages/responses/topic-discontinuity'; +import {TopicHeartbeat} from './messages/responses/topic-heartbeat'; // AuthClient Response Types import * as GenerateApiKey from './messages/responses/generate-api-key'; @@ -293,6 +294,7 @@ export { TopicSubscribe, TopicItem, TopicDiscontinuity, + TopicHeartbeat, SubscribeCallOptions, // AuthClient Response Types GenerateApiKey, diff --git a/packages/core/src/internal/clients/pubsub/AbstractPubsubClient.ts b/packages/core/src/internal/clients/pubsub/AbstractPubsubClient.ts index 338cc961d..5b6717b8d 100644 --- a/packages/core/src/internal/clients/pubsub/AbstractPubsubClient.ts +++ b/packages/core/src/internal/clients/pubsub/AbstractPubsubClient.ts @@ -13,6 +13,7 @@ import { SubscribeCallOptions, MomentoLoggerFactory, TopicDiscontinuity, + TopicHeartbeat, } from '../../../index'; import {SubscriptionState} from '../../subscription-state'; import {IPubsubClient} from './IPubsubClient'; @@ -30,6 +31,7 @@ export interface SendSubscribeOptions { subscription: TopicSubscribe.Subscription ) => void; onDiscontinuity: (discontinuity: TopicDiscontinuity) => void; + onHeartbeat: (heartbeat: TopicHeartbeat) => void; subscriptionState: SubscriptionState; subscription: TopicSubscribe.Subscription; @@ -145,6 +147,11 @@ export abstract class AbstractPubsubClient (() => { return; }); + const onHeartbeat = + options.onHeartbeat ?? + (() => { + return; + }); const subscriptionState = new SubscriptionState(); const subscription = new TopicSubscribe.Subscription( @@ -157,6 +164,7 @@ export abstract class AbstractPubsubClient onItem: onItem, onError: onError, onDiscontinuity: onDiscontinuity, + onHeartbeat: onHeartbeat, subscriptionState: subscriptionState, subscription: subscription, restartedDueToError: false, diff --git a/packages/core/src/messages/responses/topic-heartbeat.ts b/packages/core/src/messages/responses/topic-heartbeat.ts new file mode 100644 index 000000000..ab327fe92 --- /dev/null +++ b/packages/core/src/messages/responses/topic-heartbeat.ts @@ -0,0 +1,10 @@ +/** + * Represents a heartbeat received from a topic subscription indicating the connection is alive. + * + * @remarks A subscription is created by calling {@link TopicClient.subscribe}. + */ +export class TopicHeartbeat { + public toString(): string { + return `${this.constructor.name}`; + } +} diff --git a/packages/core/src/utils/topic-call-options.ts b/packages/core/src/utils/topic-call-options.ts index 6898fc5f8..31d2de6bb 100644 --- a/packages/core/src/utils/topic-call-options.ts +++ b/packages/core/src/utils/topic-call-options.ts @@ -1,4 +1,9 @@ -import {TopicItem, TopicSubscribe, TopicDiscontinuity} from '..'; +import { + TopicItem, + TopicSubscribe, + TopicDiscontinuity, + TopicHeartbeat, +} from '..'; /** * Options for the subscribe call. @@ -25,7 +30,14 @@ export interface SubscribeCallOptions { /** * The callback to invoke when a discontinuity is received from the topic subscription. * - * @param item The discontinuity received from the topic subscription. + * @param discontinuity The discontinuity received from the topic subscription. */ onDiscontinuity?: (discontinuity: TopicDiscontinuity) => void; + + /** + * The callback to invoke when a heartbeat is received from the topic subscription. + * + * @param heartbeat The heartbeat received from the topic subscription. + */ + onHeartbeat?: (heartbeat: TopicHeartbeat) => void; }