Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for handler spies in SQS #61

Merged
merged 6 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,59 @@ Both publishers and consumers accept a queue name and configuration as parameter

If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.

## Handler spies

In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly.

In order to enable this functionality, configure spyHandler on the publisher or consumer:

```ts
export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
SupportedMessages,
ExecutionContext
> {
constructor(
dependencies: SQSConsumerDependencies,
userService: UserService,
) {
super(dependencies, {
//
// rest of configuration skipped
//
handlerSpy: {
bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100
messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id`
},
}
}
}
```

Then you can use handler spies in your tests or production code to await certain events:

```ts
await myConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

await myConsumer.handlerSpy.waitForMessageWithId('1')

await myConsumer.handlerSpy.waitForMessageWithId('1')

await myConsumer.handlerSpy.waitForMessage({
projectId: 1,
})

await myPublisher.handlerSpy.waitForMessage({
userId: 1
}, 'consumed')
```

You can also check details of the message processing outcome:

```ts
const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
expect(result.processingResult).toEqual('consumed')
```

## Automatic Reconnects (RabbitMQ)

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"@types/node": "^20.10.3",
"@typescript-eslint/eslint-plugin": "^6.13.2",
"@typescript-eslint/parser": "^6.13.2",
"@vitest/coverage-v8": "^0.34.6",
"@vitest/coverage-v8": "0.34.6",
"amqplib": "^0.10.3",
"awilix": "^9.0.0",
"awilix-manager": "^4.0.0",
Expand All @@ -51,7 +51,7 @@
"eslint-plugin-vitest": "^0.3.10",
"prettier": "^3.1.0",
"typescript": "^5.3.2",
"vitest": "^0.34.6",
"vitest": "0.34.6",
"vite": "4.5.0"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
Expand Down
2 changes: 2 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export {
BarrierResultNegative,
} from './lib/queues/HandlerContainer'
export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer'
export { HandlerSpy } from './lib/queues/HandlerSpy'
export type { SpyResult, HandlerSpyParams, PublicHandlerSpy } from './lib/queues/HandlerSpy'

export { MessageSchemaContainer } from './lib/queues/MessageSchemaContainer'
export type { MessageSchemaContainerOptions } from './lib/queues/MessageSchemaContainer'
Expand Down
42 changes: 39 additions & 3 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import { resolveGlobalErrorLogObject } from '@lokalise/node-core'
import type { ZodSchema, ZodType } from 'zod'

import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
import type { Logger, TransactionObservabilityManager } from '../types/MessageQueueTypes'
import type {
Logger,
TransactionObservabilityManager,
MessageProcessingResult,
} from '../types/MessageQueueTypes'

import type { MessageHandlerConfig } from './HandlerContainer'
import type { HandlerSpy, PublicHandlerSpy, HandlerSpyParams } from './HandlerSpy'
import { resolveHandlerSpy } from './HandlerSpy'

export type QueueDependencies = {
errorReporter: ErrorReporter
Expand Down Expand Up @@ -46,18 +52,19 @@ export type DeletionConfig = {
}

export type CommonQueueOptions = {
messageTypeField: string
messageIdField?: string
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
logMessages?: boolean
}

export type NewQueueOptions<CreationConfigType extends object> = {
messageTypeField: string
locatorConfig?: never
deletionConfig?: DeletionConfig
creationConfig: CreationConfigType
} & CommonQueueOptions

export type ExistingQueueOptions<QueueLocatorType extends object> = {
messageTypeField: string
locatorConfig: QueueLocatorType
deletionConfig?: DeletionConfig
creationConfig?: never
Expand Down Expand Up @@ -94,21 +101,34 @@ export abstract class AbstractQueueService<
protected readonly errorReporter: ErrorReporter
public readonly logger: Logger
protected readonly messageTypeField: string
protected readonly messageIdField: string
protected readonly logMessages: boolean
protected readonly creationConfig?: QueueConfiguration
protected readonly locatorConfig?: QueueLocatorType
protected readonly deletionConfig?: DeletionConfig
protected readonly _handlerSpy?: HandlerSpy<MessagePayloadSchemas>

get handlerSpy(): PublicHandlerSpy<MessagePayloadSchemas> {
if (!this._handlerSpy) {
throw new Error(
'HandlerSpy was not instantiated, please pass `handlerSpy` parameter during queue service creation.',
)
}
return this._handlerSpy
}

constructor({ errorReporter, logger }: DependenciesType, options: OptionsType) {
this.errorReporter = errorReporter
this.logger = logger

this.messageTypeField = options.messageTypeField
this.messageIdField = options.messageIdField ?? 'id'
this.creationConfig = options.creationConfig
this.locatorConfig = options.locatorConfig
this.deletionConfig = options.deletionConfig

this.logMessages = options.logMessages ?? false
this._handlerSpy = resolveHandlerSpy<MessagePayloadSchemas>(options)
}

protected abstract resolveSchema(
Expand Down Expand Up @@ -148,5 +168,21 @@ export abstract class AbstractQueueService<
}
}

protected handleMessageProcessed(
message: MessagePayloadSchemas | null,
processingResult: MessageProcessingResult,
messageId?: string,
) {
if (this._handlerSpy) {
this._handlerSpy.addProcessedMessage(
{
message,
processingResult,
},
messageId,
)
}
}

public abstract close(): Promise<unknown>
}
77 changes: 66 additions & 11 deletions packages/core/lib/queues/HandlerSpy.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { randomUUID } from 'node:crypto'

import { isObject } from '@lokalise/node-core'
import { Fifo } from 'toad-cache'

import type { MessageProcessingResult } from '../types/MessageQueueTypes'

import type { CommonQueueOptions } from './AbstractQueueService'

export type HandlerSpyParams = {
bufferSize?: number
messageIdField?: string
}

export type SpyResult<MessagePayloadSchemas extends object> = {
message: MessagePayloadSchemas
message: MessagePayloadSchemas | null
processingResult: MessageProcessingResult
}

Expand All @@ -25,19 +30,34 @@ type SpyPromiseMetadata<MessagePayloadSchemas extends object> = {
) => void
}

export function isHandlerSpy<T extends object>(value: unknown): value is HandlerSpy<T> {
return (
isObject(value) &&
(value instanceof HandlerSpy || (value as unknown as HandlerSpy<object>).name === 'HandlerSpy')
)
}

export type PublicHandlerSpy<MessagePayloadSchemas extends object> = Omit<
HandlerSpy<MessagePayloadSchemas>,
'addProcessedMessage'
>

export class HandlerSpy<MessagePayloadSchemas extends object> {
private readonly messageBuffer: Fifo<SpyResult<MessagePayloadSchemas>>
private readonly messageIdField: string
public name = 'HandlerSpy'
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private readonly messageBuffer: Fifo<SpyResult<any>>
private readonly messageIdField: keyof MessagePayloadSchemas
private readonly spyPromises: SpyPromiseMetadata<MessagePayloadSchemas>[]

constructor(params: HandlerSpyParams = {}) {
this.messageBuffer = new Fifo(params.bufferSize ?? 100)
// @ts-ignore
this.messageIdField = params.messageIdField ?? 'id'
this.spyPromises = []
}

private messageMatchesFilter(
spyResult: SpyResult<MessagePayloadSchemas>,
spyResult: SpyResult<object>,
fields: Partial<MessagePayloadSchemas>,
processingResult?: MessageProcessingResult,
) {
Expand All @@ -50,7 +70,17 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
)
}

waitForEvent(
waitForMessageWithId(id: string, processingResult?: MessageProcessingResult) {
return this.waitForMessage(
// @ts-ignore
{
[this.messageIdField]: id,
},
processingResult,
)
}

waitForMessage(
fields: Partial<MessagePayloadSchemas>,
processingResult?: MessageProcessingResult,
): Promise<SpyResult<MessagePayloadSchemas>> {
Expand Down Expand Up @@ -86,18 +116,29 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
this.messageBuffer.clear()
}

addProcessedMessage(processingResult: SpyResult<MessagePayloadSchemas>) {
addProcessedMessage(processingResult: SpyResult<MessagePayloadSchemas>, messageId?: string) {
const resolvedMessageId =
processingResult.message?.[this.messageIdField] ?? messageId ?? randomUUID()

// If we failed to parse message, let's store id at least
const resolvedProcessingResult = processingResult.message
? processingResult
: {
...processingResult,
message: {
[this.messageIdField]: messageId,
},
}

// @ts-ignore
const cacheId = `${processingResult.message[this.messageIdField]}-${Date.now()}-${(
Math.random() + 1
)
const cacheId = `${resolvedMessageId}-${Date.now()}-${(Math.random() + 1)
.toString(36)
.substring(7)}`
this.messageBuffer.set(cacheId, processingResult)
this.messageBuffer.set(cacheId, resolvedProcessingResult)

const foundPromise = this.spyPromises.find((spyPromise) => {
return this.messageMatchesFilter(
processingResult,
resolvedProcessingResult,
spyPromise.fields,
spyPromise.processingResult,
)
Expand All @@ -115,3 +156,17 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
}
}
}

export function resolveHandlerSpy<T extends object>(queueOptions: CommonQueueOptions) {
if (isHandlerSpy(queueOptions.handlerSpy)) {
return queueOptions.handlerSpy as unknown as HandlerSpy<T>
}
if (!queueOptions.handlerSpy) {
return undefined
}
if (queueOptions.handlerSpy === true) {
return new HandlerSpy() as unknown as HandlerSpy<T>
}

return new HandlerSpy(queueOptions.handlerSpy) as unknown as HandlerSpy<T>
}
7 changes: 6 additions & 1 deletion packages/core/lib/types/MessageQueueTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ export interface QueueConsumer {
close(): Promise<unknown>
}

export type MessageProcessingResult = 'retryLater' | 'success'
export type MessageProcessingResult =
| 'retryLater'
| 'consumed'
| 'published'
| 'error'
| 'invalid_message'

export interface SyncPublisher<MessagePayloadType> {
publish(message: MessagePayloadType): void
Expand Down
5 changes: 3 additions & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^8.3.0",
"toad-cache": "^3.4.1"
},
"devDependencies": {
Expand All @@ -40,8 +41,8 @@
"eslint-plugin-prettier": "^5.0.1",
"prettier": "^3.1.0",
"typescript": "^5.3.2",
"vitest": "^0.34.6",
"vite": "4.5.1"
"vitest": "0.34.6",
"vite": "4.5.0"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
Expand Down
Loading
Loading