Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support synchronous check for whether the message was published #184

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,14 @@ await myPublisher.handlerSpy.waitForMessage({
}, 'published')
```

In case you do not want to await for message to be published, but want to merely check whether or not it was published by this moment, you can use `checkForMessage` method:

```ts
const notEmittedMessage = myPublisher.handlerSpy.checkForMessage({
type: 'entity.created',
}) // this will resolve to undefined if such message wasn't published up to this moment
```

You can also check details of the message processing outcome:

```ts
Expand Down
30 changes: 27 additions & 3 deletions packages/core/lib/events/DomainEventEmitter.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { randomUUID } from 'node:crypto'

import { waitAndRetry } from '@lokalise/node-core'
import type { CommonEventDefinitionPublisherSchemaType } from '@message-queue-toolkit/schemas'
import type {
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageSchema,
} from '@message-queue-toolkit/schemas'
import type { AwilixContainer } from 'awilix'
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
import type { z } from 'zod'

import type { Dependencies } from '../../test/testContext'
import { TestEvents, registerDependencies } from '../../test/testContext'
Expand Down Expand Up @@ -67,7 +69,7 @@ describe('AutopilotEventEmitter', () => {
const emittedEvent = await eventEmitter.emit(TestEvents.created, createdEventPayload)

const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId<
z.infer<typeof TestEvents.created.consumerSchema>
ConsumerMessageSchema<typeof TestEvents.created>
>(emittedEvent.id)

expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value)
Expand All @@ -80,6 +82,28 @@ describe('AutopilotEventEmitter', () => {
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
})

it('can check spy for messages not being sent', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
eventEmitter.onAny(fakeListener)

await eventEmitter.emit(TestEvents.created, createdEventPayload)

const notEmittedEvent = eventEmitter.handlerSpy.checkForMessage<
ConsumerMessageSchema<typeof TestEvents.updated>
>({
type: 'entity.updated',
})
const emittedEvent = eventEmitter.handlerSpy.checkForMessage<
ConsumerMessageSchema<typeof TestEvents.created>
>({
type: 'entity.created',
})

expect(notEmittedEvent).toBeUndefined()
expect(emittedEvent).toBeDefined()
})

it('emits event to anyListener with metadata', async () => {
const { eventEmitter } = diContainer.cradle
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
Expand Down
51 changes: 25 additions & 26 deletions packages/core/lib/queues/HandlerSpy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,14 @@ export type SpyResultOutput<MessagePayloadSchemas extends object> = {
processingResult: MessageProcessingResult
}

type SpyResultCacheEntry<MessagePayloadSchemas extends object> = {
value: SpyResultInput<MessagePayloadSchemas>
}

type SpyPromiseMetadata<MessagePayloadSchemas extends object> = {
fields: DeepPartial<MessagePayloadSchemas>
processingResult?: MessageProcessingResult
promise: Promise<SpyResultInput<MessagePayloadSchemas>>
promise: Promise<SpyResultOutput<MessagePayloadSchemas>>
resolve: (
value:
| SpyResultInput<MessagePayloadSchemas>
| PromiseLike<SpyResultInput<MessagePayloadSchemas>>,
| PromiseLike<SpyResultOutput<MessagePayloadSchemas>>,
) => void
}

Expand Down Expand Up @@ -79,13 +75,12 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
this.spyPromises = []
}

private messageMatchesFilter(
spyResult: SpyResultInput<object>,
private messageMatchesFilter<T extends object>(
spyResult: SpyResultOutput<T>,
fields: DeepPartial<MessagePayloadSchemas>,
processingResult?: MessageProcessingResult,
) {
): boolean {
return (
// @ts-ignore
objectMatches(fields, spyResult.message) &&
(!processingResult || spyResult.processingResult === processingResult)
)
Expand All @@ -104,29 +99,33 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
)
}

checkForMessage<T extends MessagePayloadSchemas>(
expectedFields: DeepPartial<T>,
expectedProcessingResult?: MessageProcessingResult,
): SpyResultOutput<T> | undefined {
return Object.values(this.messageBuffer.items).find((spyResult) => {
return this.messageMatchesFilter(spyResult.value, expectedFields, expectedProcessingResult)
})?.value
}

waitForMessage<T extends MessagePayloadSchemas>(
fields: DeepPartial<T>,
processingResult?: MessageProcessingResult,
expectedFields: DeepPartial<T>,
expectedProcessingResult?: MessageProcessingResult,
): Promise<SpyResultOutput<T>> {
const processedMessageEntry = Object.values(this.messageBuffer.items).find(
// @ts-ignore
(spyResult: SpyResultCacheEntry<T>) => {
return this.messageMatchesFilter(spyResult.value, fields, processingResult)
},
)
const processedMessageEntry = this.checkForMessage(expectedFields, expectedProcessingResult)
if (processedMessageEntry) {
return Promise.resolve(processedMessageEntry.value)
return Promise.resolve(processedMessageEntry)
}

let resolve: (value: SpyResultInput<T> | PromiseLike<SpyResultInput<T>>) => void
const spyPromise = new Promise<SpyResultInput<T>>((_resolve) => {
let resolve: (value: SpyResultOutput<T> | PromiseLike<SpyResultOutput<T>>) => void
const spyPromise = new Promise<SpyResultOutput<T>>((_resolve) => {
resolve = _resolve
})

this.spyPromises.push({
promise: spyPromise,
processingResult,
fields,
processingResult: expectedProcessingResult,
fields: expectedFields,
// @ts-ignore
resolve,
})
Expand All @@ -148,14 +147,14 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {

// If we failed to parse message, let's store id and type at least
const resolvedProcessingResult = processingResult.message
? processingResult
: {
? (processingResult as SpyResultOutput<MessagePayloadSchemas>)
: ({
...processingResult,
message: {
[this.messageIdField]: messageId,
[this.messageTypeField]: resolvedMessageType,
},
}
} as SpyResultOutput<MessagePayloadSchemas>)

// @ts-ignore
const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1)
Expand Down
5 changes: 2 additions & 3 deletions packages/schemas/lib/utils/messageTypeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ export type PublisherMessageSchema<MessageDefinitionType extends CommonEventDefi
/**
* Resolves schema of all possible publisher messages for a given list of event definitions
*/
export type allPublisherMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> = z.infer<
MessageDefinitionTypes[number]['publisherSchema']
>
export type allPublisherMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> =
z.infer<MessageDefinitionTypes[number]['publisherSchema']>
2 changes: 1 addition & 1 deletion packages/sns/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^10.0.0",
"@lokalise/node-core": "^10.0.1",
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
"sqs-consumer": "^10.3.0",
"zod": "^3.23.8"
},
Expand Down
Loading