From 2b00d07681a7de3b3e5686c40fbe4ba152cb7a7d Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 7 Dec 2023 13:30:04 +0200 Subject: [PATCH] Implement handlerSpy support for SQS --- README.md | 46 +++++++++++++++++++ packages/core/index.ts | 2 + packages/core/lib/queues/HandlerSpy.ts | 6 +-- packages/core/test/queues/HandlerSpy.spec.ts | 10 ++-- .../lib/utils/sqsMessageDeserializer.spec.ts | 1 + .../SqsPermissionConsumerMultiSchema.ts | 1 + .../SqsPermissionsConsumerMonoSchema.spec.ts | 10 ++-- .../SqsPermissionsConsumerMultiSchema.spec.ts | 25 +++++----- .../sqs/test/consumers/userConsumerSchemas.ts | 2 + .../SqsPermissionPublisherMonoSchema.spec.ts | 6 +-- .../SqsPermissionPublisherMonoSchema.ts | 1 + 11 files changed, 81 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 498df0fe..69384e25 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,52 @@ Both publishers and consumers accept a queue name and configuration as parameter If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist. +## Handler spies + +In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly. + +In order to enable this functionality, configure spyHandler on the publisher or consumer: + +```ts +export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema< + SupportedMessages, + ExecutionContext +> { + constructor( + dependencies: SQSConsumerDependencies, + userService: UserService, + ) { + super(dependencies, { + // + // rest of configuration skipped + // + handlerSpy: { + bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100 + messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id` + }, + } + } +} +``` + +Then you can use handler spies in your tests or production code to await certain events: + +```ts + await myConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') + + await myConsumer.handlerSpy.waitForMessageWithId('1') + + await myConsumer.handlerSpy.waitForMessageWithId('1') + + await myConsumer.handlerSpy.waitForMessage({ + projectId: 1, + }) + + await myPublisher.handlerSpy.waitForMessage({ + userId: 1 + }, 'consumed') +``` + ## Automatic Reconnects (RabbitMQ) `message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. diff --git a/packages/core/index.ts b/packages/core/index.ts index 198c57a6..a042f77c 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -41,6 +41,8 @@ export { BarrierResultNegative, } from './lib/queues/HandlerContainer' export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer' +export { HandlerSpy } from './lib/queues/HandlerSpy' +export type { SpyResult, HandlerSpyParams, PublicHandlerSpy } from './lib/queues/HandlerSpy' export { MessageSchemaContainer } from './lib/queues/MessageSchemaContainer' export type { MessageSchemaContainerOptions } from './lib/queues/MessageSchemaContainer' diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts index a30a7cd8..d680eb9d 100644 --- a/packages/core/lib/queues/HandlerSpy.ts +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -66,8 +66,8 @@ export class HandlerSpy { ) } - waitForEventWithId(id: string, processingResult?: MessageProcessingResult) { - return this.waitForEvent( + waitForMessageWithId(id: string, processingResult?: MessageProcessingResult) { + return this.waitForMessage( // @ts-ignore { [this.messageIdField]: id, @@ -76,7 +76,7 @@ export class HandlerSpy { ) } - waitForEvent( + waitForMessage( fields: Partial, processingResult?: MessageProcessingResult, ): Promise> { diff --git a/packages/core/test/queues/HandlerSpy.spec.ts b/packages/core/test/queues/HandlerSpy.spec.ts index c4689ee9..27ab1fad 100644 --- a/packages/core/test/queues/HandlerSpy.spec.ts +++ b/packages/core/test/queues/HandlerSpy.spec.ts @@ -32,7 +32,7 @@ describe('HandlerSpy', () => { }) }) - describe('waitForEvent', () => { + describe('waitForMessage', () => { it('Finds previously consumed event', async () => { const spy = new HandlerSpy() @@ -46,7 +46,7 @@ describe('HandlerSpy', () => { message: TEST_MESSAGE, }) - const message = await spy.waitForEvent({ + const message = await spy.waitForMessage({ status: 'done', }) @@ -66,14 +66,14 @@ describe('HandlerSpy', () => { message: TEST_MESSAGE, }) - const message = await spy.waitForEvent( + const message = await spy.waitForMessage( { status: 'done', }, 'consumed', ) - const message2 = await spy.waitForEvent( + const message2 = await spy.waitForMessage( { status: 'done', }, @@ -92,7 +92,7 @@ describe('HandlerSpy', () => { message: TEST_MESSAGE_2, }) - const spyPromise = spy.waitForEvent({ + const spyPromise = spy.waitForMessage({ status: 'done', }) diff --git a/packages/sqs/lib/utils/sqsMessageDeserializer.spec.ts b/packages/sqs/lib/utils/sqsMessageDeserializer.spec.ts index 454787fb..1952b4a3 100644 --- a/packages/sqs/lib/utils/sqsMessageDeserializer.spec.ts +++ b/packages/sqs/lib/utils/sqsMessageDeserializer.spec.ts @@ -8,6 +8,7 @@ import { deserializeSQSMessage } from './sqsMessageDeserializer' describe('messageDeserializer', () => { it('deserializes valid JSON', () => { const messagePayload: PERMISSIONS_MESSAGE_TYPE = { + id: '1', messageType: 'add', userIds: [1], permissions: ['perm'], diff --git a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts index 35cab3e4..56c6f24b 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts @@ -62,6 +62,7 @@ export class SqsPermissionConsumerMultiSchema extends AbstractSqsConsumerMultiSc dependencies, { messageTypeField: 'messageType', + handlerSpy: true, deletionConfig: { deleteIfExists: true, }, diff --git a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts index edeb383c..2eb4e633 100644 --- a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts @@ -128,7 +128,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { permissions: perms, }) - await consumer.handlerSpy.waitForEventWithId('abcd', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('abcd', 'consumed') const updatedUsersPermissions = await retrievePermissions(userIds) if (null === updatedUsersPermissions) { @@ -151,7 +151,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { }) // no users in the database, so message will go back to the queue - await consumer.handlerSpy.waitForEventWithId('123', 'retryLater') + await consumer.handlerSpy.waitForMessageWithId('123', 'retryLater') const usersFromDb = await retrievePermissions(userIds) expect(usersFromDb).toBeNull() @@ -160,7 +160,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { userPermissionMap[200] = [] userPermissionMap[300] = [] - await consumer.handlerSpy.waitForEventWithId('123', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('123', 'consumed') const usersPermissions = await retrievePermissions(userIds) if (null === usersPermissions) { @@ -185,7 +185,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { }) // not all users are in the database, so message will go back to the queue - await consumer.handlerSpy.waitForEventWithId('abc', 'retryLater') + await consumer.handlerSpy.waitForMessageWithId('abc', 'retryLater') const usersFromDb = await retrievePermissions(userIds) expect(usersFromDb).toBeNull() @@ -193,7 +193,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { userPermissionMap[200] = [] userPermissionMap[300] = [] - await consumer.handlerSpy.waitForEventWithId('abc', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('abc', 'consumed') const usersPermissions = await retrievePermissions(userIds) if (null === usersPermissions) { diff --git a/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts b/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts index 46c2386d..1c678984 100644 --- a/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts @@ -1,7 +1,6 @@ import type { SQSClient } from '@aws-sdk/client-sqs' import { ReceiveMessageCommand } from '@aws-sdk/client-sqs' import type { BarrierResult } from '@message-queue-toolkit/core' -import { waitAndRetry } from '@message-queue-toolkit/core' import type { AwilixContainer } from 'awilix' import { asClass, asFunction } from 'awilix' import { describe, beforeEach, afterEach, expect, it } from 'vitest' @@ -89,12 +88,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '1', messageType: 'add', }) - await waitAndRetry(() => { - return logger.loggedMessages.length === 1 - }) + await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(logger.loggedMessages.length).toBe(1) await newConsumer.close() @@ -137,12 +135,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '2', messageType: 'add', }) - await waitAndRetry(() => { - return newConsumer.addCounter === 1 - }) + await newConsumer.handlerSpy.waitForMessageWithId('2', 'consumed') expect(newConsumer.addCounter).toBe(1) expect(barrierCounter).toBe(2) @@ -168,12 +165,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '3', messageType: 'add', }) - await waitAndRetry(() => { - return newConsumer.addCounter > 0 - }) + await newConsumer.handlerSpy.waitForMessageWithId('3', 'consumed') expect(newConsumer.addCounter).toBe(1) expect(barrierCounter).toBe(2) @@ -213,18 +209,21 @@ describe('SqsPermissionsConsumerMultiSchema', () => { describe('happy path', () => { it('Processes messages', async () => { await publisher.publish({ + id: '10', messageType: 'add', }) await publisher.publish({ + id: '20', messageType: 'remove', }) await publisher.publish({ + id: '30', messageType: 'remove', }) - await waitAndRetry(() => { - return consumer.addCounter === 1 && consumer.removeCounter === 2 - }) + await consumer.handlerSpy.waitForMessageWithId('10', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('20', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('30', 'consumed') expect(consumer.addCounter).toBe(1) expect(consumer.removeCounter).toBe(2) diff --git a/packages/sqs/test/consumers/userConsumerSchemas.ts b/packages/sqs/test/consumers/userConsumerSchemas.ts index 192957ca..163901b4 100644 --- a/packages/sqs/test/consumers/userConsumerSchemas.ts +++ b/packages/sqs/test/consumers/userConsumerSchemas.ts @@ -8,10 +8,12 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({ }) export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({ + id: z.string(), messageType: z.literal('add'), }) export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({ + id: z.string(), messageType: z.literal('remove'), }) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts index 99b2c128..af0922af 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts @@ -46,6 +46,7 @@ describe('SqsPermissionPublisher', () => { describe('logging', () => { it('logs a message when logging is enabled', async () => { const message = { + id: '1', userIds, messageType: 'add', permissions: perms, @@ -53,9 +54,7 @@ describe('SqsPermissionPublisher', () => { await publisher.publish(message) - await waitAndRetry(() => { - return logger.loggedMessages.length === 1 - }) + await publisher.handlerSpy.waitForMessageWithId('1') expect(logger.loggedMessages.length).toBe(1) }) @@ -76,6 +75,7 @@ describe('SqsPermissionPublisher', () => { const { permissionPublisher } = diContainer.cradle const message = { + id: '2', userIds, messageType: 'add', permissions: perms, diff --git a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts index b56fbe6a..40684b4a 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts @@ -13,6 +13,7 @@ export class SqsPermissionPublisherMonoSchema extends AbstractSqsPublisherMonoSc QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME, }, }, + handlerSpy: true, deletionConfig: { deleteIfExists: false, },