From 850ed3ce5b57dc1bc5f27ac843640b3fbe479a87 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 4 Jul 2024 19:14:14 +0300 Subject: [PATCH] Support synchronous check for whether the message was published --- README.md | 8 +++ .../lib/events/DomainEventEmitter.spec.ts | 30 +++++++++-- packages/core/lib/queues/HandlerSpy.ts | 51 +++++++++---------- .../schemas/lib/utils/messageTypeUtils.ts | 5 +- packages/sns/package.json | 2 +- 5 files changed, 63 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index b3be469d..fd82b538 100644 --- a/README.md +++ b/README.md @@ -269,6 +269,14 @@ await myPublisher.handlerSpy.waitForMessage({ }, 'published') ``` +In case you do not want to await for message to be published, but want to merely check whether or not it was published by this moment, you can use `checkForMessage` method: + +```ts +const notEmittedMessage = myPublisher.handlerSpy.checkForMessage({ + type: 'entity.created', +}) // this will resolve to undefined if such message wasn't published up to this moment +``` + You can also check details of the message processing outcome: ```ts diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index e1b029db..48a19c88 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -1,10 +1,12 @@ import { randomUUID } from 'node:crypto' import { waitAndRetry } from '@lokalise/node-core' -import type { CommonEventDefinitionPublisherSchemaType } from '@message-queue-toolkit/schemas' +import type { + CommonEventDefinitionPublisherSchemaType, + ConsumerMessageSchema, +} from '@message-queue-toolkit/schemas' import type { AwilixContainer } from 'awilix' import { afterEach, beforeEach, describe, expect, it } from 'vitest' -import type { z } from 'zod' import type { Dependencies } from '../../test/testContext' import { TestEvents, registerDependencies } from '../../test/testContext' @@ -67,7 +69,7 @@ describe('AutopilotEventEmitter', () => { const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload) const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId< - z.infer + ConsumerMessageSchema >(emittedEvent.id) expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value) @@ -80,6 +82,28 @@ describe('AutopilotEventEmitter', () => { expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload) }) + it('can check spy for messages not being sent', async () => { + const { eventEmitter } = diContainer.cradle + const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) + eventEmitter.onAny(fakeListener) + + await eventEmitter.emit(TestEvents.created, createdEventPayload) + + const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage< + ConsumerMessageSchema + >({ + type: 'entity.updated', + }) + const emittedEvent = eventEmitter.handlerSpy.checkForMessage< + ConsumerMessageSchema + >({ + type: 'entity.created', + }) + + expect(notEmittedEvent).toBeUndefined() + expect(emittedEvent).toBeDefined() + }) + it('emits event to anyListener with metadata', async () => { const { eventEmitter } = diContainer.cradle const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents) diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts index 0b4733d6..2fcf5ce9 100644 --- a/packages/core/lib/queues/HandlerSpy.ts +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -22,18 +22,14 @@ export type SpyResultOutput = { processingResult: MessageProcessingResult } -type SpyResultCacheEntry = { - value: SpyResultInput -} - type SpyPromiseMetadata = { fields: DeepPartial processingResult?: MessageProcessingResult - promise: Promise> + promise: Promise> resolve: ( value: | SpyResultInput - | PromiseLike>, + | PromiseLike>, ) => void } @@ -79,13 +75,12 @@ export class HandlerSpy { this.spyPromises = [] } - private messageMatchesFilter( - spyResult: SpyResultInput, + private messageMatchesFilter( + spyResult: SpyResultOutput, fields: DeepPartial, processingResult?: MessageProcessingResult, - ) { + ): boolean { return ( - // @ts-ignore objectMatches(fields, spyResult.message) && (!processingResult || spyResult.processingResult === processingResult) ) @@ -104,29 +99,33 @@ export class HandlerSpy { ) } + checkForMessage( + expectedFields: DeepPartial, + expectedProcessingResult?: MessageProcessingResult, + ): SpyResultOutput | undefined { + return Object.values(this.messageBuffer.items).find((spyResult) => { + return this.messageMatchesFilter(spyResult.value, expectedFields, expectedProcessingResult) + })?.value + } + waitForMessage( - fields: DeepPartial, - processingResult?: MessageProcessingResult, + expectedFields: DeepPartial, + expectedProcessingResult?: MessageProcessingResult, ): Promise> { - const processedMessageEntry = Object.values(this.messageBuffer.items).find( - // @ts-ignore - (spyResult: SpyResultCacheEntry) => { - return this.messageMatchesFilter(spyResult.value, fields, processingResult) - }, - ) + const processedMessageEntry = this.checkForMessage(expectedFields, expectedProcessingResult) if (processedMessageEntry) { - return Promise.resolve(processedMessageEntry.value) + return Promise.resolve(processedMessageEntry) } - let resolve: (value: SpyResultInput | PromiseLike>) => void - const spyPromise = new Promise>((_resolve) => { + let resolve: (value: SpyResultOutput | PromiseLike>) => void + const spyPromise = new Promise>((_resolve) => { resolve = _resolve }) this.spyPromises.push({ promise: spyPromise, - processingResult, - fields, + processingResult: expectedProcessingResult, + fields: expectedFields, // @ts-ignore resolve, }) @@ -148,14 +147,14 @@ export class HandlerSpy { // If we failed to parse message, let's store id and type at least const resolvedProcessingResult = processingResult.message - ? processingResult - : { + ? (processingResult as SpyResultOutput) + : ({ ...processingResult, message: { [this.messageIdField]: messageId, [this.messageTypeField]: resolvedMessageType, }, - } + } as SpyResultOutput) // @ts-ignore const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1) diff --git a/packages/schemas/lib/utils/messageTypeUtils.ts b/packages/schemas/lib/utils/messageTypeUtils.ts index 1c3e9100..12bebd4c 100644 --- a/packages/schemas/lib/utils/messageTypeUtils.ts +++ b/packages/schemas/lib/utils/messageTypeUtils.ts @@ -19,6 +19,5 @@ export type PublisherMessageSchema = z.infer< - MessageDefinitionTypes[number]['publisherSchema'] -> +export type allPublisherMessageSchemas = + z.infer diff --git a/packages/sns/package.json b/packages/sns/package.json index 00987c6c..5b5b5832 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^10.0.0", + "@lokalise/node-core": "^10.0.1", "sqs-consumer": "^10.3.0", "zod": "^3.23.8" },