Skip to content

Commit

Permalink
Merge pull request #29 from FPierre/feat/scoped-env
Browse files Browse the repository at this point in the history
Scoped env
  • Loading branch information
p-fedyukovich authored Jul 3, 2023
2 parents 0735261 + 9b0c15e commit 8f5af40
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 19 deletions.
40 changes: 23 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ $ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice
To use the Pub/Sub transporter, pass the following options object to the `createMicroservice()` method:

```typescript
const app = await NestFactory.createMicroservice<MicroserviceOptions>(ApplicationModule, {
strategy: new GCPubSubServer({
topic: 'cats_topic',
subscription: 'cats_subscription',
client: {
projectId: 'microservice',
},
}),
});
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
ApplicationModule,
{
strategy: new GCPubSubServer({
topic: 'cats_topic',
subscription: 'cats_subscription',
client: {
projectId: 'microservice',
},
}),
},
);
```

#### Options
Expand All @@ -44,7 +47,7 @@ The `options` property is specific to the chosen transporter. The <strong>GCloud
<tr>
<td><code>topic</code></td>
<td>Topic name which your server subscription will belong to</td>
</tr>
</tr>
<tr>
<td><code>subscription</code></td>
<td>Subscription name which your server will listen to</td>
Expand Down Expand Up @@ -85,18 +88,21 @@ The `options` property is specific to the chosen transporter. The <strong>GCloud
<td><code>subscriber</code></td>
<td>Additional subscriber options (read more <a href="https://googleapis.dev/nodejs/pubsub/latest/global.html#SubscriberOptions" rel="nofollow" target="_blank">here</a>)</td>
</tr>
<tr>
<td><code>scopedEnvKey</code></td>
<td>Scope topics and subscriptions to avoid losing messages when several people are working on the same code base. Will prefixes topics and subscriptions with this key (read more <a href="https://github.com/p-fedyukovich/nestjs-google-pubsub-microservice/pull/29" rel="nofollow" target="_blank">here</a>)</td>
</tr>
</table>

#### Client

```typescript
const client =
new GCPubSubClient({
client: {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
});
const client = new GCPubSubClient({
client: {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
});
client
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));
Expand Down
21 changes: 21 additions & 0 deletions lib/gc-pubsub.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ describe('GCPubSubClient', () => {
sandbox.restore();
});

describe('constructor', () => {
describe('when the scopedEnvKey is defined', () => {
beforeEach(() => {
client = getInstance({
topic: 'topic',
replyTopic: 'replyTopic',
replySubscription: 'replySubscription',
scopedEnvKey: 'my-key',
});
});

it('should set the scopedEnvKey on topics and subscriptions', () => {
expect(client['topicName']).to.be.eq('my-keytopic');
expect(client['replyTopicName']).to.be.eq('my-keyreplyTopic');
expect(client['replySubscriptionName']).to.be.eq(
'my-keyreplySubscription',
);
});
});
});

describe('connect', () => {
describe('when is not connected', () => {
describe('when check existence is true', () => {
Expand Down
8 changes: 6 additions & 2 deletions lib/gc-pubsub.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,28 @@ export class GCPubSubClient extends ClientProxy {
protected replySubscription: Subscription | null = null;
protected topic: Topic | null = null;
protected init: boolean;
protected readonly scopedEnvKey: string | null;
protected readonly checkExistence: boolean;

constructor(protected readonly options: GCPubSubOptions) {
super();

this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG;

this.scopedEnvKey = this.options.scopedEnvKey ?? '';

this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC;
this.topicName = `${this.scopedEnvKey}${this.topicName}`;

this.subscriberConfig =
this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG;

this.publisherConfig =
this.options.publisher || GC_PUBSUB_DEFAULT_PUBLISHER_CONFIG;

this.replyTopicName = this.options.replyTopic;
this.replyTopicName = `${this.scopedEnvKey}${this.options.replyTopic}`;

this.replySubscriptionName = this.options.replySubscription;
this.replySubscriptionName = `${this.scopedEnvKey}${this.options.replySubscription}`;

this.noAck = this.options.noAck ?? GC_PUBSUB_DEFAULT_NO_ACK;
this.init = this.options.init ?? GC_PUBSUB_DEFAULT_INIT;
Expand Down
1 change: 1 addition & 0 deletions lib/gc-pubsub.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface GCPubSubOptions {
init?: boolean;
useAttributes?: boolean;
checkExistence?: boolean;
scopedEnvKey?: string | null;
publisher?: PublishOptions;
subscriber?: SubscriberOptions;
serializer?: Serializer;
Expand Down
33 changes: 33 additions & 0 deletions lib/gc-pubsub.server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ describe('GCPubSubServer', () => {
sandbox.restore();
});

describe('constructor', () => {
describe('when the scopedEnvKey is defined', () => {
it('should set the scopedEnvKey on topics and subscriptions', () => {
const scopedEnvKey = 'my-key';

server = getInstance({ scopedEnvKey } as GCPubSubOptions);

expect(server['topicName']).to.eq(`${scopedEnvKey}default_topic`);
expect(server['subscriptionName']).to.eq(
`${scopedEnvKey}default_subscription`,
);
});
});
});

describe('listen', () => {
describe('when is check existence is true', () => {
beforeEach(async () => {
Expand Down Expand Up @@ -191,6 +206,24 @@ describe('GCPubSubServer', () => {
}),
).to.be.true;
});

describe('when scopedEnvKey is defined', () => {
beforeEach(async () => {
server = getInstance({ scopedEnvKey: 'my-key' });
await server.listen(() => {});
});

it('should set scopedEnvKey on replyTo', async () => {
const message = { test: true };
const replyTo = 'test';
const correlationId = '0';

await server.sendMessage(message, replyTo, correlationId);
expect(Array.from(server['replyTopics'].values())).to.deep.eq([
'my-keytest',
]);
});
});
});

describe('handleEvent', () => {
Expand Down
7 changes: 7 additions & 0 deletions lib/gc-pubsub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
protected readonly replyTopics: Set<string>;
protected readonly init: boolean;
protected readonly checkExistence: boolean;
protected readonly scopedEnvKey: string | null;

protected client: PubSub | null = null;
protected subscription: Subscription | null = null;
Expand All @@ -56,12 +57,16 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
super();

this.clientConfig = this.options.client || GC_PUBSUB_DEFAULT_CLIENT_CONFIG;
this.scopedEnvKey = this.options.scopedEnvKey ?? '';

this.topicName = this.options.topic || GC_PUBSUB_DEFAULT_TOPIC;
this.topicName = `${this.scopedEnvKey}${this.topicName}`;

this.subscriptionName =
this.options.subscription || GC_PUBSUB_DEFAULT_SUBSCRIPTION;

this.subscriptionName = `${this.scopedEnvKey}${this.subscriptionName}`;

this.subscriberConfig =
this.options.subscriber || GC_PUBSUB_DEFAULT_SUBSCRIBER_CONFIG;

Expand Down Expand Up @@ -205,6 +210,8 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
message as unknown as OutgoingResponse,
);

replyTo = `${this.scopedEnvKey}${replyTo}`;

this.replyTopics.add(replyTo);

await this.client
Expand Down
60 changes: 60 additions & 0 deletions tests/e2e/scoped-env-gc-pubsub.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { GCPubSubServer } from '../../lib';
import {
GCPubSubScopedEnvController1,
GCPubSubScopedEnvController2,
} from '../src/gc-pubsub-scoped-env.controller';

describe('GC PubSub transport', () => {
let server;
let app: INestApplication;

describe('useAttributes=false', () => {
beforeEach(async () => {
await Test.createTestingModule({
controllers: [GCPubSubScopedEnvController2],
}).compile();
const module = await Test.createTestingModule({
controllers: [GCPubSubScopedEnvController1],
}).compile();

app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();

app.connectMicroservice({
strategy: new GCPubSubServer({
client: {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
scopedEnvKey: 'foobar',
}),
});
await app.startAllMicroservices();
await app.init();
});

it('/POST', () => {
request(server).post('/rpc').expect(200, 'scoped RPC');
});

it('/POST (event notification)', (done) => {
request(server)
.post('/notify')
.end(() => {
setTimeout(() => {
expect(GCPubSubScopedEnvController1.IS_NOTIFIED).to.be.true;
expect(GCPubSubScopedEnvController2.IS_NOTIFIED).to.be.false;
done();
}, 1000);
});
});

afterEach(async () => {
await app.close();
});
});
});
92 changes: 92 additions & 0 deletions tests/src/gc-pubsub-scoped-env.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import {
Controller,
HttpCode,
OnApplicationShutdown,
Post,
} from '@nestjs/common';
import {
ClientProxy,
EventPattern,
MessagePattern,
} from '@nestjs/microservices';
import { GCPubSubClient } from '../../lib';
import { Observable } from 'rxjs';

@Controller()
export class GCPubSubScopedEnvController1 implements OnApplicationShutdown {
static IS_NOTIFIED = false;

client: ClientProxy;

constructor() {
this.client = new GCPubSubClient({
client: {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
replyTopic: 'default_reply_topic',
replySubscription: 'default_reply_subscription',
scopedEnvKey: 'foobar',
});
}

onApplicationShutdown(signal?: string) {
return this.client.close();
}

@Post()
@HttpCode(200)
call() {
return this.client.send({ cmd: 'rpc' }, {});
}

@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit<{ notification: boolean; id: string }>(
'notification',
{ notification: true, id: 'id' },
);
}

@MessagePattern({ cmd: 'rpc' })
rpc(): string {
return 'scoped RPC';
}

@EventPattern('notification')
eventHandler(data: { notification: boolean; id: string }) {
GCPubSubScopedEnvController1.IS_NOTIFIED = data.notification;
}
}

@Controller()
export class GCPubSubScopedEnvController2 implements OnApplicationShutdown {
static IS_NOTIFIED = false;

client: ClientProxy;

constructor() {
this.client = new GCPubSubClient({
client: {
apiEndpoint: 'localhost:8681',
projectId: 'microservice',
},
replyTopic: 'default_reply_topic',
replySubscription: 'default_reply_subscription',
});
}

onApplicationShutdown(signal?: string) {
return this.client.close();
}

@MessagePattern({ cmd: 'rpc' })
rpc(): string {
return 'RPC';
}

@EventPattern('notification')
eventHandler(data: { notification: boolean; id: string }) {
GCPubSubScopedEnvController2.IS_NOTIFIED = data.notification;
}
}

0 comments on commit 8f5af40

Please sign in to comment.