Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent consumers support #232

Merged
merged 14 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Multi-schema consumers support multiple message types via handler configs. They
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('AmqpPermissionConsumer', () => {
await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

expect(logger.loggedMessages.length).toBe(5)
expect(logger.loggedMessages).toEqual([
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
{
id: '1',
Expand Down
16 changes: 14 additions & 2 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,27 @@ export abstract class AbstractQueueService<
}

protected logProcessedMessage(
_message: MessagePayloadSchemas | null,
message: MessagePayloadSchemas | null,
processingResult: MessageProcessingResult,
messageId?: string,
) {
const messageTimestamp = message ? this.tryToExtractTimestamp(message) : undefined
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
const messageProcessingMilliseconds = messageTimestamp
? Date.now() - messageTimestamp.getTime()
: undefined

const messageType =
message && this.messageTypeField in message
? // @ts-ignore
message[this.messageTypeField]
: undefined

this.logger.debug(
{
processingResult,
messageId,
messageProcessingTime: messageProcessingMilliseconds,
messageType,
},
`Finished processing message ${messageId ?? `(unknown id)`}`,
)
Expand Down Expand Up @@ -206,7 +219,6 @@ export abstract class AbstractQueueService<
if (this.logMessages) {
// @ts-ignore
const resolvedMessageId: string | undefined = message?.[this.messageIdField] ?? messageId

this.logProcessedMessage(message, processingResult, resolvedMessageId)
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/core",
"version": "17.2.1",
"version": "17.2.3",
"private": false,
"license": "MIT",
"description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "18.0.1",
"version": "18.1.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down Expand Up @@ -35,7 +35,7 @@
"@aws-sdk/client-sts": "^3.632.0",
"@message-queue-toolkit/core": ">=15.0.0",
"@message-queue-toolkit/schemas": ">=2.0.0",
"@message-queue-toolkit/sqs": "^17.0.0"
"@message-queue-toolkit/sqs": "^17.3.0"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.670.0",
Expand Down
64 changes: 62 additions & 2 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns'
import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import { type AwilixContainer, asValue } from 'awilix'
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { type AwilixContainer, asFunction, asValue } from 'awilix'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'

import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils'
import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
Expand All @@ -13,6 +13,7 @@ import type { Dependencies } from '../utils/testContext'

import type { STSClient } from '@aws-sdk/client-sts'
import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer'
import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas'

describe('SnsSqsPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -706,6 +707,65 @@ describe('SnsSqsPermissionConsumer', () => {
})
})

describe('multiple consumers', () => {
let diContainer: AwilixContainer<Dependencies>

let publisher: SnsPermissionPublisher
let consumer: SnsSqsPermissionConsumer

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asFunction((dependencies) => {
return new SnsSqsPermissionConsumer(dependencies, {
creationConfig: {
topic: {
Name: SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME,
},
queue: {
QueueName: SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME,
},
updateAttributesIfExists: true,
},
deletionConfig: {
deleteIfExists: true,
},
concurrentConsumersAmount: 10,
})
}),
})
publisher = diContainer.cradle.permissionPublisher
consumer = diContainer.cradle.permissionConsumer

await consumer.start()
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('process all messages properly', async () => {
const messagesAmount = 50
const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map(
(_, i) => ({
id: `${i}`,
messageType: 'add',
timestamp: new Date().toISOString(),
}),
)

messages.map((m) => publisher.publish(m))
await Promise.all(
messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')),
)

// Verifies that each message is executed only once
expect(consumer.addCounter).toBe(messagesAmount)
// Verifies that no message is lost
expect(consumer.processedMessagesIds).toHaveLength(messagesAmount)
})
})

describe('visibility timeout', () => {
const topicName = 'myTestTopic'
const queueName = 'myTestQueue'
Expand Down
4 changes: 4 additions & 0 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SnsSqsPermissionConsumerOptions = Pick<
| 'consumerOverrides'
| 'maxRetryDuration'
| 'payloadStoreConfig'
| 'concurrentConsumersAmount'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand All @@ -65,6 +66,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
public addBarrierCounter = 0
public removeCounter = 0
public preHandlerCounter = 0
public processedMessagesIds: Set<string> = new Set()

constructor(
dependencies: SNSSQSConsumerDependencies,
Expand Down Expand Up @@ -101,6 +103,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
PERMISSIONS_ADD_MESSAGE_SCHEMA,
(_message, context, _preHandlingOutputs) => {
this.addCounter += context.incrementAmount
this.processedMessagesIds.add(_message.id)
return Promise.resolve({ result: 'success' })
},
{
Expand Down Expand Up @@ -164,6 +167,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
updateAttributesIfExists: false,
},
maxRetryDuration: options.maxRetryDuration,
concurrentConsumersAmount: options.concurrentConsumersAmount,
},
{
incrementAmount: 1,
Expand Down
52 changes: 34 additions & 18 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export type SQSConsumerOptions<
ConsumerOptions,
'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout'
>
concurrentConsumersAmount?: number
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
}

export abstract class AbstractSqsConsumer<
Expand Down Expand Up @@ -96,7 +97,8 @@ export abstract class AbstractSqsConsumer<
>
implements QueueConsumer
{
private consumer?: Consumer
private consumers: Consumer[]
private readonly concurrentConsumersAmount: number
kjamrog marked this conversation as resolved.
Show resolved Hide resolved
private readonly transactionObservabilityManager?: TransactionObservabilityManager
private readonly consumerOptionsOverride: Partial<ConsumerOptions>
private readonly handlerContainer: HandlerContainer<
Expand Down Expand Up @@ -129,7 +131,8 @@ export abstract class AbstractSqsConsumer<
this.deadLetterQueueOptions = options.deadLetterQueue
this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION
this.executionContext = executionContext

this.consumers = []
this.concurrentConsumersAmount = options.concurrentConsumersAmount ?? 1
this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options)
this.handlerContainer = new HandlerContainer<
MessagePayloadType,
Expand Down Expand Up @@ -174,14 +177,34 @@ export abstract class AbstractSqsConsumer<

public async start() {
await this.init()
if (this.consumer) this.consumer.stop()
this.stopExistingConsumers()

const visibilityTimeout = await this.getQueueVisibilityTimeout()

this.consumer = Consumer.create({
this.consumers = Array.from({ length: this.concurrentConsumersAmount }).map((_) =>
this.createConsumer({ visibilityTimeout }),
)

for (const consumer of this.consumers) {
consumer.on('error', (err) => {
this.handleError(err, {
queueName: this.queueName,
})
})
consumer.start()
}
}

public override async close(abort?: boolean): Promise<void> {
await super.close()
this.stopExistingConsumers(abort ?? false)
}

private createConsumer(options: { visibilityTimeout: number | undefined }): Consumer {
return Consumer.create({
sqs: this.sqsClient,
queueUrl: this.queueUrl,
visibilityTimeout,
visibilityTimeout: options.visibilityTimeout,
messageAttributeNames: [`${PAYLOAD_OFFLOADING_ATTRIBUTE_PREFIX}*`],
...this.consumerOptionsOverride,
handleMessage: async (message: SQSMessage) => {
Expand Down Expand Up @@ -250,21 +273,14 @@ export abstract class AbstractSqsConsumer<
return Promise.reject(result.error)
},
})

this.consumer.on('error', (err) => {
this.handleError(err, {
queueName: this.queueName,
})
})

this.consumer.start()
}

public override async close(abort?: boolean): Promise<void> {
await super.close()
this.consumer?.stop({
abort: abort ?? false,
})
private stopExistingConsumers(abort?: boolean) {
for (const consumer of this.consumers) {
consumer.stop({
abort,
})
}
}

private async internalProcessMessage(
Expand Down
2 changes: 1 addition & 1 deletion packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "17.2.0",
"version": "17.3.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand Down
61 changes: 61 additions & 0 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { SINGLETON_CONFIG, registerDependencies } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

import { SqsPermissionConsumer } from './SqsPermissionConsumer'
import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas'

describe('SqsPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -596,6 +597,66 @@ describe('SqsPermissionConsumer', () => {
})
})

describe('multiple consumers', () => {
let diContainer: AwilixContainer<Dependencies>
let sqsClient: SQSClient

kjamrog marked this conversation as resolved.
Show resolved Hide resolved
let publisher: SqsPermissionPublisher
let consumer: SqsPermissionConsumer

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asFunction((dependencies) => {
return new SqsPermissionConsumer(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionConsumer.QUEUE_NAME,
},
},
concurrentConsumersAmount: 5,
})
}),
})
sqsClient = diContainer.cradle.sqsClient
publisher = diContainer.cradle.permissionPublisher
consumer = diContainer.cradle.permissionConsumer

await consumer.start()

const command = new ReceiveMessageCommand({
QueueUrl: publisher.queueProps.url,
})
const reply = await sqsClient.send(command)
expect(reply.Messages).toBeUndefined()
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('process all messages properly', async () => {
const messagesAmount = 100
const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map(
(_, i) => ({
id: `${i}`,
messageType: 'add',
timestamp: new Date().toISOString(),
}),
)

messages.map((m) => publisher.publish(m))
await Promise.all(
messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')),
)

// Verifies that each message is executed only once
expect(consumer.addCounter).toBe(messagesAmount)
// Verifies that no message is lost
expect(consumer.processedMessagesIds).toHaveLength(messagesAmount)
})
})

describe('visibility timeout', () => {
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>
Expand Down
Loading
Loading