Skip to content

Commit

Permalink
feat: add optional onHeartbeat topics subscription callback (#1426)
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua authored Aug 28, 2024
1 parent 7f08a90 commit 85868c7
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 2 deletions.
2 changes: 2 additions & 0 deletions packages/client-sdk-nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/t
import * as TopicSubscribe from '@gomomento/sdk-core/dist/src/messages/responses/topic-subscribe';
import {TopicItem} from '@gomomento/sdk-core/dist/src/messages/responses/topic-item';
import {TopicDiscontinuity} from '@gomomento/sdk-core/dist/src/messages/responses/topic-discontinuity';
import {TopicHeartbeat} from '@gomomento/sdk-core/dist/src/messages/responses/topic-heartbeat';

// Storage Response Types
import {
Expand Down Expand Up @@ -394,6 +395,7 @@ export {
TopicClient,
TopicClientProps,
TopicDiscontinuity,
TopicHeartbeat,
TopicItem,
TopicPublish,
TopicSubscribe,
Expand Down
2 changes: 2 additions & 0 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
CredentialProvider,
StaticGrpcConfiguration,
TopicDiscontinuity,
TopicHeartbeat,
TopicGrpcConfiguration,
TopicItem,
TopicPublish,
Expand Down Expand Up @@ -244,6 +245,7 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
);
options.onHeartbeat(new TopicHeartbeat());
} else if (resp.discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
Expand Down
2 changes: 2 additions & 0 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as cachepubsub_pb from '@gomomento/generated-types-webtext/dist/cachepu
import {
CredentialProvider,
TopicDiscontinuity,
TopicHeartbeat,
TopicItem,
UnknownError,
} from '@gomomento/sdk-core';
Expand Down Expand Up @@ -207,6 +208,7 @@ export class PubsubClient<
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
);
options.onHeartbeat(new TopicHeartbeat());
} else if (discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import * as TopicPublish from './messages/responses/topic-publish';
import * as TopicSubscribe from './messages/responses/topic-subscribe';
import {TopicItem} from './messages/responses/topic-item';
import {TopicDiscontinuity} from './messages/responses/topic-discontinuity';
import {TopicHeartbeat} from './messages/responses/topic-heartbeat';

// AuthClient Response Types
import * as GenerateApiKey from './messages/responses/generate-api-key';
Expand Down Expand Up @@ -293,6 +294,7 @@ export {
TopicSubscribe,
TopicItem,
TopicDiscontinuity,
TopicHeartbeat,
SubscribeCallOptions,
// AuthClient Response Types
GenerateApiKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
SubscribeCallOptions,
MomentoLoggerFactory,
TopicDiscontinuity,
TopicHeartbeat,
} from '../../../index';
import {SubscriptionState} from '../../subscription-state';
import {IPubsubClient} from './IPubsubClient';
Expand All @@ -30,6 +31,7 @@ export interface SendSubscribeOptions {
subscription: TopicSubscribe.Subscription
) => void;
onDiscontinuity: (discontinuity: TopicDiscontinuity) => void;
onHeartbeat: (heartbeat: TopicHeartbeat) => void;
subscriptionState: SubscriptionState;
subscription: TopicSubscribe.Subscription;

Expand Down Expand Up @@ -145,6 +147,11 @@ export abstract class AbstractPubsubClient<TGrpcError>
(() => {
return;
});
const onHeartbeat =
options.onHeartbeat ??
(() => {
return;
});

const subscriptionState = new SubscriptionState();
const subscription = new TopicSubscribe.Subscription(
Expand All @@ -157,6 +164,7 @@ export abstract class AbstractPubsubClient<TGrpcError>
onItem: onItem,
onError: onError,
onDiscontinuity: onDiscontinuity,
onHeartbeat: onHeartbeat,
subscriptionState: subscriptionState,
subscription: subscription,
restartedDueToError: false,
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/messages/responses/topic-heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* Represents a heartbeat received from a topic subscription indicating the connection is alive.
*
* @remarks A subscription is created by calling {@link TopicClient.subscribe}.
*/
export class TopicHeartbeat {
public toString(): string {
return `${this.constructor.name}`;
}
}
16 changes: 14 additions & 2 deletions packages/core/src/utils/topic-call-options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import {TopicItem, TopicSubscribe, TopicDiscontinuity} from '..';
import {
TopicItem,
TopicSubscribe,
TopicDiscontinuity,
TopicHeartbeat,
} from '..';

/**
* Options for the subscribe call.
Expand All @@ -25,7 +30,14 @@ export interface SubscribeCallOptions {
/**
* The callback to invoke when a discontinuity is received from the topic subscription.
*
* @param item The discontinuity received from the topic subscription.
* @param discontinuity The discontinuity received from the topic subscription.
*/
onDiscontinuity?: (discontinuity: TopicDiscontinuity) => void;

/**
* The callback to invoke when a heartbeat is received from the topic subscription.
*
* @param heartbeat The heartbeat received from the topic subscription.
*/
onHeartbeat?: (heartbeat: TopicHeartbeat) => void;
}

0 comments on commit 85868c7

Please sign in to comment.