diff --git a/examples/google-pubsub/index.js b/examples/google-pubsub/index.js index 804cf10..09ee5a9 100644 --- a/examples/google-pubsub/index.js +++ b/examples/google-pubsub/index.js @@ -1,9 +1,9 @@ -process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085' -process.env.PUBSUB_PROJECT_ID = 'test' +process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085'; +process.env.PUBSUB_PROJECT_ID = 'test'; -const express = require('express') +const express = require('express'); const timers = require('timers/promises'); -const app = express() +const app = express(); const port = 3000; const Pubsub = require('@algoan/pubsub'); const topicName = 'my_topic'; @@ -11,12 +11,12 @@ const topicName = 'my_topic'; const pubsubClient = Pubsub.PubSubFactory.create({ options: { projectId: 'test', - } + }, }); const secondPubsubClient = Pubsub.PubSubFactory.create({ options: { projectId: 'test', - } + }, }); let pubsubCall = 0; @@ -27,13 +27,13 @@ app.get('/', (req, res) => { app.get('/emit', async (req, res) => { secondPubsubClient.emit(topicName, {}); - await timers.setTimeout(1000) + await timers.setTimeout(1000); res.redirect('/'); }); app.get('/close', async (req, res) => { - await pubsubClient.client.close(); - await timers.setTimeout(1000) + await pubsubClient.unsubscribe(topicName); + await timers.setTimeout(1000); res.redirect('/'); }); @@ -42,12 +42,12 @@ app.listen(port, async () => { options: { subscriptionOptions: { name: topicName, - } + }, }, onMessage: () => { - console.log('Received message!') + console.log('Received message!'); pubsubCall++; - } + }, }); - console.log(`Example app listening on port ${port}`) -}); \ No newline at end of file + console.log(`Example app listening on port ${port}`); +}); diff --git a/src/GoogleCloudPubSub/GoogleCloudPubSub.ts b/src/GoogleCloudPubSub/GoogleCloudPubSub.ts index cbd3704..03156e3 100644 --- a/src/GoogleCloudPubSub/GoogleCloudPubSub.ts +++ b/src/GoogleCloudPubSub/GoogleCloudPubSub.ts @@ -199,6 +199,23 @@ export class GoogleCloudPubSub implements GCPubSub { }); } + /** + * Stop listening to a specific subscription. Close the server connection. + * @param event Event name + */ + public async unsubscribe(event: string): Promise { + const subscriptionName: string = this.getSubscriptionName(event); + // Cover a case where there could be a custom subscription name with a prefix. + const cachedSubscription: Subscription | undefined = + this.subscriptions.get(subscriptionName) || this.subscriptions.get(event); + + if (cachedSubscription === undefined) { + return; + } + + return cachedSubscription.close(); + } + /** * Get or create a topic on Google PubSub * Also fill the topic map in-memory cache diff --git a/src/PubSub.ts b/src/PubSub.ts index 3ed9490..d0ec67d 100644 --- a/src/PubSub.ts +++ b/src/PubSub.ts @@ -23,6 +23,11 @@ export interface PubSub { */ listen(event: string, options?: ListenOptions): Promise; + /** + * Stop listening to a subscription + */ + unsubscribe(event: string): Promise; + /** * Emit an event * @param event Event to emit diff --git a/test/GoogleCloudPubSub.test.ts b/test/GoogleCloudPubSub.test.ts index b2e13db..b2d23c9 100644 --- a/test/GoogleCloudPubSub.test.ts +++ b/test/GoogleCloudPubSub.test.ts @@ -102,6 +102,153 @@ test('GPS001c - should properly emit and listen with a prefix', async (t: Execut }); }); +test('GPS001d - should properly emit, but not listen to the subscription', async (t: ExecutionContext): Promise => { + const topicName: string = generateRandomTopicName(); + const pubSub: GCPubSub = PubSubFactory.create({ + transport: Transport.GOOGLE_PUBSUB, + options: { + projectId, + }, + }); + + await new Promise(async (resolve, reject) => { + await pubSub.listen(topicName, { + options: { + autoAck: true, + }, + onMessage(): void { + reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`); + }, + onError(error) { + reject(error); + }, + }); + + await pubSub.unsubscribe(topicName); + + setTimeout(resolve, 2000); + + emitAfterDelay(pubSub, topicName); + }); + + t.pass('Test succeeded, because no message was received'); +}); + +test('GPS001e - should properly emit and listen because wrong topic name to unsubscribe', async (t: ExecutionContext): Promise => { + const topicName: string = generateRandomTopicName(); + const pubSub: GCPubSub = PubSubFactory.create({ + transport: Transport.GOOGLE_PUBSUB, + options: { + projectId, + }, + }); + + await new Promise(async (resolve, reject) => { + await pubSub.listen(topicName, { + options: { + autoAck: true, + }, + onMessage(message: EmittedMessage): void { + const spy: sinon.SinonSpy = sinon.spy(message.getOriginalMessage(), 'ack'); + if (isPayloadError(message.payload)) { + return reject('Error in payload'); + } + + const payload: OnMessage = message.payload; + t.deepEqual(payload, { + hello: 'world', + }); + t.falsy(spy.called); + t.truthy(message.id); + t.truthy(message.ackId); + t.truthy(message.emittedAt); + t.truthy(message.receivedAt); + t.is(message.count, 0); + t.truthy(message.duration); + t.pass(`Listen successfully to the topic ${topicName}`); + resolve(true); + }, + onError(error) { + reject(error); + }, + }); + + await pubSub.unsubscribe('wrong_subscription_or_topic_name'); + + emitAfterDelay(pubSub, topicName); + }); +}); + +test('GPS001f - should properly emit, but not listen to the subscription with a prefix', async (t: ExecutionContext): Promise => { + const topicName: string = generateRandomTopicName(); + const pubSub: GCPubSub = PubSubFactory.create({ + transport: Transport.GOOGLE_PUBSUB, + options: { + projectId, + subscriptionsPrefix: 'my-prefix', + }, + }); + + await new Promise(async (resolve, reject) => { + await pubSub.listen(topicName, { + options: { + autoAck: true, + }, + onMessage(): void { + reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`); + }, + onError(error) { + reject(error); + }, + }); + + await pubSub.unsubscribe(topicName); + + setTimeout(resolve, 2000); + + emitAfterDelay(pubSub, topicName); + }); + + t.pass('Test succeeded, because no message was received'); +}); + +test('GPS001g - should properly emit, but not listen to the subscription with a custom name and a prefix', async (t: ExecutionContext): Promise => { + const topicName: string = generateRandomTopicName(); + const customSubscriptionName = 'completely-different-name'; + const pubSub: GCPubSub = PubSubFactory.create({ + transport: Transport.GOOGLE_PUBSUB, + options: { + projectId, + subscriptionsPrefix: 'my-prefix', + }, + }); + + await new Promise(async (resolve, reject) => { + await pubSub.listen(topicName, { + options: { + autoAck: true, + subscriptionOptions: { + name: customSubscriptionName, + }, + }, + onMessage(): void { + reject(`Should not listen to anything, because unsubscribed from subscription ${topicName}`); + }, + onError(error) { + reject(error); + }, + }); + + await pubSub.unsubscribe(customSubscriptionName); + + setTimeout(resolve, 2000); + + emitAfterDelay(pubSub, topicName); + }); + + t.pass('Test succeeded, because no message was received'); +}); + test('GPS002 - should properly emit but the ack method is never called - no ack', async (t: ExecutionContext): Promise => { const topicName: string = generateRandomTopicName(); const pubSub: GCPubSub = PubSubFactory.create({