Skip to content

Commit

Permalink
Merge branch 'main' of github.com:kibertoad/message-queue-toolkit int…
Browse files Browse the repository at this point in the history
…o concurrent_consumers_support
  • Loading branch information
kjamrog committed Dec 11, 2024
2 parents 1b47058 + c46d63f commit 6630176
Show file tree
Hide file tree
Showing 32 changed files with 572 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:

strategy:
matrix:
node-version: [18.x, 20.x, 22.x]
node-version: [18.x, 20.x, 22.x, 23.x]

steps:
- uses: actions/checkout@v4
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
services:
rabbitmq:
image: rabbitmq:3.13.7
image: rabbitmq:4.0.4
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
volumes:
- rabbit_data:/var/lib/rabbitmq

localstack:
image: localstack/localstack:3.8.1
image: localstack/localstack:4.0.2
network_mode: bridge
hostname: localstack
ports:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq:
image: rabbitmq:3.13.7
image: rabbitmq:4.0.4
ports:
- ${DOCKER_RABBITMQ_CLIENT_PORT:-5672}:5672
- ${DOCKER_RABBITMQ_MANAGEMENT_PORT:-15672}:15672
Expand Down
8 changes: 4 additions & 4 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@
"amqplib": "^0.10.3"
},
"devDependencies": {
"@biomejs/biome": "1.9.3",
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@message-queue-toolkit/core": "*",
"@types/amqplib": "^0.10.5",
"@types/node": "^22.7.5",
"@vitest/coverage-v8": "^2.1.2",
"@vitest/coverage-v8": "^2.1.5",
"amqplib": "^0.10.4",
"awilix": "^12.0.1",
"awilix-manager": "^5.4.0",
"del-cli": "^6.0.0",
"typescript": "^5.6.3",
"vitest": "^2.1.2"
"typescript": "^5.7.2",
"vitest": "^2.1.5"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
Expand Down
8 changes: 4 additions & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@
"zod": "^3.23.8"
},
"devDependencies": {
"@biomejs/biome": "1.9.3",
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.7.5",
"@types/tmp": "^0.2.6",
"@vitest/coverage-v8": "^2.1.2",
"@vitest/coverage-v8": "^2.1.5",
"awilix": "^12.0.1",
"awilix-manager": "^5.4.0",
"del-cli": "^6.0.0",
"typescript": "^5.6.3",
"vitest": "^2.1.2"
"typescript": "^5.7.2",
"vitest": "^2.1.5"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
Expand Down
4 changes: 2 additions & 2 deletions packages/outbox-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/background-jobs-common": "^8.0.0",
"@lokalise/background-jobs-common": "^9.0.0",
"@supercharge/promise-pool": "^3.2.0",
"uuidv7": "^1.0.2"
},
Expand All @@ -32,7 +32,7 @@
"@message-queue-toolkit/schemas": ">=4.0.0"
},
"devDependencies": {
"@biomejs/biome": "1.9.3",
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.0.0",
"@vitest/coverage-v8": "^2.0.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/s3-payload-store/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
localstack:
image: localstack/localstack:3.8.1
image: localstack/localstack:4.0.2
network_mode: bridge
hostname: localstack
ports:
Expand Down
2 changes: 1 addition & 1 deletion packages/s3-payload-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
},
"devDependencies": {
"@message-queue-toolkit/core": "*",
"@biomejs/biome": "1.9.3",
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.0.0",
"@vitest/coverage-v8": "^2.0.4",
Expand Down
8 changes: 4 additions & 4 deletions packages/schemas/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
"zod": "^3.23.8"
},
"devDependencies": {
"@biomejs/biome": "1.9.3",
"@biomejs/biome": "1.9.4",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.7.5",
"@vitest/coverage-v8": "^2.1.2",
"@vitest/coverage-v8": "^2.1.5",
"del-cli": "^6.0.0",
"typescript": "^5.6.3",
"vitest": "^2.1.2"
"typescript": "^5.7.2",
"vitest": "^2.1.5"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
services:
localstack:
image: localstack/localstack:3.8.1
image: localstack/localstack:4.0.2
network_mode: bridge
hostname: localstack
ports:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
1 change: 1 addition & 0 deletions packages/sns/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export {
findSubscriptionByTopicAndQueue,
getSubscriptionAttributes,
} from './lib/utils/snsUtils'
export { clearCachedCallerIdentity } from './lib/utils/stsUtils'

export { subscribeToTopic } from './lib/utils/snsSubscriber'
export { initSns, initSnsSqs } from './lib/utils/snsInitter'
Expand Down
14 changes: 12 additions & 2 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient, Tag } from '@aws-sdk/client-sn
import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes'
import { deleteSns, initSns } from '../utils/snsInitter'

Expand All @@ -10,6 +11,7 @@ export const SNS_MESSAGE_MAX_SIZE = 256 * 1024 // 256KB

export type SNSDependencies = QueueDependencies & {
snsClient: SNSClient
stsClient: STSClient
}

export type SNSTopicAWSConfig = CreateTopicCommandInput
Expand All @@ -31,6 +33,7 @@ export type SNSTopicConfig = {
export type ExtraSNSCreationParams = {
queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[]
allowedSourceOwner?: string
forceTagUpdate?: boolean
}

export type SNSCreationConfig = {
Expand Down Expand Up @@ -59,21 +62,28 @@ export abstract class AbstractSnsService<
SNSOptionsType
> {
protected readonly snsClient: SNSClient
protected readonly stsClient: STSClient
// @ts-ignore
protected topicArn: string

constructor(dependencies: DependenciesType, options: SNSOptionsType) {
super(dependencies, options)

this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

public async init() {
if (this.deletionConfig && this.creationConfig) {
await deleteSns(this.snsClient, this.deletionConfig, this.creationConfig)
await deleteSns(this.snsClient, this.stsClient, this.deletionConfig, this.creationConfig)
}

const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig)
const initResult = await initSns(
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
)
this.topicArn = initResult.topicArn
this.isInitted = true
}
Expand Down
6 changes: 6 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter'
import { readSnsMessage } from '../utils/snsMessageReader'
import type { SNSSubscriptionOptions } from '../utils/snsSubscriber'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService'

export type SNSSQSConsumerDependencies = SQSConsumerDependencies & {
snsClient: SNSClient
stsClient: STSClient
}
export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig

Expand Down Expand Up @@ -53,6 +55,7 @@ export abstract class AbstractSnsSqsConsumer<
> {
private readonly subscriptionConfig?: SNSSubscriptionOptions
private readonly snsClient: SNSClient
private readonly stsClient: STSClient

// @ts-ignore
protected topicArn: string
Expand All @@ -74,13 +77,15 @@ export abstract class AbstractSnsSqsConsumer<

this.subscriptionConfig = options.subscriptionConfig
this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

override async init(): Promise<void> {
if (this.deletionConfig && this.creationConfig && this.subscriptionConfig) {
await deleteSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.deletionConfig,
this.creationConfig.queue,
this.creationConfig.topic,
Expand All @@ -95,6 +100,7 @@ export abstract class AbstractSnsSqsConsumer<
const initSnsSqsResult = await initSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
Expand Down
1 change: 1 addition & 0 deletions packages/sns/lib/sns/SnsPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class SnsPublisherManager<
newPublisherOptions: options.newPublisherOptions,
publisherDependencies: {
snsClient: dependencies.snsClient,
stsClient: dependencies.stsClient,
logger: dependencies.logger,
errorReporter: dependencies.errorReporter,
},
Expand Down
14 changes: 11 additions & 3 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer'

import type { STSClient } from '@aws-sdk/client-sts'
import type { Either } from '@lokalise/node-core'
import { type TopicResolutionOptions, isCreateTopicCommand } from '../types/TopicTypes'
import type { SNSSubscriptionOptions } from './snsSubscriber'
Expand All @@ -24,6 +25,7 @@ import {
export async function initSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSSQSQueueLocatorType,
creationConfig?: SNSCreationConfig & SQSCreationConfig,
subscriptionConfig?: SNSSubscriptionOptions,
Expand Down Expand Up @@ -59,6 +61,7 @@ export async function initSnsSqs(
const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
creationConfig.queue,
topicResolutionOptions,
subscriptionConfig,
Expand Down Expand Up @@ -132,6 +135,7 @@ export async function initSnsSqs(
export async function deleteSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput | undefined,
Expand All @@ -152,6 +156,7 @@ export async function deleteSnsSqs(
const { subscriptionArn } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
queueConfiguration,
topicConfiguration ?? topicLocator!,
subscriptionConfiguration,
Expand All @@ -176,13 +181,14 @@ export async function deleteSnsSqs(
if (!topicName) {
throw new Error('Failed to resolve topic name')
}
await deleteTopic(snsClient, topicName)
await deleteTopic(snsClient, stsClient, topicName)
}
await deleteSubscription(snsClient, subscriptionArn)
}

export async function deleteSns(
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
creationConfig: SNSCreationConfig,
) {
Expand All @@ -200,11 +206,12 @@ export async function deleteSns(
throw new Error('topic.Name must be set for automatic deletion')
}

await deleteTopic(snsClient, creationConfig.topic.Name)
await deleteTopic(snsClient, stsClient, creationConfig.topic.Name)
}

export async function initSns(
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSTopicLocatorType,
creationConfig?: SNSCreationConfig,
) {
Expand All @@ -227,9 +234,10 @@ export async function initSns(
'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory',
)
}
const topicArn = await assertTopic(snsClient, creationConfig.topic!, {
const topicArn = await assertTopic(snsClient, stsClient, creationConfig.topic!, {
queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: creationConfig.allowedSourceOwner,
forceTagUpdate: creationConfig.forceTagUpdate,
})
return {
topicArn,
Expand Down
Loading

0 comments on commit 6630176

Please sign in to comment.