Skip to content

Commit

Permalink
feat: add sequence number to TopicItem and add optional onDiscontinui…
Browse files Browse the repository at this point in the history
…ty callback (#1418)

* feat: add sequence number to TopicItem

* feat: add optional onDiscontinuity handler
  • Loading branch information
anitarua authored Aug 26, 2024
1 parent c5cc0f3 commit f3e940b
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 22 deletions.
2 changes: 2 additions & 0 deletions packages/client-sdk-nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import * as CacheSetBatch from '@gomomento/sdk-core/dist/src/messages/responses/
import * as TopicPublish from '@gomomento/sdk-core/dist/src/messages/responses/topic-publish';
import * as TopicSubscribe from '@gomomento/sdk-core/dist/src/messages/responses/topic-subscribe';
import {TopicItem} from '@gomomento/sdk-core/dist/src/messages/responses/topic-item';
import {TopicDiscontinuity} from '@gomomento/sdk-core/dist/src/messages/responses/topic-discontinuity';

// Storage Response Types
import {
Expand Down Expand Up @@ -382,6 +383,7 @@ export {
TopicClientConfiguration,
TopicClient,
TopicClientProps,
TopicDiscontinuity,
TopicItem,
TopicPublish,
TopicSubscribe,
Expand Down
23 changes: 15 additions & 8 deletions packages/client-sdk-nodejs/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {middlewaresInterceptor} from './grpc/middlewares-interceptor';
import {
CredentialProvider,
StaticGrpcConfiguration,
TopicDiscontinuity,
TopicGrpcConfiguration,
TopicItem,
TopicPublish,
Expand Down Expand Up @@ -206,23 +207,23 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
}
options.firstMessage = false;

if (resp?.item) {
options.subscriptionState.lastTopicSequenceNumber =
resp.item.topic_sequence_number;
if (resp.item) {
const sequenceNumber = resp.item.topic_sequence_number;
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
this.getLogger().trace(
'Received an item on subscription stream; topic: %s; sequence number: %s',
truncateString(options.topicName),
resp.item.topic_sequence_number
sequenceNumber
);
if (resp.item.value.text) {
options.onItem(
new TopicItem(resp.item.value.text, {
new TopicItem(resp.item.value.text, sequenceNumber, {
tokenId: resp.item.publisher_id,
})
);
} else if (resp.item.value.binary) {
options.onItem(
new TopicItem(resp.item.value.binary, {
new TopicItem(resp.item.value.binary, sequenceNumber, {
tokenId: resp.item.publisher_id,
})
);
Expand All @@ -238,16 +239,22 @@ export class PubsubClient extends AbstractPubsubClient<ServiceError> {
options.subscription
);
}
} else if (resp?.heartbeat) {
} else if (resp.heartbeat) {
this.getLogger().trace(
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
);
} else if (resp?.discontinuity) {
} else if (resp.discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
);
options.onDiscontinuity(
new TopicDiscontinuity(
resp.discontinuity.last_topic_sequence,
resp.discontinuity.new_topic_sequence
)
);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
Expand Down
41 changes: 29 additions & 12 deletions packages/client-sdk-web/src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import * as pubsub from '@gomomento/generated-types-webtext/dist/CachepubsubServiceClientPb';
import * as cachepubsub_pb from '@gomomento/generated-types-webtext/dist/cachepubsub_pb';
import {CredentialProvider, TopicItem, UnknownError} from '@gomomento/sdk-core';
import {
CredentialProvider,
TopicDiscontinuity,
TopicItem,
UnknownError,
} from '@gomomento/sdk-core';
import {Request, RpcError, StatusCode, UnaryResponse} from 'grpc-web';
import {truncateString} from '@gomomento/sdk-core/dist/src/internal/utils';
import {TopicPublish, TopicSubscribe} from '../index';
Expand Down Expand Up @@ -168,17 +173,23 @@ export class PubsubClient<
}
options.firstMessage = false;

if (resp?.getItem()) {
options.subscriptionState.lastTopicSequenceNumber = resp
.getItem()
?.getTopicSequenceNumber();
const itemText = resp.getItem()?.getValue()?.getText();
const publisherId = resp.getItem()?.getPublisherId();
const itemBinary = resp.getItem()?.getValue()?.getBinary();
const item = resp.getItem();
const discontinuity = resp.getDiscontinuity();

if (item) {
const sequenceNumber = item.getTopicSequenceNumber();
options.subscriptionState.lastTopicSequenceNumber = sequenceNumber;
const publisherId = item.getPublisherId();
const itemText = item.getValue()?.getText();
const itemBinary = item.getValue()?.getBinary();
if (itemText) {
options.onItem(new TopicItem(itemText, {tokenId: publisherId}));
options.onItem(
new TopicItem(itemText, sequenceNumber, {tokenId: publisherId})
);
} else if (itemBinary) {
options.onItem(new TopicItem(itemBinary, {tokenId: publisherId}));
options.onItem(
new TopicItem(itemBinary, sequenceNumber, {tokenId: publisherId})
);
} else {
this.getLogger().error(
'Received subscription item with unknown type; topic: %s',
Expand All @@ -191,16 +202,22 @@ export class PubsubClient<
options.subscription
);
}
} else if (resp?.getHeartbeat()) {
} else if (resp.getHeartbeat()) {
this.getLogger().trace(
'Received heartbeat from subscription stream; topic: %s',
truncateString(options.topicName)
);
} else if (resp?.getDiscontinuity()) {
} else if (discontinuity) {
this.getLogger().trace(
'Received discontinuity from subscription stream; topic: %s',
truncateString(options.topicName)
);
options.onDiscontinuity(
new TopicDiscontinuity(
discontinuity.getLastTopicSequence(),
discontinuity.getNewTopicSequence()
)
);
} else {
this.getLogger().error(
'Received unknown subscription item; topic: %s',
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import * as CacheGetBatch from './messages/responses/cache-batch-get';
import * as TopicPublish from './messages/responses/topic-publish';
import * as TopicSubscribe from './messages/responses/topic-subscribe';
import {TopicItem} from './messages/responses/topic-item';
import {TopicDiscontinuity} from './messages/responses/topic-discontinuity';

// AuthClient Response Types
import * as GenerateApiKey from './messages/responses/generate-api-key';
Expand Down Expand Up @@ -291,6 +292,7 @@ export {
TopicPublish,
TopicSubscribe,
TopicItem,
TopicDiscontinuity,
SubscribeCallOptions,
// AuthClient Response Types
GenerateApiKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
TopicSubscribe,
SubscribeCallOptions,
MomentoLoggerFactory,
TopicDiscontinuity,
} from '../../../index';
import {SubscriptionState} from '../../subscription-state';
import {IPubsubClient} from './IPubsubClient';
Expand All @@ -28,6 +29,7 @@ export interface SendSubscribeOptions {
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
) => void;
onDiscontinuity: (discontinuity: TopicDiscontinuity) => void;
subscriptionState: SubscriptionState;
subscription: TopicSubscribe.Subscription;

Expand Down Expand Up @@ -138,6 +140,11 @@ export abstract class AbstractPubsubClient<TGrpcError>
(() => {
return;
});
const onDiscontinuity =
options.onDiscontinuity ??
(() => {
return;
});

const subscriptionState = new SubscriptionState();
const subscription = new TopicSubscribe.Subscription(
Expand All @@ -149,6 +156,7 @@ export abstract class AbstractPubsubClient<TGrpcError>
topicName: topicName,
onItem: onItem,
onError: onError,
onDiscontinuity: onDiscontinuity,
subscriptionState: subscriptionState,
subscription: subscription,
restartedDueToError: false,
Expand Down
35 changes: 35 additions & 0 deletions packages/core/src/messages/responses/topic-discontinuity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Represents a discontinuity in a topic subscription.
*
* @remarks A subscription is created by calling {@link TopicClient.subscribe}.
*/
export class TopicDiscontinuity {
private readonly _lastSequenceNumber: number;
private readonly _newSequenceNumber: number;

constructor(_lastSequenceNumber: number, _newSequenceNumber: number) {
this._lastSequenceNumber = _lastSequenceNumber;
this._newSequenceNumber = _newSequenceNumber;
}

/**
* Returns the last sequence number before the discontinuity.
* @returns number
*/
public lastSequenceNumber(): number {
return this._lastSequenceNumber;
}

/**
* Returns the new sequence number after the discontinuity.
* @returns number
*/
public newSequenceNumber(): number {
return this._newSequenceNumber;
}

public toString(): string {
const displayValue = `Last Sequence Number: ${this._lastSequenceNumber}; New Sequence Number: ${this._newSequenceNumber}`;
return `${this.constructor.name}: ${displayValue}`;
}
}
16 changes: 15 additions & 1 deletion packages/core/src/messages/responses/topic-item.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ export interface TopicItemOptions {
export class TopicItem {
private readonly _value: string | Uint8Array;
private readonly _tokenId?: string;
private readonly _sequenceNumber: number;

constructor(_value: string | Uint8Array, options?: TopicItemOptions) {
constructor(
_value: string | Uint8Array,
_sequenceNumber: number,
options?: TopicItemOptions
) {
this._value = _value;
this._tokenId = options?.tokenId;
this._sequenceNumber = _sequenceNumber;
}

/**
Expand Down Expand Up @@ -52,6 +58,14 @@ export class TopicItem {
return this._tokenId;
}

/**
* Returns the sequence number of the item.
* @returns number
*/
public sequenceNumber(): number {
return this._sequenceNumber;
}

public toString(): string {
const displayValue = truncateString(this.value().toString());
let displayString = `${this.constructor.name}: ${displayValue}`;
Expand Down
9 changes: 8 additions & 1 deletion packages/core/src/utils/topic-call-options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {TopicItem, TopicSubscribe} from '..';
import {TopicItem, TopicSubscribe, TopicDiscontinuity} from '..';

/**
* Options for the subscribe call.
Expand All @@ -21,4 +21,11 @@ export interface SubscribeCallOptions {
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
) => void;

/**
* The callback to invoke when a discontinuity is received from the topic subscription.
*
* @param item The discontinuity received from the topic subscription.
*/
onDiscontinuity?: (discontinuity: TopicDiscontinuity) => void;
}

0 comments on commit f3e940b

Please sign in to comment.