Skip to content

Commit

Permalink
chore: add default handlers for subscribe (#357)
Browse files Browse the repository at this point in the history
Adds default handlers for TopicClient.subscribe.
  • Loading branch information
malandis authored Mar 21, 2023
1 parent 20361f4 commit 81ba86a
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
36 changes: 27 additions & 9 deletions src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,25 @@ export class PubsubClient {
truncateString(topicName)
);

const onItem =
options.onItem ??
(() => {
return;
});
const onError =
options.onError ??
(() => {
return;
});

return await new Promise(resolve => {
const subscriptionState = new SubscriptionState();
const subscription = new TopicSubscribe.Subscription(subscriptionState);
this.sendSubscribe(
cacheName,
topicName,
options,
onItem,
onError,
subscriptionState,
subscription
);
Expand Down Expand Up @@ -210,7 +222,11 @@ export class PubsubClient {
private sendSubscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions,
onItem: (item: TopicSubscribe.Item) => void,
onError: (
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
) => void,
subscriptionState: SubscriptionState,
subscription: TopicSubscribe.Subscription
): void {
Expand Down Expand Up @@ -244,15 +260,15 @@ export class PubsubClient {
subscriptionState.lastTopicSequenceNumber =
resp.item.topic_sequence_number;
if (resp.item.value.text) {
options.onItem(new TopicSubscribe.Item(resp.item.value.text));
onItem(new TopicSubscribe.Item(resp.item.value.text));
} else if (resp.item.value.binary) {
options.onItem(new TopicSubscribe.Item(resp.item.value.binary));
onItem(new TopicSubscribe.Item(resp.item.value.binary));
} else {
this.logger.error(
'Received subscription item with unknown type; topic: %s',
truncateString(topicName)
);
options.onError(
onError(
new TopicSubscribe.Error(
new UnknownError('Unknown item value type')
),
Expand All @@ -274,7 +290,7 @@ export class PubsubClient {
'Received unknown subscription item; topic: %s',
truncateString(topicName)
);
options.onError(
onError(
new TopicSubscribe.Error(new UnknownError('Unknown item type')),
subscription
);
Expand All @@ -298,7 +314,8 @@ export class PubsubClient {
this.sendSubscribe(
cacheName,
topicName,
options,
onItem,
onError,
subscriptionState,
subscription
);
Expand All @@ -307,7 +324,7 @@ export class PubsubClient {
}

// Otherwise we propagate the error to the caller.
options.onError(
onError(
new TopicSubscribe.Error(cacheServiceErrorMapper(serviceError)),
subscription
);
Expand All @@ -334,7 +351,8 @@ export class PubsubClient {
this.sendSubscribe(
cacheName,
topicName,
options,
onItem,
onError,
subscriptionState,
subscription
);
Expand Down
6 changes: 3 additions & 3 deletions src/topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ export class TopicClient {
*
* @param {string} cacheName - The name of the cache to containing the topic to subscribe to.
* @param {string} topicName - The name of the topic to subscribe to.
* @param {SubscribeCallOptions} options - The options for the subscription.
* @param {function} options.onItem - The callback to invoke when data is received.
* @param {function} options.onError - The callback to invoke when an error is received.
* @param {SubscribeCallOptions} options - The options for the subscription. Defaults to no-op handlers.
* @param {function} options.onItem - The callback to invoke when data is received. Defaults to no-op.
* @param {function} options.onError - The callback to invoke when an error is received. Defaults to no-op.
* @returns {Promise<TopicSubscribe.Response>} -
* {@link TopicSubscribe.Subscription} on success.
* {@link TopicSubscribe.Error} on failure.
Expand Down
6 changes: 3 additions & 3 deletions src/utils/topic-call-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ export interface SubscribeCallOptions {
*
* @param data The data received from the topic subscription.
*/
onItem(data: TopicSubscribe.Item): void;
onItem?: (data: TopicSubscribe.Item) => void;

/**
* The callback to invoke when an error is received from the topic subscription.
*
* @param error The error received from the topic subscription.
* @param subscription The subscription that received the error.
*/
onError(
onError?: (
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
): void;
) => void;
}

0 comments on commit 81ba86a

Please sign in to comment.