From b36dc6e8f6513cbe17c54166cad6b9208115276f Mon Sep 17 00:00:00 2001 From: FPierre Date: Thu, 1 Jun 2023 15:10:38 +0200 Subject: [PATCH 1/4] feat: scope topics and subscriptions under key if exists --- lib/gc-pubsub.client.spec.ts | 21 +++++++++++++++++++++ lib/gc-pubsub.client.ts | 15 +++++++++++++++ lib/gc-pubsub.interface.ts | 1 + lib/gc-pubsub.server.spec.ts | 33 +++++++++++++++++++++++++++++++++ lib/gc-pubsub.server.ts | 15 ++++++++++++++- 5 files changed, 84 insertions(+), 1 deletion(-) diff --git a/lib/gc-pubsub.client.spec.ts b/lib/gc-pubsub.client.spec.ts index 5c9aefe..8b8c70e 100644 --- a/lib/gc-pubsub.client.spec.ts +++ b/lib/gc-pubsub.client.spec.ts @@ -20,6 +20,27 @@ describe('GCPubSubClient', () => { sandbox.restore(); }); + describe('constructor', () => { + describe('when the scopedEnvKey is defined', () => { + beforeEach(() => { + client = getInstance({ + topic: 'topic', + replyTopic: 'replyTopic', + replySubscription: 'replySubscription', + scopedEnvKey: 'my-key', + }); + }); + + it('should set the scopedEnvKey on topics and subscriptions', () => { + expect(client['topicName']).to.be.eq('my-keytopic'); + expect(client['replyTopicName']).to.be.eq('my-keyreplyTopic'); + expect(client['replySubscriptionName']).to.be.eq( + 'my-keyreplySubscription', + ); + }); + }); + }); + describe('connect', () => { describe('when is not connected', () => { describe('when check existence is true', () => { diff --git a/lib/gc-pubsub.client.ts b/lib/gc-pubsub.client.ts index 3482d5b..826d19a 100644 --- a/lib/gc-pubsub.client.ts +++ b/lib/gc-pubsub.client.ts @@ -46,6 +46,7 @@ export class GCPubSubClient extends ClientProxy { protected replySubscription: Subscription | null = null; protected topic: Topic | null = null; protected init: boolean; + protected readonly scopedEnvKey: string | null; protected readonly checkExistence: boolean; constructor(protected readonly options: GCPubSubOptions) { @@ -53,8 +54,14 @@ export class GCPubSubClient extends ClientProxy { this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG; + this.scopedEnvKey = this.options.scopedEnvKey ?? null; + this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC; + if (this.scopedEnvKey) { + this.topicName = `${this.scopedEnvKey}${this.topicName}`; + } + this.subscriberConfig = this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG; @@ -63,8 +70,16 @@ export class GCPubSubClient extends ClientProxy { this.replyTopicName = this.options.replyTopic; + if (this.scopedEnvKey) { + this.replyTopicName = `${this.scopedEnvKey}${this.replyTopicName}`; + } + this.replySubscriptionName = this.options.replySubscription; + if (this.scopedEnvKey) { + this.replySubscriptionName = `${this.scopedEnvKey}${this.replySubscriptionName}`; + } + this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK; this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT; this.useAttributes = diff --git a/lib/gc-pubsub.interface.ts b/lib/gc-pubsub.interface.ts index b5d4caf..f8a7664 100644 --- a/lib/gc-pubsub.interface.ts +++ b/lib/gc-pubsub.interface.ts @@ -13,6 +13,7 @@ export interface GCPubSubOptions { init?: boolean; useAttributes?: boolean; checkExistence?: boolean; + scopedEnvKey?: string | null; publisher?: PublishOptions; subscriber?: SubscriberOptions; serializer?: Serializer; diff --git a/lib/gc-pubsub.server.spec.ts b/lib/gc-pubsub.server.spec.ts index 694faa3..dcf8659 100644 --- a/lib/gc-pubsub.server.spec.ts +++ b/lib/gc-pubsub.server.spec.ts @@ -20,6 +20,21 @@ describe('GCPubSubServer', () => { sandbox.restore(); }); + describe('constructor', () => { + describe('when the scopedEnvKey is defined', () => { + it('should set the scopedEnvKey on topics and subscriptions', () => { + const scopedEnvKey = 'my-key'; + + server = getInstance({ scopedEnvKey } as GCPubSubOptions); + + expect(server['topicName']).to.eq(`${scopedEnvKey}default_topic`); + expect(server['subscriptionName']).to.eq( + `${scopedEnvKey}default_subscription`, + ); + }); + }); + }); + describe('listen', () => { describe('when is check existence is true', () => { beforeEach(async () => { @@ -191,6 +206,24 @@ describe('GCPubSubServer', () => { }), ).to.be.true; }); + + describe('when scopedEnvKey is defined', () => { + beforeEach(async () => { + server = getInstance({ scopedEnvKey: 'my-key' }); + await server.listen(() => {}); + }); + + it('should set scopedEnvKey on replyTo', async () => { + const message = { test: true }; + const replyTo = 'test'; + const correlationId = '0'; + + await server.sendMessage(message, replyTo, correlationId); + expect(Array.from(server['replyTopics'].values())).to.deep.eq([ + 'my-keytest', + ]); + }); + }); }); describe('handleEvent', () => { diff --git a/lib/gc-pubsub.server.ts b/lib/gc-pubsub.server.ts index fa5c612..72594dc 100644 --- a/lib/gc-pubsub.server.ts +++ b/lib/gc-pubsub.server.ts @@ -48,6 +48,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { protected readonly replyTopics: Set; protected readonly init: boolean; protected readonly checkExistence: boolean; + protected readonly scopedEnvKey: string | null; protected client: PubSub | null = null; protected subscription: Subscription | null = null; @@ -56,12 +57,20 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { super(); this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG; - + this.scopedEnvKey = this.options.scopedEnvKey ?? null; this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC; + if (this.scopedEnvKey) { + this.topicName = `${this.scopedEnvKey}${this.topicName}`; + } + this.subscriptionName = this.options.subscription || GC_PUBSUB_DEFAULT_SUBSCRIPTION; + if (this.scopedEnvKey) { + this.subscriptionName = `${this.scopedEnvKey}${this.subscriptionName}`; + } + this.subscriberConfig = this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG; @@ -205,6 +214,10 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { message as unknown as OutgoingResponse, ); + if (this.scopedEnvKey) { + replyTo = `${this.scopedEnvKey}${replyTo}`; + } + this.replyTopics.add(replyTo); await this.client From 4c6eefdcafb9f8afcfa057f181362cd869a926ee Mon Sep 17 00:00:00 2001 From: FPierre Date: Fri, 2 Jun 2023 08:14:39 +0200 Subject: [PATCH 2/4] test: add e2e tests --- tests/e2e/scoped-env-gc-pubsub.spec.ts | 60 +++++++++++++ tests/src/gc-pubsub-scoped-env.controller.ts | 92 ++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 tests/e2e/scoped-env-gc-pubsub.spec.ts create mode 100644 tests/src/gc-pubsub-scoped-env.controller.ts diff --git a/tests/e2e/scoped-env-gc-pubsub.spec.ts b/tests/e2e/scoped-env-gc-pubsub.spec.ts new file mode 100644 index 0000000..8b6a3ff --- /dev/null +++ b/tests/e2e/scoped-env-gc-pubsub.spec.ts @@ -0,0 +1,60 @@ +import { INestApplication } from '@nestjs/common'; +import { Test } from '@nestjs/testing'; +import { expect } from 'chai'; +import * as request from 'supertest'; +import { GCPubSubServer } from '../../lib'; +import { + GCPubSubScopedEnvController1, + GCPubSubScopedEnvController2, +} from '../src/gc-pubsub-scoped-env.controller'; + +describe('GC PubSub transport', () => { + let server; + let app: INestApplication; + + describe('useAttributes=false', () => { + beforeEach(async () => { + await Test.createTestingModule({ + controllers: [GCPubSubScopedEnvController2], + }).compile(); + const module = await Test.createTestingModule({ + controllers: [GCPubSubScopedEnvController1], + }).compile(); + + app = module.createNestApplication(); + server = app.getHttpAdapter().getInstance(); + + app.connectMicroservice({ + strategy: new GCPubSubServer({ + client: { + apiEndpoint: 'localhost:8681', + projectId: 'microservice', + }, + scopedEnvKey: 'foobar', + }), + }); + await app.startAllMicroservices(); + await app.init(); + }); + + it('/POST', () => { + request(server).post('/rpc').expect(200, 'scoped RPC'); + }); + + it('/POST (event notification)', (done) => { + request(server) + .post('/notify') + .end(() => { + setTimeout(() => { + expect(GCPubSubScopedEnvController1.IS_NOTIFIED).to.be.true; + expect(GCPubSubScopedEnvController2.IS_NOTIFIED).to.be.false; + done(); + }, 1000); + }); + }); + + afterEach(async () => { + await app.close(); + }); + }); +}); diff --git a/tests/src/gc-pubsub-scoped-env.controller.ts b/tests/src/gc-pubsub-scoped-env.controller.ts new file mode 100644 index 0000000..163b49b --- /dev/null +++ b/tests/src/gc-pubsub-scoped-env.controller.ts @@ -0,0 +1,92 @@ +import { + Controller, + HttpCode, + OnApplicationShutdown, + Post, +} from '@nestjs/common'; +import { + ClientProxy, + EventPattern, + MessagePattern, +} from '@nestjs/microservices'; +import { GCPubSubClient } from '../../lib'; +import { Observable } from 'rxjs'; + +@Controller() +export class GCPubSubScopedEnvController1 implements OnApplicationShutdown { + static IS_NOTIFIED = false; + + client: ClientProxy; + + constructor() { + this.client = new GCPubSubClient({ + client: { + apiEndpoint: 'localhost:8681', + projectId: 'microservice', + }, + replyTopic: 'default_reply_topic', + replySubscription: 'default_reply_subscription', + scopedEnvKey: 'foobar', + }); + } + + onApplicationShutdown(signal?: string) { + return this.client.close(); + } + + @Post() + @HttpCode(200) + call() { + return this.client.send({ cmd: 'rpc' }, {}); + } + + @Post('notify') + async sendNotification(): Promise { + return this.client.emit<{ notification: boolean; id: string }>( + 'notification', + { notification: true, id: 'id' }, + ); + } + + @MessagePattern({ cmd: 'rpc' }) + rpc(): string { + return 'scoped RPC'; + } + + @EventPattern('notification') + eventHandler(data: { notification: boolean; id: string }) { + GCPubSubScopedEnvController1.IS_NOTIFIED = data.notification; + } +} + +@Controller() +export class GCPubSubScopedEnvController2 implements OnApplicationShutdown { + static IS_NOTIFIED = false; + + client: ClientProxy; + + constructor() { + this.client = new GCPubSubClient({ + client: { + apiEndpoint: 'localhost:8681', + projectId: 'microservice', + }, + replyTopic: 'default_reply_topic', + replySubscription: 'default_reply_subscription', + }); + } + + onApplicationShutdown(signal?: string) { + return this.client.close(); + } + + @MessagePattern({ cmd: 'rpc' }) + rpc(): string { + return 'RPC'; + } + + @EventPattern('notification') + eventHandler(data: { notification: boolean; id: string }) { + GCPubSubScopedEnvController2.IS_NOTIFIED = data.notification; + } +} From 5699070477b19db4fda5ae55a0bffc2818f918fe Mon Sep 17 00:00:00 2001 From: FPierre Date: Mon, 26 Jun 2023 10:06:57 +0200 Subject: [PATCH 3/4] docs: update readme file --- README.md | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 3ee0ce3..685ef70 100644 --- a/README.md +++ b/README.md @@ -25,15 +25,18 @@ $ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice To use the Pub/Sub transporter, pass the following options object to the `createMicroservice()` method: ```typescript -const app = await NestFactory.createMicroservice(ApplicationModule, { - strategy: new GCPubSubServer({ - topic: 'cats_topic', - subscription: 'cats_subscription', - client: { - projectId: 'microservice', - }, - }), -}); +const app = await NestFactory.createMicroservice( + ApplicationModule, + { + strategy: new GCPubSubServer({ + topic: 'cats_topic', + subscription: 'cats_subscription', + client: { + projectId: 'microservice', + }, + }), + }, +); ``` #### Options @@ -44,7 +47,7 @@ The `options` property is specific to the chosen transporter. The GCloud topic Topic name which your server subscription will belong to - + subscription Subscription name which your server will listen to @@ -85,18 +88,21 @@ The `options` property is specific to the chosen transporter. The GCloud subscriber Additional subscriber options (read more here) + + scopedEnvKey + Scope topics and subscriptions to avoid losing messages when several people are working on the same code base. Will prefixes topics and subscriptions with this key (read more here) + #### Client ```typescript -const client = - new GCPubSubClient({ - client: { - apiEndpoint: 'localhost:8681', - projectId: 'microservice', - }, - }); +const client = new GCPubSubClient({ + client: { + apiEndpoint: 'localhost:8681', + projectId: 'microservice', + }, +}); client .send('pattern', 'Hello world!') .subscribe((response) => console.log(response)); From 9b0c15e750c94d4833b2d66a990039945c68361e Mon Sep 17 00:00:00 2001 From: FPierre Date: Mon, 3 Jul 2023 12:30:05 +0200 Subject: [PATCH 4/4] refactor: scoped env key is empty strign rather than null --- lib/gc-pubsub.client.ts | 19 ++++--------------- lib/gc-pubsub.server.ts | 16 +++++----------- 2 files changed, 9 insertions(+), 26 deletions(-) diff --git a/lib/gc-pubsub.client.ts b/lib/gc-pubsub.client.ts index 826d19a..90cbe0b 100644 --- a/lib/gc-pubsub.client.ts +++ b/lib/gc-pubsub.client.ts @@ -54,13 +54,10 @@ export class GCPubSubClient extends ClientProxy { this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG; - this.scopedEnvKey = this.options.scopedEnvKey ?? null; + this.scopedEnvKey = this.options.scopedEnvKey ?? ''; this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC; - - if (this.scopedEnvKey) { - this.topicName = `${this.scopedEnvKey}${this.topicName}`; - } + this.topicName = `${this.scopedEnvKey}${this.topicName}`; this.subscriberConfig = this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG; @@ -68,17 +65,9 @@ export class GCPubSubClient extends ClientProxy { this.publisherConfig = this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG; - this.replyTopicName = this.options.replyTopic; - - if (this.scopedEnvKey) { - this.replyTopicName = `${this.scopedEnvKey}${this.replyTopicName}`; - } - - this.replySubscriptionName = this.options.replySubscription; + this.replyTopicName = `${this.scopedEnvKey}${this.options.replyTopic}`; - if (this.scopedEnvKey) { - this.replySubscriptionName = `${this.scopedEnvKey}${this.replySubscriptionName}`; - } + this.replySubscriptionName = `${this.scopedEnvKey}${this.options.replySubscription}`; this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK; this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT; diff --git a/lib/gc-pubsub.server.ts b/lib/gc-pubsub.server.ts index 72594dc..97c5c48 100644 --- a/lib/gc-pubsub.server.ts +++ b/lib/gc-pubsub.server.ts @@ -57,19 +57,15 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { super(); this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG; - this.scopedEnvKey = this.options.scopedEnvKey ?? null; - this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC; + this.scopedEnvKey = this.options.scopedEnvKey ?? ''; - if (this.scopedEnvKey) { - this.topicName = `${this.scopedEnvKey}${this.topicName}`; - } + this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC; + this.topicName = `${this.scopedEnvKey}${this.topicName}`; this.subscriptionName = this.options.subscription || GC_PUBSUB_DEFAULT_SUBSCRIPTION; - if (this.scopedEnvKey) { - this.subscriptionName = `${this.scopedEnvKey}${this.subscriptionName}`; - } + this.subscriptionName = `${this.scopedEnvKey}${this.subscriptionName}`; this.subscriberConfig = this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG; @@ -214,9 +210,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy { message as unknown as OutgoingResponse, ); - if (this.scopedEnvKey) { - replyTo = `${this.scopedEnvKey}${replyTo}`; - } + replyTo = `${this.scopedEnvKey}${replyTo}`; this.replyTopics.add(replyTo);