Skip to content

Commit

Permalink
Implement handlerSpy support for SQS
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Dec 7, 2023
1 parent d330d42 commit 2b00d07
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 29 deletions.
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,52 @@ Both publishers and consumers accept a queue name and configuration as parameter

If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.

## Handler spies

In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly.

In order to enable this functionality, configure spyHandler on the publisher or consumer:

```ts
export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
SupportedMessages,
ExecutionContext
> {
constructor(
dependencies: SQSConsumerDependencies,
userService: UserService,
) {
super(dependencies, {
//
// rest of configuration skipped
//
handlerSpy: {
bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100
messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id`
},
}
}
}
```
Then you can use handler spies in your tests or production code to await certain events:
```ts
await myConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

await myConsumer.handlerSpy.waitForMessageWithId('1')

await myConsumer.handlerSpy.waitForMessageWithId('1')

await myConsumer.handlerSpy.waitForMessage({
projectId: 1,
})

await myPublisher.handlerSpy.waitForMessage({
userId: 1
}, 'consumed')
```
## Automatic Reconnects (RabbitMQ)
`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.
Expand Down
2 changes: 2 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export {
BarrierResultNegative,
} from './lib/queues/HandlerContainer'
export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer'
export { HandlerSpy } from './lib/queues/HandlerSpy'
export type { SpyResult, HandlerSpyParams, PublicHandlerSpy } from './lib/queues/HandlerSpy'

export { MessageSchemaContainer } from './lib/queues/MessageSchemaContainer'
export type { MessageSchemaContainerOptions } from './lib/queues/MessageSchemaContainer'
Expand Down
6 changes: 3 additions & 3 deletions packages/core/lib/queues/HandlerSpy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
)
}

waitForEventWithId(id: string, processingResult?: MessageProcessingResult) {
return this.waitForEvent(
waitForMessageWithId(id: string, processingResult?: MessageProcessingResult) {
return this.waitForMessage(
// @ts-ignore
{
[this.messageIdField]: id,
Expand All @@ -76,7 +76,7 @@ export class HandlerSpy<MessagePayloadSchemas extends object> {
)
}

waitForEvent(
waitForMessage(
fields: Partial<MessagePayloadSchemas>,
processingResult?: MessageProcessingResult,
): Promise<SpyResult<MessagePayloadSchemas>> {
Expand Down
10 changes: 5 additions & 5 deletions packages/core/test/queues/HandlerSpy.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('HandlerSpy', () => {
})
})

describe('waitForEvent', () => {
describe('waitForMessage', () => {
it('Finds previously consumed event', async () => {
const spy = new HandlerSpy<Message>()

Expand All @@ -46,7 +46,7 @@ describe('HandlerSpy', () => {
message: TEST_MESSAGE,
})

const message = await spy.waitForEvent({
const message = await spy.waitForMessage({
status: 'done',
})

Expand All @@ -66,14 +66,14 @@ describe('HandlerSpy', () => {
message: TEST_MESSAGE,
})

const message = await spy.waitForEvent(
const message = await spy.waitForMessage(
{
status: 'done',
},
'consumed',
)

const message2 = await spy.waitForEvent(
const message2 = await spy.waitForMessage(
{
status: 'done',
},
Expand All @@ -92,7 +92,7 @@ describe('HandlerSpy', () => {
message: TEST_MESSAGE_2,
})

const spyPromise = spy.waitForEvent({
const spyPromise = spy.waitForMessage({
status: 'done',
})

Expand Down
1 change: 1 addition & 0 deletions packages/sqs/lib/utils/sqsMessageDeserializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { deserializeSQSMessage } from './sqsMessageDeserializer'
describe('messageDeserializer', () => {
it('deserializes valid JSON', () => {
const messagePayload: PERMISSIONS_MESSAGE_TYPE = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class SqsPermissionConsumerMultiSchema extends AbstractSqsConsumerMultiSc
dependencies,
{
messageTypeField: 'messageType',
handlerSpy: true,
deletionConfig: {
deleteIfExists: true,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => {
permissions: perms,
})

await consumer.handlerSpy.waitForEventWithId('abcd', 'consumed')
await consumer.handlerSpy.waitForMessageWithId('abcd', 'consumed')
const updatedUsersPermissions = await retrievePermissions(userIds)

if (null === updatedUsersPermissions) {
Expand All @@ -151,7 +151,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => {
})

// no users in the database, so message will go back to the queue
await consumer.handlerSpy.waitForEventWithId('123', 'retryLater')
await consumer.handlerSpy.waitForMessageWithId('123', 'retryLater')

const usersFromDb = await retrievePermissions(userIds)
expect(usersFromDb).toBeNull()
Expand All @@ -160,7 +160,7 @@ describe('SqsPermissionsConsumerMonoSchema', () => {
userPermissionMap[200] = []
userPermissionMap[300] = []

await consumer.handlerSpy.waitForEventWithId('123', 'consumed')
await consumer.handlerSpy.waitForMessageWithId('123', 'consumed')
const usersPermissions = await retrievePermissions(userIds)

if (null === usersPermissions) {
Expand All @@ -185,15 +185,15 @@ describe('SqsPermissionsConsumerMonoSchema', () => {
})

// not all users are in the database, so message will go back to the queue
await consumer.handlerSpy.waitForEventWithId('abc', 'retryLater')
await consumer.handlerSpy.waitForMessageWithId('abc', 'retryLater')

const usersFromDb = await retrievePermissions(userIds)
expect(usersFromDb).toBeNull()

userPermissionMap[200] = []
userPermissionMap[300] = []

await consumer.handlerSpy.waitForEventWithId('abc', 'consumed')
await consumer.handlerSpy.waitForMessageWithId('abc', 'consumed')
const usersPermissions = await retrievePermissions(userIds)

if (null === usersPermissions) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { SQSClient } from '@aws-sdk/client-sqs'
import { ReceiveMessageCommand } from '@aws-sdk/client-sqs'
import type { BarrierResult } from '@message-queue-toolkit/core'
import { waitAndRetry } from '@message-queue-toolkit/core'
import type { AwilixContainer } from 'awilix'
import { asClass, asFunction } from 'awilix'
import { describe, beforeEach, afterEach, expect, it } from 'vitest'
Expand Down Expand Up @@ -89,12 +88,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => {
await newConsumer.start()

await publisher.publish({
id: '1',
messageType: 'add',
})

await waitAndRetry(() => {
return logger.loggedMessages.length === 1
})
await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

expect(logger.loggedMessages.length).toBe(1)
await newConsumer.close()
Expand Down Expand Up @@ -137,12 +135,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => {
await newConsumer.start()

await publisher.publish({
id: '2',
messageType: 'add',
})

await waitAndRetry(() => {
return newConsumer.addCounter === 1
})
await newConsumer.handlerSpy.waitForMessageWithId('2', 'consumed')

expect(newConsumer.addCounter).toBe(1)
expect(barrierCounter).toBe(2)
Expand All @@ -168,12 +165,11 @@ describe('SqsPermissionsConsumerMultiSchema', () => {
await newConsumer.start()

await publisher.publish({
id: '3',
messageType: 'add',
})

await waitAndRetry(() => {
return newConsumer.addCounter > 0
})
await newConsumer.handlerSpy.waitForMessageWithId('3', 'consumed')

expect(newConsumer.addCounter).toBe(1)
expect(barrierCounter).toBe(2)
Expand Down Expand Up @@ -213,18 +209,21 @@ describe('SqsPermissionsConsumerMultiSchema', () => {
describe('happy path', () => {
it('Processes messages', async () => {
await publisher.publish({
id: '10',
messageType: 'add',
})
await publisher.publish({
id: '20',
messageType: 'remove',
})
await publisher.publish({
id: '30',
messageType: 'remove',
})

await waitAndRetry(() => {
return consumer.addCounter === 1 && consumer.removeCounter === 2
})
await consumer.handlerSpy.waitForMessageWithId('10', 'consumed')
await consumer.handlerSpy.waitForMessageWithId('20', 'consumed')
await consumer.handlerSpy.waitForMessageWithId('30', 'consumed')

expect(consumer.addCounter).toBe(1)
expect(consumer.removeCounter).toBe(2)
Expand Down
2 changes: 2 additions & 0 deletions packages/sqs/test/consumers/userConsumerSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({
})

export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('add'),
})

export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('remove'),
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,15 @@ describe('SqsPermissionPublisher', () => {
describe('logging', () => {
it('logs a message when logging is enabled', async () => {
const message = {
id: '1',
userIds,
messageType: 'add',
permissions: perms,
} satisfies PERMISSIONS_MESSAGE_TYPE

await publisher.publish(message)

await waitAndRetry(() => {
return logger.loggedMessages.length === 1
})
await publisher.handlerSpy.waitForMessageWithId('1')

expect(logger.loggedMessages.length).toBe(1)
})
Expand All @@ -76,6 +75,7 @@ describe('SqsPermissionPublisher', () => {
const { permissionPublisher } = diContainer.cradle

const message = {
id: '2',
userIds,
messageType: 'add',
permissions: perms,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export class SqsPermissionPublisherMonoSchema extends AbstractSqsPublisherMonoSc
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
},
},
handlerSpy: true,
deletionConfig: {
deleteIfExists: false,
},
Expand Down

0 comments on commit 2b00d07

Please sign in to comment.