diff --git a/docker-compose.yml b/docker-compose.yml index 8e7d177d..395348e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock diff --git a/packages/sns/docker-compose.yml b/packages/sns/docker-compose.yml index f7d6d6fc..f11269f0 100644 --- a/packages/sns/docker-compose.yml +++ b/packages/sns/docker-compose.yml @@ -7,7 +7,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock diff --git a/packages/sns/index.ts b/packages/sns/index.ts index ba79d820..604a9f08 100644 --- a/packages/sns/index.ts +++ b/packages/sns/index.ts @@ -33,6 +33,7 @@ export { findSubscriptionByTopicAndQueue, getSubscriptionAttributes, } from './lib/utils/snsUtils' +export { clearCachedCallerIdentity } from './lib/utils/stsUtils' export { subscribeToTopic } from './lib/utils/snsSubscriber' export { initSns, initSnsSqs } from './lib/utils/snsInitter' diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index cbae6ae1..13b1bbcf 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient, Tag } from '@aws-sdk/client-sn import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core' import { AbstractQueueService } from '@message-queue-toolkit/core' +import type { STSClient } from '@aws-sdk/client-sts' import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes' import { deleteSns, initSns } from '../utils/snsInitter' @@ -10,6 +11,7 @@ export const SNS_MESSAGE_MAX_SIZE = 256 * 1024 // 256KB export type SNSDependencies = QueueDependencies & { snsClient: SNSClient + stsClient: STSClient } export type SNSTopicAWSConfig = CreateTopicCommandInput @@ -31,6 +33,7 @@ export type SNSTopicConfig = { export type ExtraSNSCreationParams = { queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[] allowedSourceOwner?: string + forceTagUpdate?: boolean } export type SNSCreationConfig = { @@ -59,6 +62,7 @@ export abstract class AbstractSnsService< SNSOptionsType > { protected readonly snsClient: SNSClient + protected readonly stsClient: STSClient // @ts-ignore protected topicArn: string @@ -66,14 +70,20 @@ export abstract class AbstractSnsService< super(dependencies, options) this.snsClient = dependencies.snsClient + this.stsClient = dependencies.stsClient } public async init() { if (this.deletionConfig && this.creationConfig) { - await deleteSns(this.snsClient, this.deletionConfig, this.creationConfig) + await deleteSns(this.snsClient, this.stsClient, this.deletionConfig, this.creationConfig) } - const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig) + const initResult = await initSns( + this.snsClient, + this.stsClient, + this.locatorConfig, + this.creationConfig, + ) this.topicArn = initResult.topicArn this.isInitted = true } diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 69c16c12..f180c068 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -12,10 +12,12 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter' import { readSnsMessage } from '../utils/snsMessageReader' import type { SNSSubscriptionOptions } from '../utils/snsSubscriber' +import type { STSClient } from '@aws-sdk/client-sts' import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService' export type SNSSQSConsumerDependencies = SQSConsumerDependencies & { snsClient: SNSClient + stsClient: STSClient } export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig @@ -53,6 +55,7 @@ export abstract class AbstractSnsSqsConsumer< > { private readonly subscriptionConfig?: SNSSubscriptionOptions private readonly snsClient: SNSClient + private readonly stsClient: STSClient // @ts-ignore protected topicArn: string @@ -74,6 +77,7 @@ export abstract class AbstractSnsSqsConsumer< this.subscriptionConfig = options.subscriptionConfig this.snsClient = dependencies.snsClient + this.stsClient = dependencies.stsClient } override async init(): Promise { @@ -81,6 +85,7 @@ export abstract class AbstractSnsSqsConsumer< await deleteSnsSqs( this.sqsClient, this.snsClient, + this.stsClient, this.deletionConfig, this.creationConfig.queue, this.creationConfig.topic, @@ -95,6 +100,7 @@ export abstract class AbstractSnsSqsConsumer< const initSnsSqsResult = await initSnsSqs( this.sqsClient, this.snsClient, + this.stsClient, this.locatorConfig, this.creationConfig, this.subscriptionConfig, diff --git a/packages/sns/lib/sns/SnsPublisherManager.ts b/packages/sns/lib/sns/SnsPublisherManager.ts index 572cc4fb..b20a5ff8 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.ts @@ -79,6 +79,7 @@ export class SnsPublisherManager< newPublisherOptions: options.newPublisherOptions, publisherDependencies: { snsClient: dependencies.snsClient, + stsClient: dependencies.stsClient, logger: dependencies.logger, errorReporter: dependencies.errorReporter, }, diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 2ef2be97..ebaad77c 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -8,6 +8,7 @@ import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService' import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer' +import type { STSClient } from '@aws-sdk/client-sts' import type { Either } from '@lokalise/node-core' import { type TopicResolutionOptions, isCreateTopicCommand } from '../types/TopicTypes' import type { SNSSubscriptionOptions } from './snsSubscriber' @@ -24,6 +25,7 @@ import { export async function initSnsSqs( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, locatorConfig?: SNSSQSQueueLocatorType, creationConfig?: SNSCreationConfig & SQSCreationConfig, subscriptionConfig?: SNSSubscriptionOptions, @@ -59,6 +61,7 @@ export async function initSnsSqs( const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( sqsClient, snsClient, + stsClient, creationConfig.queue, topicResolutionOptions, subscriptionConfig, @@ -132,6 +135,7 @@ export async function initSnsSqs( export async function deleteSnsSqs( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, deletionConfig: DeletionConfig, queueConfiguration: CreateQueueCommandInput, topicConfiguration: CreateTopicCommandInput | undefined, @@ -152,6 +156,7 @@ export async function deleteSnsSqs( const { subscriptionArn } = await subscribeToTopic( sqsClient, snsClient, + stsClient, queueConfiguration, topicConfiguration ?? topicLocator!, subscriptionConfiguration, @@ -176,13 +181,14 @@ export async function deleteSnsSqs( if (!topicName) { throw new Error('Failed to resolve topic name') } - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) } await deleteSubscription(snsClient, subscriptionArn) } export async function deleteSns( snsClient: SNSClient, + stsClient: STSClient, deletionConfig: DeletionConfig, creationConfig: SNSCreationConfig, ) { @@ -200,11 +206,12 @@ export async function deleteSns( throw new Error('topic.Name must be set for automatic deletion') } - await deleteTopic(snsClient, creationConfig.topic.Name) + await deleteTopic(snsClient, stsClient, creationConfig.topic.Name) } export async function initSns( snsClient: SNSClient, + stsClient: STSClient, locatorConfig?: SNSTopicLocatorType, creationConfig?: SNSCreationConfig, ) { @@ -227,9 +234,10 @@ export async function initSns( 'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory', ) } - const topicArn = await assertTopic(snsClient, creationConfig.topic!, { + const topicArn = await assertTopic(snsClient, stsClient, creationConfig.topic!, { queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: creationConfig.allowedSourceOwner, + forceTagUpdate: creationConfig.forceTagUpdate, }) return { topicArn, diff --git a/packages/sns/lib/utils/snsSubscriber.ts b/packages/sns/lib/utils/snsSubscriber.ts index d53f1dde..e09a594f 100644 --- a/packages/sns/lib/utils/snsSubscriber.ts +++ b/packages/sns/lib/utils/snsSubscriber.ts @@ -8,6 +8,7 @@ import { assertQueue } from '@message-queue-toolkit/sqs' import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' +import type { STSClient } from '@aws-sdk/client-sts' import { type TopicResolutionOptions, isCreateTopicCommand, @@ -21,8 +22,9 @@ export type SNSSubscriptionOptions = Omit< > & { updateAttributesIfExists: boolean } async function resolveTopicArnToSubscribeTo( - topicConfiguration: TopicResolutionOptions, snsClient: SNSClient, + stsClient: STSClient, + topicConfiguration: TopicResolutionOptions, extraParams: (ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams) | undefined, ) { //If topicArn is present, let's use it and return early. @@ -32,9 +34,10 @@ async function resolveTopicArnToSubscribeTo( //If input configuration is capable of creating a topic, let's create it and return its ARN. if (isCreateTopicCommand(topicConfiguration)) { - return await assertTopic(snsClient, topicConfiguration, { + return await assertTopic(snsClient, stsClient, topicConfiguration, { queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: extraParams?.allowedSourceOwner, + forceTagUpdate: extraParams?.forceTagUpdate, }) } @@ -45,12 +48,18 @@ async function resolveTopicArnToSubscribeTo( export async function subscribeToTopic( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, queueConfiguration: CreateQueueCommandInput, topicConfiguration: TopicResolutionOptions, subscriptionConfiguration: SNSSubscriptionOptions, extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams, ) { - const topicArn = await resolveTopicArnToSubscribeTo(topicConfiguration, snsClient, extraParams) + const topicArn = await resolveTopicArnToSubscribeTo( + snsClient, + stsClient, + topicConfiguration, + extraParams, + ) const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, { topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix, diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index 925c06d6..700ad5a5 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,6 +1,7 @@ import { type CreateTopicCommandInput, type SNSClient, + TagResourceCommand, paginateListTopics, } from '@aws-sdk/client-sns' import { @@ -12,12 +13,14 @@ import { SetTopicAttributesCommand, UnsubscribeCommand, } from '@aws-sdk/client-sns' -import type { Either } from '@lokalise/node-core' +import { type Either, isError } from '@lokalise/node-core' import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from '@message-queue-toolkit/sqs' import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' +import type { STSClient } from '@aws-sdk/client-sts' import { generateTopicSubscriptionPolicy } from './snsAttributeUtils' +import { buildTopicArn } from './stsUtils' type AttributesResult = { attributes?: Record @@ -79,16 +82,23 @@ export async function getSubscriptionAttributes( export async function assertTopic( snsClient: SNSClient, + stsClient: STSClient, topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { - const command = new CreateTopicCommand(topicOptions) - const response = await snsClient.send(command) - - if (!response.TopicArn) { - throw new Error('No topic arn in response') + let topicArn: string + try { + const command = new CreateTopicCommand(topicOptions) + const response = await snsClient.send(command) + if (!response.TopicArn) throw new Error('No topic arn in response') + topicArn = response.TopicArn + } catch (err) { + // We only manually build ARN in case of tag update + if (!extraParams?.forceTagUpdate) throw err + // To build ARN we need topic name and error should be "topic already exist with different tags" + if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err + topicArn = await buildTopicArn(stsClient, topicOptions.Name) } - const topicArn = response.TopicArn if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) { const setTopicAttributesCommand = new SetTopicAttributesCommand({ @@ -102,21 +112,28 @@ export async function assertTopic( }) await snsClient.send(setTopicAttributesCommand) } + if (extraParams?.forceTagUpdate && topicOptions.Tags) { + const tagTopicCommand = new TagResourceCommand({ + ResourceArn: topicArn, + Tags: topicOptions.Tags, + }) + await snsClient.send(tagTopicCommand) + } return topicArn } -export async function deleteTopic(client: SNSClient, topicName: string) { +export async function deleteTopic(snsClient: SNSClient, stsClient: STSClient, topicName: string) { try { - const topicArn = await assertTopic(client, { + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName, }) - const command = new DeleteTopicCommand({ - TopicArn: topicArn, - }) - - await client.send(command) + await snsClient.send( + new DeleteTopicCommand({ + TopicArn: topicArn, + }), + ) } catch (_) { // we don't care it operation has failed } @@ -178,3 +195,15 @@ export async function getTopicArnByName(snsClient: SNSClient, topicName?: string */ export const calculateOutgoingMessageSize = (message: unknown) => sqsCalculateOutgoingMessageSize(message) + +const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) => + !!error && + isError(error) && + 'Error' in error && + !!error.Error && + typeof error.Error === 'object' && + 'Code' in error.Error && + 'Message' in error.Error && + typeof error.Error.Message === 'string' && + error.Error.Code === 'InvalidParameter' && + error.Error.Message.includes('already exists with different tags') diff --git a/packages/sns/lib/utils/stsUtils.spec.ts b/packages/sns/lib/utils/stsUtils.spec.ts new file mode 100644 index 00000000..e9584c91 --- /dev/null +++ b/packages/sns/lib/utils/stsUtils.spec.ts @@ -0,0 +1,56 @@ +import type { SNSClient } from '@aws-sdk/client-sns' +import type { STSClient } from '@aws-sdk/client-sts' +import { beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { registerDependencies } from '../../test/utils/testContext' +import { assertTopic, deleteTopic } from './snsUtils' +import { buildTopicArn, clearCachedCallerIdentity } from './stsUtils' + +describe('stsUtils', () => { + let stsClient: STSClient + let snsClient: SNSClient + + beforeAll(async () => { + const diContainer = await registerDependencies({}, false) + stsClient = diContainer.cradle.stsClient + snsClient = diContainer.cradle.snsClient + }) + + describe('buildTopicArn', () => { + const topicName = 'my-test-topic' + + beforeEach(async () => { + await deleteTopic(snsClient, stsClient, topicName) + clearCachedCallerIdentity() + }) + + it('build ARN for topic', async () => { + const buildedTopicArn = await buildTopicArn(stsClient, topicName) + expect(buildedTopicArn).toMatchInlineSnapshot( + `"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`, + ) + + // creating real topic to make sure arn is correct + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + expect(topicArn).toBe(buildedTopicArn) + }) + + it('should be able to handle parallel calls getting caller identity only once', async () => { + const stsClientSpy = vi.spyOn(stsClient, 'send') + + const result = await Promise.all([ + buildTopicArn(stsClient, topicName), + buildTopicArn(stsClient, topicName), + buildTopicArn(stsClient, topicName), + ]) + + expect(result).toMatchInlineSnapshot(` + [ + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + ] + `) + expect(stsClientSpy).toHaveBeenCalledOnce() + }) + }) +}) diff --git a/packages/sns/lib/utils/stsUtils.ts b/packages/sns/lib/utils/stsUtils.ts new file mode 100644 index 00000000..7d9a93fa --- /dev/null +++ b/packages/sns/lib/utils/stsUtils.ts @@ -0,0 +1,43 @@ +import { + GetCallerIdentityCommand, + type GetCallerIdentityCommandOutput, + type STSClient, +} from '@aws-sdk/client-sts' + +let callerIdentityPromise: Promise | undefined +let callerIdentityCached: GetCallerIdentityCommandOutput | undefined + +/** + * Manually builds the ARN of a topic based on the current AWS account and the topic name. + * It follows the following pattern: arn:aws:sns::: + * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + */ +export const buildTopicArn = async (client: STSClient, topicName: string) => { + const identityResponse = await getAndCacheCallerIdentity(client) + const region = + typeof client.config.region === 'string' ? client.config.region : await client.config.region() + + return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` +} + +export const clearCachedCallerIdentity = () => { + callerIdentityPromise = undefined + callerIdentityCached = undefined +} + +const getAndCacheCallerIdentity = async ( + client: STSClient, +): Promise => { + if (!callerIdentityCached) { + if (!callerIdentityPromise) { + callerIdentityPromise = client.send(new GetCallerIdentityCommand({})).then((response) => { + callerIdentityCached = response + }) + } + + await callerIdentityPromise + callerIdentityPromise = undefined + } + + return callerIdentityCached! +} diff --git a/packages/sns/package.json b/packages/sns/package.json index 6903e69d..1763697f 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "17.3.0", + "version": "18.0.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -32,6 +32,7 @@ "peerDependencies": { "@aws-sdk/client-sns": "^3.632.0", "@aws-sdk/client-sqs": "^3.632.0", + "@aws-sdk/client-sts": "^3.632.0", "@message-queue-toolkit/core": ">=15.0.0", "@message-queue-toolkit/schemas": ">=2.0.0", "@message-queue-toolkit/sqs": "^17.0.0" @@ -40,11 +41,11 @@ "@aws-sdk/client-s3": "^3.670.0", "@aws-sdk/client-sns": "^3.670.0", "@aws-sdk/client-sqs": "^3.670.0", + "@biomejs/biome": "1.9.3", + "@kibertoad/biome-config": "^1.2.1", "@message-queue-toolkit/core": "*", "@message-queue-toolkit/s3-payload-store": "*", "@message-queue-toolkit/sqs": "*", - "@biomejs/biome": "1.9.3", - "@kibertoad/biome-config": "^1.2.1", "@types/node": "^22.7.5", "@vitest/coverage-v8": "^2.1.2", "awilix": "^12.0.1", diff --git a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts index f42f7981..8b54b9ec 100644 --- a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts +++ b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts @@ -1,5 +1,6 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' +import type { STSClient } from '@aws-sdk/client-sts' import { deleteQueue } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' import { beforeAll, beforeEach, describe, it } from 'vitest' @@ -11,20 +12,22 @@ describe('CreateLocateConfigMixConsumer', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient beforeAll(async () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient }) beforeEach(async () => { await deleteQueue(sqsClient, CreateLocateConfigMixConsumer.CONSUMED_QUEUE_NAME) - await deleteTopic(snsClient, CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME) + await deleteTopic(snsClient, stsClient, CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME) }) it('accepts mixed config of create and locate', async () => { - await assertTopic(snsClient, { + await assertTopic(snsClient, stsClient, { Name: CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME, }) const consumer = new CreateLocateConfigMixConsumer(diContainer.cradle) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts index 183f9820..6cee19a0 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts @@ -16,6 +16,7 @@ import type { SnsPermissionPublisher } from '../publishers/SnsPermissionPublishe import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' import type { PERMISSIONS_REMOVE_MESSAGE_TYPE } from './userConsumerSchemas' @@ -28,6 +29,7 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient let publisher: SnsPermissionPublisher let consumer: SnsSqsPermissionConsumer | undefined @@ -36,13 +38,14 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient publisher = diContainer.cradle.permissionPublisher }) beforeEach(async () => { await deleteQueue(sqsClient, queueName) await deleteQueue(sqsClient, deadLetterQueueName) - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) }) afterEach(async () => { @@ -61,7 +64,7 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { beforeEach(async () => { await deleteQueue(sqsClient, queueName) await deleteQueue(sqsClient, deadLetterQueueName) - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) }) it('creates a new dead letter queue', async () => { diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts index c66b1e28..ef282149 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts @@ -11,6 +11,8 @@ import { assertBucket, emptyBucket } from '../utils/s3Utils' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import { deleteQueue } from '@message-queue-toolkit/sqs' +import { deleteTopic } from '../../lib/utils/snsUtils' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' @@ -49,6 +51,14 @@ describe('SnsSqsPermissionConsumer', () => { publisher = new SnsPermissionPublisher(diContainer.cradle, { payloadStoreConfig, }) + + await deleteQueue(diContainer.cradle.sqsClient, SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME) + await deleteTopic( + diContainer.cradle.snsClient, + diContainer.cradle.stsClient, + SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME, + ) + await consumer.start() await publisher.init() }) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 3a09fe43..bf928bfd 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -1,35 +1,38 @@ import { setTimeout } from 'node:timers/promises' - -import type { SNSClient } from '@aws-sdk/client-sns' +import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' -import type { AwilixContainer } from 'awilix' -import { asValue } from 'awilix' -import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { type AwilixContainer, asValue } from 'awilix' +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' describe('SnsSqsPermissionConsumer', () => { describe('init', () => { const queueName = 'some-queue' + const topicNome = 'some-topic' let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient beforeAll(async () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient }) beforeEach(async () => { await deleteQueue(sqsClient, queueName) + await deleteTopic(snsClient, stsClient, topicNome) }) // FixMe https://github.com/localstack/localstack/issues/9306 @@ -54,8 +57,8 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, }) - const arn = await assertTopic(snsClient, { - Name: 'existingTopic', + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, }) const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { @@ -76,41 +79,42 @@ describe('SnsSqsPermissionConsumer', () => { expect(newConsumer.subscriptionProps.subscriptionArn).toBe( 'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395', ) - await deleteTopic(snsClient, 'existingTopic') }) it('does not create a new topic when mixed locator is passed', async () => { - const arn = await assertTopic(snsClient, { - Name: 'existingTopic', + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, }) const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { locatorConfig: { - topicName: 'existingTopic', + topicName: topicNome, }, creationConfig: { queue: { - QueueName: 'newQueue', + QueueName: queueName, }, }, }) await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/newQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) - expect(newConsumer.subscriptionProps.queueName).toBe('newQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) expect(newConsumer.subscriptionProps.topicArn).toEqual(arn) expect(newConsumer.subscriptionProps.subscriptionArn).toMatch( - 'arn:aws:sns:eu-west-1:000000000000:existingTopic', + `arn:aws:sns:eu-west-1:000000000000:${topicNome}:`, ) - await deleteTopic(snsClient, 'existingTopic') }) describe('tags update', () => { - const getTags = (queueUrl: string) => + const getQueueTags = (queueUrl: string) => sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) + const getTopicTags = (arn: string) => + snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) + it('updates existing queue tags when update is forced', async () => { const initialTags = { project: 'some-project', @@ -126,7 +130,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, tags: initialTags, }) - const preTags = await getTags(assertResult.queueUrl) + const preTags = await getQueueTags(assertResult.queueUrl) expect(preTags.Tags).toEqual(initialTags) const sqsSpy = vi.spyOn(sqsClient, 'send') @@ -134,7 +138,7 @@ describe('SnsSqsPermissionConsumer', () => { const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { creationConfig: { topic: { - Name: 'some-topic', + Name: topicNome, }, queue: { QueueName: queueName, @@ -156,7 +160,7 @@ describe('SnsSqsPermissionConsumer', () => { }) expect(updateCall).toBeDefined() - const postTags = await getTags(assertResult.queueUrl) + const postTags = await getQueueTags(assertResult.queueUrl) expect(postTags.Tags).toEqual({ ...newTags, leftover: 'some-leftover', @@ -173,7 +177,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, tags: initialTags, }) - const preTags = await getTags(assertResult.queueUrl) + const preTags = await getQueueTags(assertResult.queueUrl) expect(preTags.Tags).toEqual(initialTags) const sqsSpy = vi.spyOn(sqsClient, 'send') @@ -202,9 +206,150 @@ describe('SnsSqsPermissionConsumer', () => { }) expect(updateCall).toBeUndefined() - const postTags = await getTags(assertResult.queueUrl) + const postTags = await getQueueTags(assertResult.queueUrl) expect(postTags.Tags).toEqual(initialTags) }) + + it('updates existing topic tags when update is forced', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTopicTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: newTags }, + queue: { QueueName: queueName }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + await consumer.init() + + const updateCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTopicTags(arn) + const tags = postTags.Tags + expect(tags).toHaveLength(4) + expect(postTags.Tags).toEqual( + expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) + }) + + it('should throw error if tags are different and force tag update is not true', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTopicTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: [{ Key: 'example', Value: 'should fail' }] }, + queue: { QueueName: queueName }, + }, + }) + + await expect(consumer.init()).rejects.toThrowError( + /Topic already exists with different tags/, + ) + }) + + it('updates existing queue and topic tags when update is forced', async () => { + const initialTopicTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTopicTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTopicTags, + }) + const preTopicTags = await getTopicTags(arn) + expect(preTopicTags.Tags).toEqual(initialTopicTags) + + const initialQueueTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + const newQueueTags = { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + } + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialQueueTags, + }) + const preQueueTags = await getQueueTags(assertResult.queueUrl) + expect(preQueueTags.Tags).toEqual(initialQueueTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: newTopicTags }, + queue: { QueueName: queueName, tags: newQueueTags }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') + await consumer.init() + + const updateTopicCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateTopicCall).toBeDefined() + + const postTopicTags = await getTopicTags(arn) + const tags = postTopicTags.Tags + expect(tags).toHaveLength(4) + expect(postTopicTags.Tags).toEqual( + expect.arrayContaining([...newTopicTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) + + const updateQueueCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateQueueCall).toBeDefined() + + const postQueueTags = await getQueueTags(assertResult.queueUrl) + expect(postQueueTags.Tags).toEqual({ + ...newQueueTags, + leftover: 'some-leftover', + }) + }) }) describe('attributes update', () => { @@ -411,13 +556,14 @@ describe('SnsSqsPermissionConsumer', () => { describe('preHandlers', () => { let diContainer: AwilixContainer let publisher: SnsPermissionPublisher - beforeEach(async () => { + + beforeAll(async () => { diContainer = await registerDependencies({}, false) publisher = diContainer.cradle.permissionPublisher await publisher.init() }) - afterEach(async () => { + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -522,16 +668,15 @@ describe('SnsSqsPermissionConsumer', () => { let diContainer: AwilixContainer let publisher: SnsPermissionPublisher let consumer: SnsSqsPermissionConsumer - beforeEach(async () => { + + beforeAll(async () => { diContainer = await registerDependencies() publisher = diContainer.cradle.permissionPublisher consumer = diContainer.cradle.permissionConsumer }) - afterEach(async () => { - const { awilixManager } = diContainer.cradle - - await awilixManager.executeDispose() + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -566,14 +711,19 @@ describe('SnsSqsPermissionConsumer', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer - beforeEach(async () => { + beforeAll(async () => { diContainer = await registerDependencies({ permissionConsumer: asValue(() => undefined), permissionPublisher: asValue(() => undefined), }) }) - afterEach(async () => { + beforeEach(async () => { + await deleteQueue(diContainer.cradle.sqsClient, queueName) + await deleteTopic(diContainer.cradle.snsClient, diContainer.cradle.stsClient, topicName) + }) + + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -641,14 +791,19 @@ describe('SnsSqsPermissionConsumer', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer - beforeEach(async () => { + beforeAll(async () => { diContainer = await registerDependencies({ permissionConsumer: asValue(() => undefined), permissionPublisher: asValue(() => undefined), }) }) - afterEach(async () => { + beforeEach(async () => { + await deleteQueue(diContainer.cradle.sqsClient, queueName) + await deleteTopic(diContainer.cradle.snsClient, diContainer.cradle.stsClient, topicName) + }) + + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index a55b61b8..3a24cc51 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -141,7 +141,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< ) .build(), deletionConfig: options.deletionConfig ?? { - deleteIfExists: true, + deleteIfExists: false, }, payloadStoreConfig: options.payloadStoreConfig, consumerOverrides: options.consumerOverrides ?? { diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts index 552ea448..cad03160 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts @@ -25,6 +25,7 @@ import { assertBucket, getObjectContent } from '../utils/s3Utils' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsPermissionPublisher } from './SnsPermissionPublisher' const queueName = 'payloadOffloadingTestQueue' @@ -37,6 +38,7 @@ describe('SnsPermissionPublisher', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient let s3: S3 let payloadStoreConfig: PayloadStoreConfig @@ -51,6 +53,7 @@ describe('SnsPermissionPublisher', () => { }) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient s3 = diContainer.cradle.s3 await assertBucket(s3, s3BucketName) @@ -64,13 +67,14 @@ describe('SnsPermissionPublisher', () => { beforeEach(async () => { await deleteQueue(sqsClient, queueName) - await deleteTopic(snsClient, SnsPermissionPublisher.TOPIC_NAME) + await deleteTopic(snsClient, stsClient, SnsPermissionPublisher.TOPIC_NAME) const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName, }) await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName }, { Name: SnsPermissionPublisher.TOPIC_NAME, diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 37d3288b..0f21d877 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,4 +1,4 @@ -import type { SNSClient } from '@aws-sdk/client-sns' +import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' @@ -6,7 +6,7 @@ import type { SQSMessage } from '@message-queue-toolkit/sqs' import { FakeConsumerErrorResolver, assertQueue, deleteQueue } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' import { Consumer } from 'sqs-consumer' -import { afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { deserializeSNSMessage } from '../../lib/utils/snsMessageDeserializer' import { subscribeToTopic } from '../../lib/utils/snsSubscriber' @@ -19,17 +19,25 @@ import { PERMISSIONS_ADD_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsPermissionPublisher } from './SnsPermissionPublisher' -const queueName = 'someQueue' - describe('SnsPermissionPublisher', () => { describe('init', () => { + const topicNome = 'existingTopic' + let diContainer: AwilixContainer let snsClient: SNSClient + let stsClient: STSClient + beforeAll(async () => { diContainer = await registerDependencies() snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient + }) + + beforeEach(async () => { + await deleteTopic(snsClient, stsClient, topicNome) }) it('sets correct policy when policy fields are set', async () => { @@ -83,8 +91,8 @@ describe('SnsPermissionPublisher', () => { }) it('does not create a new queue when queue locator is passed', async () => { - const arn = await assertTopic(snsClient, { - Name: 'existingTopic', + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, }) const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { @@ -95,24 +103,99 @@ describe('SnsPermissionPublisher', () => { await newPublisher.init() expect(newPublisher.topicArnProp).toEqual(arn) - await deleteTopic(snsClient, 'existingTopic') + }) + + describe('tags', () => { + const getTags = (arn: string) => + snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) + + it('updates existing topic tags when update is forced', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: newTags }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + await newPublisher.init() + + const updateCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTags(arn) + const tags = postTags.Tags + expect(tags).toHaveLength(4) + expect(postTags.Tags).toEqual( + expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) + }) + + it('should throw error if tags are different and force tag update is not true', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: [{ Key: 'example', Value: 'should fail' }] }, + }, + }) + + await expect(newPublisher.init()).rejects.toThrowError( + /Topic already exists with different tags/, + ) + }) }) }) describe('publish', () => { + const queueName = 'someQueue' + let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient + let consumer: Consumer beforeEach(async () => { diContainer = await registerDependencies() sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient await diContainer.cradle.permissionConsumer.close() await deleteQueue(sqsClient, queueName) - await deleteTopic(snsClient, SnsPermissionPublisher.TOPIC_NAME) + await deleteTopic(snsClient, stsClient, SnsPermissionPublisher.TOPIC_NAME) }) afterEach(async () => { @@ -137,6 +220,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, @@ -199,6 +283,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, @@ -267,6 +352,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, diff --git a/packages/sns/test/utils/snsSubscriber.spec.ts b/packages/sns/test/utils/snsSubscriber.spec.ts index 29084578..ed196c02 100644 --- a/packages/sns/test/utils/snsSubscriber.spec.ts +++ b/packages/sns/test/utils/snsSubscriber.spec.ts @@ -12,6 +12,7 @@ import { } from '../../lib/utils/snsUtils' import { FakeLogger } from '../fakes/FakeLogger' +import type { STSClient } from '@aws-sdk/client-sts' import type { Dependencies } from './testContext' import { registerDependencies } from './testContext' @@ -22,11 +23,13 @@ describe('snsSubscriber', () => { let diContainer: AwilixContainer let snsClient: SNSClient let sqsClient: SQSClient + let stsClient: STSClient beforeEach(async () => { diContainer = await registerDependencies({}, false) snsClient = diContainer.cradle.snsClient sqsClient = diContainer.cradle.sqsClient + stsClient = diContainer.cradle.stsClient }) afterEach(async () => { @@ -34,7 +37,7 @@ describe('snsSubscriber', () => { await awilixManager.executeDispose() await diContainer.dispose() - await deleteTopic(snsClient, TOPIC_NAME) + await deleteTopic(snsClient, stsClient, TOPIC_NAME) await deleteQueue(sqsClient, QUEUE_NAME) }) @@ -44,6 +47,7 @@ describe('snsSubscriber', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -63,6 +67,7 @@ describe('snsSubscriber', () => { subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -95,6 +100,7 @@ describe('snsSubscriber', () => { const subscription = await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -113,6 +119,7 @@ describe('snsSubscriber', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, diff --git a/packages/sns/test/utils/testContext.ts b/packages/sns/test/utils/testContext.ts index e4c16e1d..bbc803ab 100644 --- a/packages/sns/test/utils/testContext.ts +++ b/packages/sns/test/utils/testContext.ts @@ -21,6 +21,7 @@ import { SnsPublisherManager } from '../../lib/sns/SnsPublisherManager' import { SnsSqsPermissionConsumer } from '../consumers/SnsSqsPermissionConsumer' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' +import { STSClient } from '@aws-sdk/client-sts' import { CreateLocateConfigMixPublisher } from '../publishers/CreateLocateConfigMixPublisher' import { TEST_AWS_CONFIG } from './testSnsConfig' @@ -97,6 +98,14 @@ export async function registerDependencies( lifetime: Lifetime.SINGLETON, }, ), + stsClient: asFunction( + () => { + return new STSClient(TEST_AWS_CONFIG) + }, + { + lifetime: Lifetime.SINGLETON, + }, + ), s3: asFunction(() => { return new S3(TEST_AWS_CONFIG) }), @@ -182,6 +191,7 @@ export interface Dependencies { logger: Logger sqsClient: SQSClient snsClient: SNSClient + stsClient: STSClient s3: S3 awilixManager: AwilixManager