diff --git a/README.md b/README.md index 498df0fe..653a10ff 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,59 @@ Both publishers and consumers accept a queue name and configuration as parameter If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist. +## Handler spies + +In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly. + +In order to enable this functionality, configure spyHandler on the publisher or consumer: + +```ts +export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema< + SupportedMessages, + ExecutionContext +> { + constructor( + dependencies: SQSConsumerDependencies, + userService: UserService, + ) { + super(dependencies, { + // + // rest of configuration skipped + // + handlerSpy: { + bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100 + messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id` + }, + } + } +} +``` + +Then you can use handler spies in your tests or production code to await certain events: + +```ts +await myConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') + +await myConsumer.handlerSpy.waitForMessageWithId('1') + +await myConsumer.handlerSpy.waitForMessageWithId('1') + +await myConsumer.handlerSpy.waitForMessage({ + projectId: 1, +}) + +await myPublisher.handlerSpy.waitForMessage({ + userId: 1 +}, 'consumed') +``` + +You can also check details of the message processing outcome: + +```ts +const result = await myConsumer.handlerSpy.waitForMessageWithId('1') +expect(result.processingResult).toEqual('consumed') +``` + ## Automatic Reconnects (RabbitMQ) `message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 216c0123..88440193 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -39,7 +39,7 @@ "@types/node": "^20.10.3", "@typescript-eslint/eslint-plugin": "^6.13.2", "@typescript-eslint/parser": "^6.13.2", - "@vitest/coverage-v8": "^0.34.6", + "@vitest/coverage-v8": "0.34.6", "amqplib": "^0.10.3", "awilix": "^9.0.0", "awilix-manager": "^4.0.0", @@ -51,7 +51,7 @@ "eslint-plugin-vitest": "^0.3.10", "prettier": "^3.1.0", "typescript": "^5.3.2", - "vitest": "^0.34.6", + "vitest": "0.34.6", "vite": "4.5.0" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", diff --git a/packages/core/index.ts b/packages/core/index.ts index 198c57a6..a042f77c 100644 --- a/packages/core/index.ts +++ b/packages/core/index.ts @@ -41,6 +41,8 @@ export { BarrierResultNegative, } from './lib/queues/HandlerContainer' export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer' +export { HandlerSpy } from './lib/queues/HandlerSpy' +export type { SpyResult, HandlerSpyParams, PublicHandlerSpy } from './lib/queues/HandlerSpy' export { MessageSchemaContainer } from './lib/queues/MessageSchemaContainer' export type { MessageSchemaContainerOptions } from './lib/queues/MessageSchemaContainer' diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index ae0a36e8..9f7e78f4 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -5,9 +5,15 @@ import { resolveGlobalErrorLogObject } from '@lokalise/node-core' import type { ZodSchema, ZodType } from 'zod' import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors' -import type { Logger, TransactionObservabilityManager } from '../types/MessageQueueTypes' +import type { + Logger, + TransactionObservabilityManager, + MessageProcessingResult, +} from '../types/MessageQueueTypes' import type { MessageHandlerConfig } from './HandlerContainer' +import type { HandlerSpy, PublicHandlerSpy, HandlerSpyParams } from './HandlerSpy' +import { resolveHandlerSpy } from './HandlerSpy' export type QueueDependencies = { errorReporter: ErrorReporter @@ -46,18 +52,19 @@ export type DeletionConfig = { } export type CommonQueueOptions = { + messageTypeField: string + messageIdField?: string + handlerSpy?: HandlerSpy | HandlerSpyParams | boolean logMessages?: boolean } export type NewQueueOptions = { - messageTypeField: string locatorConfig?: never deletionConfig?: DeletionConfig creationConfig: CreationConfigType } & CommonQueueOptions export type ExistingQueueOptions = { - messageTypeField: string locatorConfig: QueueLocatorType deletionConfig?: DeletionConfig creationConfig?: never @@ -94,21 +101,34 @@ export abstract class AbstractQueueService< protected readonly errorReporter: ErrorReporter public readonly logger: Logger protected readonly messageTypeField: string + protected readonly messageIdField: string protected readonly logMessages: boolean protected readonly creationConfig?: QueueConfiguration protected readonly locatorConfig?: QueueLocatorType protected readonly deletionConfig?: DeletionConfig + protected readonly _handlerSpy?: HandlerSpy + + get handlerSpy(): PublicHandlerSpy { + if (!this._handlerSpy) { + throw new Error( + 'HandlerSpy was not instantiated, please pass `handlerSpy` parameter during queue service creation.', + ) + } + return this._handlerSpy + } constructor({ errorReporter, logger }: DependenciesType, options: OptionsType) { this.errorReporter = errorReporter this.logger = logger this.messageTypeField = options.messageTypeField + this.messageIdField = options.messageIdField ?? 'id' this.creationConfig = options.creationConfig this.locatorConfig = options.locatorConfig this.deletionConfig = options.deletionConfig this.logMessages = options.logMessages ?? false + this._handlerSpy = resolveHandlerSpy(options) } protected abstract resolveSchema( @@ -148,5 +168,21 @@ export abstract class AbstractQueueService< } } + protected handleMessageProcessed( + message: MessagePayloadSchemas | null, + processingResult: MessageProcessingResult, + messageId?: string, + ) { + if (this._handlerSpy) { + this._handlerSpy.addProcessedMessage( + { + message, + processingResult, + }, + messageId, + ) + } + } + public abstract close(): Promise } diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts index ecd951c0..bf2d4e19 100644 --- a/packages/core/lib/queues/HandlerSpy.ts +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -1,14 +1,19 @@ +import { randomUUID } from 'node:crypto' + +import { isObject } from '@lokalise/node-core' import { Fifo } from 'toad-cache' import type { MessageProcessingResult } from '../types/MessageQueueTypes' +import type { CommonQueueOptions } from './AbstractQueueService' + export type HandlerSpyParams = { bufferSize?: number messageIdField?: string } export type SpyResult = { - message: MessagePayloadSchemas + message: MessagePayloadSchemas | null processingResult: MessageProcessingResult } @@ -25,19 +30,34 @@ type SpyPromiseMetadata = { ) => void } +export function isHandlerSpy(value: unknown): value is HandlerSpy { + return ( + isObject(value) && + (value instanceof HandlerSpy || (value as unknown as HandlerSpy).name === 'HandlerSpy') + ) +} + +export type PublicHandlerSpy = Omit< + HandlerSpy, + 'addProcessedMessage' +> + export class HandlerSpy { - private readonly messageBuffer: Fifo> - private readonly messageIdField: string + public name = 'HandlerSpy' + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private readonly messageBuffer: Fifo> + private readonly messageIdField: keyof MessagePayloadSchemas private readonly spyPromises: SpyPromiseMetadata[] constructor(params: HandlerSpyParams = {}) { this.messageBuffer = new Fifo(params.bufferSize ?? 100) + // @ts-ignore this.messageIdField = params.messageIdField ?? 'id' this.spyPromises = [] } private messageMatchesFilter( - spyResult: SpyResult, + spyResult: SpyResult, fields: Partial, processingResult?: MessageProcessingResult, ) { @@ -50,7 +70,17 @@ export class HandlerSpy { ) } - waitForEvent( + waitForMessageWithId(id: string, processingResult?: MessageProcessingResult) { + return this.waitForMessage( + // @ts-ignore + { + [this.messageIdField]: id, + }, + processingResult, + ) + } + + waitForMessage( fields: Partial, processingResult?: MessageProcessingResult, ): Promise> { @@ -86,18 +116,29 @@ export class HandlerSpy { this.messageBuffer.clear() } - addProcessedMessage(processingResult: SpyResult) { + addProcessedMessage(processingResult: SpyResult, messageId?: string) { + const resolvedMessageId = + processingResult.message?.[this.messageIdField] ?? messageId ?? randomUUID() + + // If we failed to parse message, let's store id at least + const resolvedProcessingResult = processingResult.message + ? processingResult + : { + ...processingResult, + message: { + [this.messageIdField]: messageId, + }, + } + // @ts-ignore - const cacheId = `${processingResult.message[this.messageIdField]}-${Date.now()}-${( - Math.random() + 1 - ) + const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1) .toString(36) .substring(7)}` - this.messageBuffer.set(cacheId, processingResult) + this.messageBuffer.set(cacheId, resolvedProcessingResult) const foundPromise = this.spyPromises.find((spyPromise) => { return this.messageMatchesFilter( - processingResult, + resolvedProcessingResult, spyPromise.fields, spyPromise.processingResult, ) @@ -115,3 +156,17 @@ export class HandlerSpy { } } } + +export function resolveHandlerSpy(queueOptions: CommonQueueOptions) { + if (isHandlerSpy(queueOptions.handlerSpy)) { + return queueOptions.handlerSpy as unknown as HandlerSpy + } + if (!queueOptions.handlerSpy) { + return undefined + } + if (queueOptions.handlerSpy === true) { + return new HandlerSpy() as unknown as HandlerSpy + } + + return new HandlerSpy(queueOptions.handlerSpy) as unknown as HandlerSpy +} diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index 4750f580..3c1dab6a 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -5,7 +5,12 @@ export interface QueueConsumer { close(): Promise } -export type MessageProcessingResult = 'retryLater' | 'success' +export type MessageProcessingResult = + | 'retryLater' + | 'consumed' + | 'published' + | 'error' + | 'invalid_message' export interface SyncPublisher { publish(message: MessagePayloadType): void diff --git a/packages/core/package.json b/packages/core/package.json index fd0cac3b..cb900180 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -26,6 +26,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { + "@lokalise/node-core": "^8.3.0", "toad-cache": "^3.4.1" }, "devDependencies": { @@ -40,8 +41,8 @@ "eslint-plugin-prettier": "^5.0.1", "prettier": "^3.1.0", "typescript": "^5.3.2", - "vitest": "^0.34.6", - "vite": "4.5.1" + "vitest": "0.34.6", + "vite": "4.5.0" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/core/test/queues/HandlerSpy.spec.ts b/packages/core/test/queues/HandlerSpy.spec.ts index 816ab286..4e782d05 100644 --- a/packages/core/test/queues/HandlerSpy.spec.ts +++ b/packages/core/test/queues/HandlerSpy.spec.ts @@ -1,4 +1,6 @@ -import { HandlerSpy } from '../../lib/queues/HandlerSpy' +import { expect } from 'vitest' + +import { HandlerSpy, isHandlerSpy } from '../../lib/queues/HandlerSpy' type Message = { id: string @@ -11,7 +13,7 @@ const TEST_MESSAGE: Message = { } const TEST_MESSAGE_2: Message = { - id: 'abc', + id: 'abcd', status: 'inprogress', } @@ -21,7 +23,7 @@ describe('HandlerSpy', () => { const spy = new HandlerSpy() spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', message: TEST_MESSAGE_2, }) @@ -32,21 +34,21 @@ describe('HandlerSpy', () => { }) }) - describe('waitForEvent', () => { + describe('waitForMessage', () => { it('Finds previously consumed event', async () => { const spy = new HandlerSpy() spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', message: TEST_MESSAGE_2, }) spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', message: TEST_MESSAGE, }) - const message = await spy.waitForEvent({ + const message = await spy.waitForMessage({ status: 'done', }) @@ -62,18 +64,18 @@ describe('HandlerSpy', () => { }) spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', message: TEST_MESSAGE, }) - const message = await spy.waitForEvent( + const message = await spy.waitForMessage( { status: 'done', }, - 'success', + 'consumed', ) - const message2 = await spy.waitForEvent( + const message2 = await spy.waitForMessage( { status: 'done', }, @@ -84,20 +86,42 @@ describe('HandlerSpy', () => { expect(message2.message).toEqual(TEST_MESSAGE) }) - it('Waits for an event to be consumed', async () => { + it('Waits for an message to be consumed', async () => { const spy = new HandlerSpy() spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', message: TEST_MESSAGE_2, }) - const spyPromise = spy.waitForEvent({ + const spyPromise = spy.waitForMessage({ status: 'done', }) spy.addProcessedMessage({ - processingResult: 'success', + processingResult: 'consumed', + message: TEST_MESSAGE, + }) + + const message = await spyPromise + + expect(message.message).toEqual(TEST_MESSAGE) + }) + }) + + describe('waitForMessageById', () => { + it('Waits for an message to be consumed by id', async () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage({ + processingResult: 'consumed', + message: TEST_MESSAGE_2, + }) + + const spyPromise = spy.waitForMessageWithId(TEST_MESSAGE.id) + + spy.addProcessedMessage({ + processingResult: 'consumed', message: TEST_MESSAGE, }) @@ -105,5 +129,39 @@ describe('HandlerSpy', () => { expect(message.message).toEqual(TEST_MESSAGE) }) + + it('Waits for an invalid message to be rejected by id', async () => { + const spy = new HandlerSpy() + + spy.addProcessedMessage( + { + processingResult: 'invalid_message', + message: null, + }, + 'abc', + ) + + const messageResult = await spy.waitForMessageWithId('abc') + + expect(messageResult.message).toEqual({ + id: 'abc', + }) + expect(messageResult.processingResult).toBe('invalid_message') + }) + }) + + describe('isHandlerSpy', () => { + it('HandlerSpy returns true', async () => { + const spy = new HandlerSpy() + + expect(isHandlerSpy(spy)).toBe(true) + }) + + it('Not a HandlerSpy returns false', async () => { + expect(isHandlerSpy({})).toBe(false) + expect(isHandlerSpy('abc')).toBe(false) + expect(isHandlerSpy(null)).toBe(false) + expect(isHandlerSpy(undefined)).toBe(false) + }) }) }) diff --git a/packages/sns/package.json b/packages/sns/package.json index 0b09fa0f..48cde241 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -44,7 +44,7 @@ "@types/node": "^20.10.3", "@typescript-eslint/eslint-plugin": "^6.13.2", "@typescript-eslint/parser": "^6.13.2", - "@vitest/coverage-v8": "^0.34.6", + "@vitest/coverage-v8": "0.34.6", "awilix": "^9.0.0", "awilix-manager": "^4.0.0", "del-cli": "^5.1.0", @@ -55,7 +55,7 @@ "eslint-plugin-vitest": "^0.3.10", "prettier": "^3.1.0", "typescript": "^5.3.2", - "vitest": "^0.34.6", + "vitest": "0.34.6", "vite": "4.5.0" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index b66c90f6..ddd5c54a 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -100,6 +100,32 @@ export abstract class AbstractSqsConsumer< barrierOutput: BarrierOutput, ): Promise> + private tryToExtractId(message: SQSMessage): Either<'abort', string> { + if (message === null) { + return ABORT_EARLY_EITHER + } + + const resolveMessageResult = this.resolveMessage(message) + if (isMessageError(resolveMessageResult.error)) { + this.handleError(resolveMessageResult.error) + return ABORT_EARLY_EITHER + } + // Empty content for whatever reason + if (!resolveMessageResult.result) { + return ABORT_EARLY_EITHER + } + + // @ts-ignore + if (this.messageIdField in resolveMessageResult.result) { + return { + // @ts-ignore + result: resolveMessageResult.result[this.messageIdField], + } + } + + return ABORT_EARLY_EITHER + } + private deserializeMessage(message: SQSMessage): Either<'abort', MessagePayloadType> { if (message === null) { return ABORT_EARLY_EITHER @@ -163,6 +189,9 @@ export abstract class AbstractSqsConsumer< const deserializedMessage = this.deserializeMessage(message) if (deserializedMessage.error === 'abort') { await this.failProcessing(message) + const messageId = this.tryToExtractId(message) + + this.handleMessageProcessed(null, 'invalid_message', messageId.result) return } // @ts-ignore @@ -192,7 +221,18 @@ export abstract class AbstractSqsConsumer< this.transactionObservabilityManager?.stop(transactionSpanId) }) - return result.result ? message : Promise.reject(result) + // success + if (result.result) { + this.handleMessageProcessed(deserializedMessage.result, 'consumed') + return message + } + + // failure + this.handleMessageProcessed( + deserializedMessage.result, + result.error === 'retryLater' ? 'retryLater' : 'error', + ) + return Promise.reject(result.error) }, sqs: this.sqsClient, ...this.consumerOptionsOverride, diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts b/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts index 1a32069e..2b050e94 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisherMonoSchema.ts @@ -58,6 +58,7 @@ export abstract class AbstractSqsPublisherMonoSchema { it('deserializes valid JSON', () => { const messagePayload: PERMISSIONS_MESSAGE_TYPE = { + id: '1', messageType: 'add', userIds: [1], permissions: ['perm'], diff --git a/packages/sqs/package.json b/packages/sqs/package.json index e790cf96..87830002 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -40,7 +40,7 @@ "@types/node": "^20.10.3", "@typescript-eslint/eslint-plugin": "^6.13.2", "@typescript-eslint/parser": "^6.13.2", - "@vitest/coverage-v8": "^0.34.6", + "@vitest/coverage-v8": "0.34.6", "awilix": "^9.0.0", "awilix-manager": "^4.0.0", "del-cli": "^5.1.0", @@ -51,7 +51,7 @@ "eslint-plugin-vitest": "^0.3.10", "prettier": "^3.1.0", "typescript": "^5.3.2", - "vitest": "^0.34.6", + "vitest": "0.34.6", "vite": "4.5.0" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", diff --git a/packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts b/packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts index 1e58cc12..065c0f5b 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts @@ -34,6 +34,7 @@ export class SqsPermissionConsumerMonoSchema extends AbstractSqsConsumerMonoSche super(dependencies, { messageSchema: PERMISSIONS_MESSAGE_SCHEMA, messageTypeField: 'messageType', + handlerSpy: true, deletionConfig: { deleteIfExists: true, }, diff --git a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts index 35cab3e4..56c6f24b 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts @@ -62,6 +62,7 @@ export class SqsPermissionConsumerMultiSchema extends AbstractSqsConsumerMultiSc dependencies, { messageTypeField: 'messageType', + handlerSpy: true, deletionConfig: { deleteIfExists: true, }, diff --git a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.errors.spec.ts b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.errors.spec.ts index 7bef858b..b510db0f 100644 --- a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.errors.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.errors.spec.ts @@ -12,12 +12,15 @@ import { userPermissionMap } from '../repositories/PermissionRepository' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { SqsPermissionConsumerMonoSchema } from './SqsPermissionConsumerMonoSchema' + const perms: [string, ...string[]] = ['perm1', 'perm2'] describe('SqsPermissionsConsumerMonoSchema', () => { describe('error handling', () => { let diContainer: AwilixContainer let publisher: SqsPermissionPublisherMonoSchema + let consumer: SqsPermissionConsumerMonoSchema let sqsClient: SQSClient beforeEach(async () => { @@ -26,6 +29,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { }) sqsClient = diContainer.cradle.sqsClient publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer delete userPermissionMap[100] delete userPermissionMap[200] @@ -53,14 +57,14 @@ describe('SqsPermissionsConsumerMonoSchema', () => { // @ts-ignore publisher['messageSchema'] = z.any() await publisher.publish({ + id: 'abc', messageType: 'add', permissions: perms, } as any) const fakeResolver = consumerErrorResolver as FakeConsumerErrorResolver - await waitAndRetry(() => { - return fakeResolver.handleErrorCallsCount > 0 - }) + const messageResult = await consumer.handlerSpy.waitForMessageWithId('abc') + expect(messageResult.processingResult).toBe('invalid_message') expect(fakeResolver.handleErrorCallsCount).toBe(1) expect(fakeResolver.errors[0].message).toContain('"received": "undefined"') diff --git a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts index 8f0f5d8c..2eb4e633 100644 --- a/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts @@ -1,6 +1,5 @@ import type { SQSClient } from '@aws-sdk/client-sqs' import { ReceiveMessageCommand } from '@aws-sdk/client-sqs' -import { waitAndRetry } from '@message-queue-toolkit/core' import type { AwilixContainer } from 'awilix' import { asClass } from 'awilix' import { describe, beforeEach, afterEach, expect, it, beforeAll } from 'vitest' @@ -17,26 +16,24 @@ import { SqsPermissionConsumerMonoSchema } from './SqsPermissionConsumerMonoSche const userIds = [100, 200, 300] const perms: [string, ...string[]] = ['perm1', 'perm2'] -async function waitForPermissions(userIds: number[]) { - return await waitAndRetry(async () => { - const usersPerms = userIds.reduce((acc, userId) => { - if (userPermissionMap[userId]) { - acc.push(userPermissionMap[userId]) - } - return acc - }, [] as string[][]) +async function retrievePermissions(userIds: number[]) { + const usersPerms = userIds.reduce((acc, userId) => { + if (userPermissionMap[userId]) { + acc.push(userPermissionMap[userId]) + } + return acc + }, [] as string[][]) + + if (usersPerms && usersPerms.length !== userIds.length) { + return null + } - if (usersPerms && usersPerms.length !== userIds.length) { + for (const userPerms of usersPerms) + if (userPerms.length !== perms.length) { return null } - for (const userPerms of usersPerms) - if (userPerms.length !== perms.length) { - return null - } - - return usersPerms - }) + return usersPerms } describe('SqsPermissionsConsumerMonoSchema', () => { @@ -85,6 +82,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { let diContainer: AwilixContainer let publisher: SqsPermissionPublisherMonoSchema let sqsClient: SQSClient + let consumer: SqsPermissionConsumerMonoSchema beforeEach(async () => { diContainer = await registerDependencies({ @@ -92,6 +90,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => { }) sqsClient = diContainer.cradle.sqsClient publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer delete userPermissionMap[100] delete userPermissionMap[200] @@ -123,12 +122,14 @@ describe('SqsPermissionsConsumerMonoSchema', () => { userPermissionMap[300] = [] await publisher.publish({ + id: 'abcd', messageType: 'add', userIds, permissions: perms, }) - const updatedUsersPermissions = await waitForPermissions(userIds) + await consumer.handlerSpy.waitForMessageWithId('abcd', 'consumed') + const updatedUsersPermissions = await retrievePermissions(userIds) if (null === updatedUsersPermissions) { throw new Error('Users permissions unexpectedly null') @@ -143,20 +144,24 @@ describe('SqsPermissionsConsumerMonoSchema', () => { expect(users).toHaveLength(0) await publisher.publish({ + id: '123', messageType: 'add', userIds, permissions: perms, }) // no users in the database, so message will go back to the queue - const usersFromDb = await waitForPermissions(userIds) + await consumer.handlerSpy.waitForMessageWithId('123', 'retryLater') + + const usersFromDb = await retrievePermissions(userIds) expect(usersFromDb).toBeNull() userPermissionMap[100] = [] userPermissionMap[200] = [] userPermissionMap[300] = [] - const usersPermissions = await waitForPermissions(userIds) + await consumer.handlerSpy.waitForMessageWithId('123', 'consumed') + const usersPermissions = await retrievePermissions(userIds) if (null === usersPermissions) { throw new Error('Users permissions unexpectedly null') @@ -173,19 +178,23 @@ describe('SqsPermissionsConsumerMonoSchema', () => { userPermissionMap[100] = [] await publisher.publish({ + id: 'abc', messageType: 'add', userIds, permissions: perms, }) // not all users are in the database, so message will go back to the queue - const usersFromDb = await waitForPermissions(userIds) + await consumer.handlerSpy.waitForMessageWithId('abc', 'retryLater') + + const usersFromDb = await retrievePermissions(userIds) expect(usersFromDb).toBeNull() userPermissionMap[200] = [] userPermissionMap[300] = [] - const usersPermissions = await waitForPermissions(userIds) + await consumer.handlerSpy.waitForMessageWithId('abc', 'consumed') + const usersPermissions = await retrievePermissions(userIds) if (null === usersPermissions) { throw new Error('Users permissions unexpectedly null') diff --git a/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts b/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts index 46c2386d..1c678984 100644 --- a/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionsConsumerMultiSchema.spec.ts @@ -1,7 +1,6 @@ import type { SQSClient } from '@aws-sdk/client-sqs' import { ReceiveMessageCommand } from '@aws-sdk/client-sqs' import type { BarrierResult } from '@message-queue-toolkit/core' -import { waitAndRetry } from '@message-queue-toolkit/core' import type { AwilixContainer } from 'awilix' import { asClass, asFunction } from 'awilix' import { describe, beforeEach, afterEach, expect, it } from 'vitest' @@ -89,12 +88,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '1', messageType: 'add', }) - await waitAndRetry(() => { - return logger.loggedMessages.length === 1 - }) + await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(logger.loggedMessages.length).toBe(1) await newConsumer.close() @@ -137,12 +135,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '2', messageType: 'add', }) - await waitAndRetry(() => { - return newConsumer.addCounter === 1 - }) + await newConsumer.handlerSpy.waitForMessageWithId('2', 'consumed') expect(newConsumer.addCounter).toBe(1) expect(barrierCounter).toBe(2) @@ -168,12 +165,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => { await newConsumer.start() await publisher.publish({ + id: '3', messageType: 'add', }) - await waitAndRetry(() => { - return newConsumer.addCounter > 0 - }) + await newConsumer.handlerSpy.waitForMessageWithId('3', 'consumed') expect(newConsumer.addCounter).toBe(1) expect(barrierCounter).toBe(2) @@ -213,18 +209,21 @@ describe('SqsPermissionsConsumerMultiSchema', () => { describe('happy path', () => { it('Processes messages', async () => { await publisher.publish({ + id: '10', messageType: 'add', }) await publisher.publish({ + id: '20', messageType: 'remove', }) await publisher.publish({ + id: '30', messageType: 'remove', }) - await waitAndRetry(() => { - return consumer.addCounter === 1 && consumer.removeCounter === 2 - }) + await consumer.handlerSpy.waitForMessageWithId('10', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('20', 'consumed') + await consumer.handlerSpy.waitForMessageWithId('30', 'consumed') expect(consumer.addCounter).toBe(1) expect(consumer.removeCounter).toBe(2) diff --git a/packages/sqs/test/consumers/userConsumerSchemas.ts b/packages/sqs/test/consumers/userConsumerSchemas.ts index 6c810490..163901b4 100644 --- a/packages/sqs/test/consumers/userConsumerSchemas.ts +++ b/packages/sqs/test/consumers/userConsumerSchemas.ts @@ -1,16 +1,19 @@ import z from 'zod' export const PERMISSIONS_MESSAGE_SCHEMA = z.object({ + id: z.string(), messageType: z.enum(['add', 'remove']), userIds: z.array(z.number()).describe('User IDs'), permissions: z.array(z.string()).nonempty().describe('List of user permissions'), }) export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({ + id: z.string(), messageType: z.literal('add'), }) export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({ + id: z.string(), messageType: z.literal('remove'), }) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts index 99b2c128..f659f718 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.spec.ts @@ -46,6 +46,7 @@ describe('SqsPermissionPublisher', () => { describe('logging', () => { it('logs a message when logging is enabled', async () => { const message = { + id: '1', userIds, messageType: 'add', permissions: perms, @@ -53,9 +54,7 @@ describe('SqsPermissionPublisher', () => { await publisher.publish(message) - await waitAndRetry(() => { - return logger.loggedMessages.length === 1 - }) + await publisher.handlerSpy.waitForMessageWithId('1') expect(logger.loggedMessages.length).toBe(1) }) @@ -76,6 +75,7 @@ describe('SqsPermissionPublisher', () => { const { permissionPublisher } = diContainer.cradle const message = { + id: '2', userIds, messageType: 'add', permissions: perms, @@ -108,6 +108,7 @@ describe('SqsPermissionPublisher', () => { }) expect(receivedMessage).toEqual({ + id: '2', messageType: 'add', permissions: ['perm1', 'perm2'], userIds: [100, 200, 300], diff --git a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts index b56fbe6a..40684b4a 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts @@ -13,6 +13,7 @@ export class SqsPermissionPublisherMonoSchema extends AbstractSqsPublisherMonoSc QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME, }, }, + handlerSpy: true, deletionConfig: { deleteIfExists: false, },