Skip to content

Commit

Permalink
feat(pubsub-microservice): add listen option to EventPattern extras
Browse files Browse the repository at this point in the history
  • Loading branch information
ccoeurderoy committed Jun 17, 2023
1 parent 351d368 commit 0d0e5b8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 10 deletions.
17 changes: 9 additions & 8 deletions packages/google-pubsub-microservice/src/GooglePubSubServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,34 @@ export class GCPubSubServer extends Server implements CustomTransportStrategy {
private readonly options?: GooglePubSubOptions & { listenOptions?: GCListenOptions; topicsNames?: string[] },
) {
super();
}

/**
* Server listening method
*/
public listen(callback: (error?: Error, info?: unknown[]) => void): void {
const gcPubSub: GCPubSub = PubSubFactory.create({
transport: Transport.GOOGLE_PUBSUB,
options: this.options,
});

this.gcClient = gcPubSub;
}

/**
* Server listening method
*/
public listen(callback: (error?: Error, info?: unknown[]) => void): void {
const handlers: Promise<void>[] = [];

for (const subscriptionName of this.messageHandlers.keys()) {
for (const [subscriptionName, messageHandler] of this.messageHandlers) {
if (this.options?.topicsNames !== undefined && !this.options?.topicsNames?.includes(subscriptionName)) {
continue;
}
this.logger.debug(`Registered new subscription "${subscriptionName}"`);

handlers.push(
gcPubSub.listen(subscriptionName, {
this.gcClient.listen(subscriptionName, {
onMessage: this.handleMessage(subscriptionName),
onError: this.handleError,
options: {
autoAck: true,
...this.options?.listenOptions,
...(messageHandler.extras as GCListenOptions),
},
}),
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import * as delay from 'delay';

import { GCPubSubServer } from '../src';
import { SUBSCRIPTION_NAME, SUBSCRIPTION_NAME_2 } from './test-app/app.controller';
import {
SUBSCRIPTION_NAME,
SUBSCRIPTION_NAME_2,
SUBSCRIPTION_NAME_3,
SUBSCRIPTION_NAME_4,
TOPIC_NAME,
} from './test-app/app.controller';
import { AppService } from './test-app/app.service';
import { getTestingApplication } from './test-app/main';

Expand Down Expand Up @@ -112,14 +118,54 @@ describe('GooglePubSubServer', () => {
});
const { app } = await getTestingApplication(server);
/**
* After launching the application, ensure that all subscriptions have been created
* After launching the application and creating a topic, ensure that all subscriptions have been created
*/
const [topic] = await server.gcClient.client.createTopic(TOPIC_NAME);

await app.listen();

expect(server.gcClient.subscriptions.get(SUBSCRIPTION_NAME)).toBeDefined();
expect(await server.gcClient.client.subscription(SUBSCRIPTION_NAME).exists()).toEqual([true]);
expect(server.gcClient.subscriptions.get(SUBSCRIPTION_NAME_2)).toBeDefined();
expect(await server.gcClient.client.subscription(SUBSCRIPTION_NAME_2).exists()).toEqual([true]);
expect(server.gcClient.subscriptions.get(SUBSCRIPTION_NAME_3)).toBeDefined();
expect(await server.gcClient.client.subscription(SUBSCRIPTION_NAME_3).exists()).toEqual([true]);
expect(server.gcClient.subscriptions.get(SUBSCRIPTION_NAME_4)).toBeDefined();
expect(await server.gcClient.client.subscription(SUBSCRIPTION_NAME_4).exists()).toEqual([true]);

await topic.delete();
await server.gcClient.client.subscription(SUBSCRIPTION_NAME_4).delete();
await server.gcClient.client.subscription(SUBSCRIPTION_NAME_3).delete();

await app.close();
});

it('GCPSS05 - Emit an event and test if it is received twice', async () => {
const server: GCPubSubServer = new GCPubSubServer({
projectId: 'algoan-test',
debug: true,
});

/**
* First, create a topic to ensure it already exists
*/
const [topic] = await server.gcClient.client.createTopic(TOPIC_NAME);

const { app, module } = await getTestingApplication(server);
const appService: AppService = module.get(AppService);
const spy: jest.SpyInstance = jest.spyOn(appService, 'handleTestEvent');
await app.listen();
await server.gcClient.emit(TOPIC_NAME, {
hello: 'world',
});
await delay(2000);

/**
* Since we have two listeners on the same topic, the spy must be called twice
*/
expect(spy).toHaveBeenCalledTimes(2);

await topic.delete();

await app.close();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import { AppService } from './app.service';

export const SUBSCRIPTION_NAME: string = 'test_event';
export const SUBSCRIPTION_NAME_2: string = 'test_event_2';
export const SUBSCRIPTION_NAME_3: string = 'test_event_3';
export const SUBSCRIPTION_NAME_4: string = 'test_event_4';
export const TOPIC_NAME: string = 'my_topic';

/**
* Fake app controller
Expand All @@ -30,4 +33,22 @@ export class AppController {
public async handleTestEvent2(@Payload() data: EmittedMessage<{ hello: string }>): Promise<void> {
this.appService.handleTestEvent(data);
}

/**
* Handle the test event (3)
* @param data Payload sent
*/
@EventPattern(SUBSCRIPTION_NAME_3, { topicName: TOPIC_NAME })
public async handleTestEvent3(@Payload() data: EmittedMessage<{ hello: string }>): Promise<void> {
this.appService.handleTestEvent(data);
}

/**
* Handle the test event (4) based on the same topic than (3)
* @param data Payload sent
*/
@EventPattern(SUBSCRIPTION_NAME_4, { topicName: TOPIC_NAME })
public async handleTestEvent4(@Payload() data: EmittedMessage<{ hello: string }>): Promise<void> {
this.appService.handleTestEvent(data);
}
}

0 comments on commit 0d0e5b8

Please sign in to comment.