Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pubsub UX minor fixes and refactor #360

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import * as CacheSortedSetRemoveElement from './messages/responses/cache-sorted-
import * as CacheSortedSetRemoveElements from './messages/responses/cache-sorted-set-remove-elements';
import * as TopicPublish from './messages/responses/topic-publish';
import * as TopicSubscribe from './messages/responses/topic-subscribe';
import {TopicItem} from './messages/responses/topic-item';

import {CacheInfo} from './messages/cache-info';
import {CollectionTtl} from './utils/collection-ttl';
Expand Down Expand Up @@ -147,6 +148,7 @@ export {
CacheSortedSetRemoveElement,
CacheSortedSetRemoveElements,
TopicClient,
TopicItem,
TopicPublish,
TopicSubscribe,
MomentoErrorCode,
Expand Down
7 changes: 4 additions & 3 deletions src/internal/pubsub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {cacheServiceErrorMapper} from '../errors/cache-service-error-mapper';
import {ChannelCredentials, Interceptor, ServiceError} from '@grpc/grpc-js';
import {Status} from '@grpc/grpc-js/build/src/constants';
import {
TopicItem,
TopicPublish,
TopicSubscribe,
Configuration,
Expand All @@ -33,7 +34,7 @@ import {SubscriptionState} from './subscription-state';
interface SendSubscribeOptions {
cacheName: string;
topicName: string;
onItem: (data: TopicSubscribe.Item) => void;
onItem: (item: TopicItem) => void;
onError: (
error: TopicSubscribe.Error,
subscription: TopicSubscribe.Subscription
Expand Down Expand Up @@ -297,9 +298,9 @@ export class PubsubClient {
options.subscriptionState.lastTopicSequenceNumber =
resp.item.topic_sequence_number;
if (resp.item.value.text) {
options.onItem(new TopicSubscribe.Item(resp.item.value.text));
options.onItem(new TopicItem(resp.item.value.text));
} else if (resp.item.value.binary) {
options.onItem(new TopicSubscribe.Item(resp.item.value.binary));
options.onItem(new TopicItem(resp.item.value.binary));
} else {
this.logger.error(
'Received subscription item with unknown type; topic: %s',
Expand Down
43 changes: 43 additions & 0 deletions src/messages/responses/topic-item.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {truncateString} from '../../internal/utils/display';

/**
* Represents the data received from a topic subscription.
*
* @remarks A subscription is created by calling {@link TopicClient.subscribe}.
* The value is guaranteed to be either a {@link string} or a {@link Uint8Array}.
* Call the appropriate accessor if you know the type of the value.
*/
export class TopicItem {
malandis marked this conversation as resolved.
Show resolved Hide resolved
private readonly _value: string | Uint8Array;
constructor(_value: string | Uint8Array) {
this._value = _value;
}
/**
* Returns the data read from the stream.
* @returns string | Uint8Array
*/
public value(): string | Uint8Array {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to submit an issue on this repo to start adding a json() option for values that automatically parses the value and returns a json object. It's a nice to have feature I've seen in other places like Postman that stands out as great DX

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds interesting. Can you share a link?

return this._value;
}

/**
* Returns the data read from the stream as a {@link string}.
* @returns string
*/
public valueString(): string {
return this.value().toString();
}

/**
* Returns the data read from the stream as a {@link Uint8Array}.
* @returns Uint8Array
*/
public valueUint8Array(): Uint8Array {
return this.value() as Uint8Array;
}

public toString(): string {
const display = truncateString(this.value().toString());
return `${this.constructor.name}: ${display}`;
}
}
37 changes: 0 additions & 37 deletions src/messages/responses/topic-subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// older versions of node don't have the global util variables https://github.com/nodejs/node/issues/20365
import {SdkError} from '../../errors/errors';
import {ResponseBase, ResponseError} from './response-base';
import {truncateString} from '../../internal/utils/display';
import {SubscriptionState} from '../../internal/subscription-state';

/**
Expand All @@ -26,42 +25,6 @@ import {SubscriptionState} from '../../internal/subscription-state';
*/
export abstract class Response extends ResponseBase {}

export class Item extends Response {
private readonly _value: string | Uint8Array;
constructor(_value: string | Uint8Array) {
super();
this._value = _value;
}
/**
* Returns the data read from the stream.
* @returns string | Uint8Array
*/
public value(): string | Uint8Array {
return this._value;
}

/**
* Returns the data read from the stream as a string.
* @returns string
*/
public valueString(): string {
return this.value().toString();
}

/**
* Returns the data read from the stream as a Uint8Array.
* @returns Uint8Array
*/
public valueUint8Array(): Uint8Array {
return this.value() as Uint8Array;
}

public override toString(): string {
const display = truncateString(this.value().toString());
return `${super.toString()}: ${display}`;
}
}

/**
* Encapsulates a topic subscription.
*
Expand Down
2 changes: 1 addition & 1 deletion src/topic-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class TopicClient {
public async subscribe(
cacheName: string,
topicName: string,
options: SubscribeCallOptions = {}
options: SubscribeCallOptions
): Promise<TopicSubscribe.Response> {
return await this.client.subscribe(cacheName, topicName, options);
}
Expand Down
6 changes: 3 additions & 3 deletions src/utils/topic-call-options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {TopicSubscribe} from '..';
import {TopicItem, TopicSubscribe} from '..';

/**
* Options for the subscribe call.
Expand All @@ -7,9 +7,9 @@ export interface SubscribeCallOptions {
/**
* The callback to invoke when data is received from the topic subscription.
*
* @param data The data received from the topic subscription.
* @param item The data received from the topic subscription.
*/
onItem?: (data: TopicSubscribe.Item) => void;
onItem?: (item: TopicItem) => void;

/**
* The callback to invoke when an error is received from the topic subscription.
Expand Down
20 changes: 15 additions & 5 deletions test/integration/topic-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
DeleteCache,
TopicClient,
MomentoErrorCode,
TopicItem,
TopicPublish,
TopicSubscribe,
InvalidArgumentError,
Expand Down Expand Up @@ -117,7 +118,11 @@ describe('#subscribe', () => {
});

it('should error when subscribing to a cache that does not exist and unsubscribe from the error handler', async () => {
const response = await topicClient.subscribe(v4(), 'topic');
const response = await topicClient.subscribe(
v4(),
'topic',
trivialHandlers
);
expect((response as IResponseError).errorCode()).toEqual(
MomentoErrorCode.NOT_FOUND_ERROR
);
Expand All @@ -139,7 +144,7 @@ describe('subscribe and publish', () => {
IntegrationTestCacheName,
topicName,
{
onItem: (item: TopicSubscribe.Item) => {
onItem: (item: TopicItem) => {
receivedValues.push(item.value());
},
onError: (error: TopicSubscribe.Error) => {
Expand Down Expand Up @@ -185,7 +190,7 @@ describe('subscribe and publish', () => {
IntegrationTestCacheName,
topicName,
{
onItem: (item: TopicSubscribe.Item) => {
onItem: (item: TopicItem) => {
receivedValues.push(item.value());
},
onError: (error: TopicSubscribe.Error) => {
Expand Down Expand Up @@ -226,7 +231,12 @@ describe('subscribe and publish', () => {

const subscribeResponse = await topicClient.subscribe(
IntegrationTestCacheName,
topicName
topicName,
{
onItem: () => {
return;
},
}
);
expect(subscribeResponse).toBeInstanceOf(TopicSubscribe.Subscription);
(subscribeResponse as TopicSubscribe.Subscription).unsubscribe();
Expand All @@ -244,7 +254,7 @@ describe('subscribe and publish', () => {
randomCacheName,
v4(),
{
onItem: (item: TopicSubscribe.Item) => {
onItem: (item: TopicItem) => {
expect(1).fail(
`Should not receive an item but got one: ${item.toString()}`
);
Expand Down