Skip to content

Commit

Permalink
feat(pubsub): add an "unsubscribe" method to close server connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ccoeurderoy committed Feb 14, 2024
1 parent e95d83f commit e5f562f
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 14 deletions.
28 changes: 14 additions & 14 deletions examples/google-pubsub/index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085'
process.env.PUBSUB_PROJECT_ID = 'test'
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085';
process.env.PUBSUB_PROJECT_ID = 'test';

const express = require('express')
const express = require('express');
const timers = require('timers/promises');
const app = express()
const app = express();
const port = 3000;
const Pubsub = require('@algoan/pubsub');
const topicName = 'my_topic';

const pubsubClient = Pubsub.PubSubFactory.create({
options: {
projectId: 'test',
}
},
});
const secondPubsubClient = Pubsub.PubSubFactory.create({
options: {
projectId: 'test',
}
},
});

let pubsubCall = 0;
Expand All @@ -27,13 +27,13 @@ app.get('/', (req, res) => {

app.get('/emit', async (req, res) => {
secondPubsubClient.emit(topicName, {});
await timers.setTimeout(1000)
await timers.setTimeout(1000);
res.redirect('/');
});

app.get('/close', async (req, res) => {
await pubsubClient.client.close();
await timers.setTimeout(1000)
await pubsubClient.unsubscribe(topicName);
await timers.setTimeout(1000);
res.redirect('/');
});

Expand All @@ -42,12 +42,12 @@ app.listen(port, async () => {
options: {
subscriptionOptions: {
name: topicName,
}
},
},
onMessage: () => {
console.log('Received message!')
console.log('Received message!');
pubsubCall++;
}
},
});
console.log(`Example app listening on port ${port}`)
});
console.log(`Example app listening on port ${port}`);
});
17 changes: 17 additions & 0 deletions src/GoogleCloudPubSub/GoogleCloudPubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,23 @@ export class GoogleCloudPubSub implements GCPubSub {
});
}

/**
* Stop listening to a specific subscription. Close the server connection.
* @param event Event name
*/
public async unsubscribe(event: string): Promise<void> {
const subscriptionName: string = this.getSubscriptionName(event);
// Cover a case where there could be a custom subscription name with a prefix.
const cachedSubscription: Subscription | undefined =
this.subscriptions.get(subscriptionName) || this.subscriptions.get(event);

if (cachedSubscription === undefined) {
return;
}

return cachedSubscription.close();
}

/**
* Get or create a topic on Google PubSub
* Also fill the topic map in-memory cache
Expand Down
5 changes: 5 additions & 0 deletions src/PubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export interface PubSub<PSClient, PSSubscription, SubscriptionOptions> {
*/
listen<MessagePayload>(event: string, options?: ListenOptions<MessagePayload, SubscriptionOptions>): Promise<void>;

/**
* Stop listening to a subscription
*/
unsubscribe(event: string): Promise<void>;

/**
* Emit an event
* @param event Event to emit
Expand Down
147 changes: 147 additions & 0 deletions test/GoogleCloudPubSub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,153 @@ test('GPS001c - should properly emit and listen with a prefix', async (t: Execut
});
});

test('GPS001d - should properly emit, but not listen to the subscription', async (t: ExecutionContext): Promise<void> => {
const topicName: string = generateRandomTopicName();
const pubSub: GCPubSub = PubSubFactory.create({
transport: Transport.GOOGLE_PUBSUB,
options: {
projectId,
},
});

await new Promise(async (resolve, reject) => {
await pubSub.listen(topicName, {
options: {
autoAck: true,
},
onMessage(): void {
reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`);
},
onError(error) {
reject(error);
},
});

await pubSub.unsubscribe(topicName);

setTimeout(resolve, 2000);

emitAfterDelay(pubSub, topicName);
});

t.pass('Test succeeded, because no message was received');
});

test('GPS001e - should properly emit and listen because wrong topic name to unsubscribe', async (t: ExecutionContext): Promise<void> => {
const topicName: string = generateRandomTopicName();
const pubSub: GCPubSub = PubSubFactory.create({
transport: Transport.GOOGLE_PUBSUB,
options: {
projectId,
},
});

await new Promise(async (resolve, reject) => {
await pubSub.listen(topicName, {
options: {
autoAck: true,
},
onMessage(message: EmittedMessage<OnMessage>): void {
const spy: sinon.SinonSpy = sinon.spy(message.getOriginalMessage(), 'ack');
if (isPayloadError(message.payload)) {
return reject('Error in payload');
}

const payload: OnMessage = message.payload;
t.deepEqual(payload, {
hello: 'world',
});
t.falsy(spy.called);
t.truthy(message.id);
t.truthy(message.ackId);
t.truthy(message.emittedAt);
t.truthy(message.receivedAt);
t.is(message.count, 0);
t.truthy(message.duration);
t.pass(`Listen successfully to the topic ${topicName}`);
resolve(true);
},
onError(error) {
reject(error);
},
});

await pubSub.unsubscribe('wrong_subscription_or_topic_name');

emitAfterDelay(pubSub, topicName);
});
});

test('GPS001f - should properly emit, but not listen to the subscription with a prefix', async (t: ExecutionContext): Promise<void> => {
const topicName: string = generateRandomTopicName();
const pubSub: GCPubSub = PubSubFactory.create({
transport: Transport.GOOGLE_PUBSUB,
options: {
projectId,
subscriptionsPrefix: 'my-prefix',
},
});

await new Promise(async (resolve, reject) => {
await pubSub.listen(topicName, {
options: {
autoAck: true,
},
onMessage(): void {
reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`);
},
onError(error) {
reject(error);
},
});

await pubSub.unsubscribe(topicName);

setTimeout(resolve, 2000);

emitAfterDelay(pubSub, topicName);
});

t.pass('Test succeeded, because no message was received');
});

test('GPS001g - should properly emit, but not listen to the subscription with a custom name and a prefix', async (t: ExecutionContext): Promise<void> => {
const topicName: string = generateRandomTopicName();
const customSubscriptionName = 'completely-different-name';
const pubSub: GCPubSub = PubSubFactory.create({
transport: Transport.GOOGLE_PUBSUB,
options: {
projectId,
subscriptionsPrefix: 'my-prefix',
},
});

await new Promise(async (resolve, reject) => {
await pubSub.listen(topicName, {
options: {
autoAck: true,
subscriptionOptions: {
name: customSubscriptionName,
},
},
onMessage(): void {
reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`);
},
onError(error) {
reject(error);
},
});

await pubSub.unsubscribe(customSubscriptionName);

setTimeout(resolve, 2000);

emitAfterDelay(pubSub, topicName);
});

t.pass('Test succeeded, because no message was received');
});

test('GPS002 - should properly emit but the ack method is never called - no ack', async (t: ExecutionContext): Promise<void> => {
const topicName: string = generateRandomTopicName();
const pubSub: GCPubSub = PubSubFactory.create({
Expand Down

0 comments on commit e5f562f

Please sign in to comment.