From 2064658274e368cdb33a0f1fdb2c651ff652c603 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 21:14:31 +0200 Subject: [PATCH 01/23] Testing SNS tags update --- .../publishers/SnsPermissionPublisher.spec.ts | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 37d3288b..87749b26 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,4 +1,4 @@ -import type { SNSClient } from '@aws-sdk/client-sns' +import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' @@ -21,10 +21,10 @@ import type { Dependencies } from '../utils/testContext' import { SnsPermissionPublisher } from './SnsPermissionPublisher' -const queueName = 'someQueue' - describe('SnsPermissionPublisher', () => { describe('init', () => { + const topicNome = 'existingTopic' + let diContainer: AwilixContainer let snsClient: SNSClient beforeAll(async () => { @@ -84,7 +84,7 @@ describe('SnsPermissionPublisher', () => { it('does not create a new queue when queue locator is passed', async () => { const arn = await assertTopic(snsClient, { - Name: 'existingTopic', + Name: topicNome, }) const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { @@ -95,11 +95,42 @@ describe('SnsPermissionPublisher', () => { await newPublisher.init() expect(newPublisher.topicArnProp).toEqual(arn) - await deleteTopic(snsClient, 'existingTopic') + await deleteTopic(snsClient, topicNome) + }) + + // TESTING HOW SNS TAGS UPDATE WORKS + it.skip('to be removed', async () => { + await deleteTopic(snsClient, topicNome) + const arn = await assertTopic(snsClient, { + Name: topicNome, + Tags: [ + { Key: 'hello', Value: 'world' }, + { Key: 'goodbye', Value: 'world' }, + ], + }) + + const command = new ListTagsForResourceCommand({ ResourceArn: arn }) + const res = await snsClient.send(command) + console.log(res.Tags) + + const updateCommand = new TagResourceCommand({ + ResourceArn: arn, + Tags: [ + { Key: 'hello', Value: 'friend' }, + { Key: 'goodbye', Value: 'world' }, + ], + }) + await snsClient.send(updateCommand) + + const command2 = new ListTagsForResourceCommand({ ResourceArn: arn }) + const res2 = await snsClient.send(command2) + console.log(res2.Tags) }) }) describe('publish', () => { + const queueName = 'someQueue' + let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient From 250104b7bdb8b702f52aaf9efa04613d47b3d955 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 23:36:12 +0200 Subject: [PATCH 02/23] SNS publisher update topic tags --- packages/sns/lib/sns/AbstractSnsService.ts | 1 + packages/sns/lib/utils/snsUtils.ts | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index cbae6ae1..fcfb6137 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -31,6 +31,7 @@ export type SNSTopicConfig = { export type ExtraSNSCreationParams = { queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[] allowedSourceOwner?: string + forceTagUpdate?: boolean } export type SNSCreationConfig = { diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index 925c06d6..f5285073 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,6 +1,7 @@ import { type CreateTopicCommandInput, type SNSClient, + TagResourceCommand, paginateListTopics, } from '@aws-sdk/client-sns' import { @@ -102,6 +103,13 @@ export async function assertTopic( }) await snsClient.send(setTopicAttributesCommand) } + if (extraParams?.forceTagUpdate) { + const tagTopicCommand = new TagResourceCommand({ + ResourceArn: topicArn, + Tags: topicOptions.Tags, + }) + await snsClient.send(tagTopicCommand) + } return topicArn } From 265748a126913b5bd520fa1ffd608c38c703ec7e Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 15 Oct 2024 13:38:12 +0200 Subject: [PATCH 03/23] Adding tags update --- packages/sns/lib/utils/snsInitter.ts | 1 + packages/sns/lib/utils/snsUtils.ts | 3 +++ 2 files changed, 4 insertions(+) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 2ef2be97..a5a7e8c0 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -230,6 +230,7 @@ export async function initSns( const topicArn = await assertTopic(snsClient, creationConfig.topic!, { queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: creationConfig.allowedSourceOwner, + forceTagUpdate: creationConfig.forceTagUpdate, }) return { topicArn, diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index f5285073..c3bfcb90 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,5 +1,7 @@ import { type CreateTopicCommandInput, + ListTagsForResourceCommand, + ListTopicsCommand, type SNSClient, TagResourceCommand, paginateListTopics, @@ -83,6 +85,7 @@ export async function assertTopic( topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { + 'aws:sns:region:account_id:topic_name' const command = new CreateTopicCommand(topicOptions) const response = await snsClient.send(command) From 62d240cd40b78b2ee62aa161257b96121f3a743b Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 15 Oct 2024 13:40:06 +0200 Subject: [PATCH 04/23] Adding tests --- .../publishers/SnsPermissionPublisher.spec.ts | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 87749b26..d9be23ac 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,12 +1,12 @@ import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' -import type { SQSClient } from '@aws-sdk/client-sqs' +import { ListQueueTagsCommand, SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' import type { SQSMessage } from '@message-queue-toolkit/sqs' import { FakeConsumerErrorResolver, assertQueue, deleteQueue } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' import { Consumer } from 'sqs-consumer' -import { afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { deserializeSNSMessage } from '../../lib/utils/snsMessageDeserializer' import { subscribeToTopic } from '../../lib/utils/snsSubscriber' @@ -98,6 +98,52 @@ describe('SnsPermissionPublisher', () => { await deleteTopic(snsClient, topicNome) }) + describe('tags', () => { + const getTags = (arn: string) => + snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) + + it('updates existing queue tags when update is forced', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { + Name: topicNome, + Tags: newTags, + }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + await newPublisher.init() + + const updateCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTags(arn) + expect(postTags.Tags).toEqual([...newTags, { Key: 'leftover', Value: 'some-leftover' }]) + }) + }) + // TESTING HOW SNS TAGS UPDATE WORKS it.skip('to be removed', async () => { await deleteTopic(snsClient, topicNome) From 81e92791f15e045e62f89fdecf7929aa797f5f39 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:47:58 +0200 Subject: [PATCH 05/23] Adding aws-sdk sts --- packages/sns/package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 6903e69d..1876523e 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -32,6 +32,7 @@ "peerDependencies": { "@aws-sdk/client-sns": "^3.632.0", "@aws-sdk/client-sqs": "^3.632.0", + "@aws-sdk/client-sts": "^3.632.0", "@message-queue-toolkit/core": ">=15.0.0", "@message-queue-toolkit/schemas": ">=2.0.0", "@message-queue-toolkit/sqs": "^17.0.0" @@ -40,11 +41,11 @@ "@aws-sdk/client-s3": "^3.670.0", "@aws-sdk/client-sns": "^3.670.0", "@aws-sdk/client-sqs": "^3.670.0", + "@biomejs/biome": "1.9.3", + "@kibertoad/biome-config": "^1.2.1", "@message-queue-toolkit/core": "*", "@message-queue-toolkit/s3-payload-store": "*", "@message-queue-toolkit/sqs": "*", - "@biomejs/biome": "1.9.3", - "@kibertoad/biome-config": "^1.2.1", "@types/node": "^22.7.5", "@vitest/coverage-v8": "^2.1.2", "awilix": "^12.0.1", From b0f3fdfbd4c880ad40db92187a10585155bc9bb6 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:53:51 +0200 Subject: [PATCH 06/23] Minor test changes --- .../test/publishers/SnsPermissionPublisher.spec.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index d9be23ac..36c92345 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,5 +1,5 @@ -import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' -import { ListQueueTagsCommand, SQSClient } from '@aws-sdk/client-sqs' +import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' +import type { SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' import type { SQSMessage } from '@message-queue-toolkit/sqs' @@ -32,6 +32,10 @@ describe('SnsPermissionPublisher', () => { snsClient = diContainer.cradle.snsClient }) + beforeEach(async () => { + await deleteTopic(snsClient, topicNome) + }) + it('sets correct policy when policy fields are set', async () => { const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { creationConfig: { @@ -95,7 +99,6 @@ describe('SnsPermissionPublisher', () => { await newPublisher.init() expect(newPublisher.topicArnProp).toEqual(arn) - await deleteTopic(snsClient, topicNome) }) describe('tags', () => { @@ -123,10 +126,7 @@ describe('SnsPermissionPublisher', () => { const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { creationConfig: { - topic: { - Name: topicNome, - Tags: newTags, - }, + topic: { Name: topicNome, Tags: newTags }, forceTagUpdate: true, }, }) From c64167bb54c932338f2ee1842e7bb92a4c614066 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:54:00 +0200 Subject: [PATCH 07/23] Localstack enabling sts --- docker-compose.yml | 2 +- packages/sns/docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8e7d177d..395348e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock diff --git a/packages/sns/docker-compose.yml b/packages/sns/docker-compose.yml index f7d6d6fc..f11269f0 100644 --- a/packages/sns/docker-compose.yml +++ b/packages/sns/docker-compose.yml @@ -7,7 +7,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock From 5cd5f74b4af39f791b5a89755981a2de77d01f65 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:54:43 +0200 Subject: [PATCH 08/23] Implementing tags update + simple tests --- packages/sns/lib/utils/snsUtils.ts | 56 +++++++++++++++---- .../publishers/SnsPermissionPublisher.spec.ts | 35 ++---------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index c3bfcb90..f8fde8ae 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,7 +1,5 @@ import { type CreateTopicCommandInput, - ListTagsForResourceCommand, - ListTopicsCommand, type SNSClient, TagResourceCommand, paginateListTopics, @@ -15,11 +13,12 @@ import { SetTopicAttributesCommand, UnsubscribeCommand, } from '@aws-sdk/client-sns' -import type { Either } from '@lokalise/node-core' +import { type Either, isError } from '@lokalise/node-core' import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from '@message-queue-toolkit/sqs' import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' +import { GetCallerIdentityCommand, STSClient } from '@aws-sdk/client-sts' import { generateTopicSubscriptionPolicy } from './snsAttributeUtils' type AttributesResult = { @@ -85,14 +84,19 @@ export async function assertTopic( topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { - 'aws:sns:region:account_id:topic_name' - const command = new CreateTopicCommand(topicOptions) - const response = await snsClient.send(command) - - if (!response.TopicArn) { - throw new Error('No topic arn in response') + let topicArn: string + try { + const command = new CreateTopicCommand(topicOptions) + const response = await snsClient.send(command) + if (!response.TopicArn) throw new Error('No topic arn in response') + topicArn = response.TopicArn + } catch (err) { + // We only manually build ARN in case of tag update + if (!extraParams?.forceTagUpdate) throw err + // To build ARN we need topic name and error should be "topic already exist with different tags" + if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err + topicArn = await buildTopicArn(snsClient, topicOptions.Name) } - const topicArn = response.TopicArn if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) { const setTopicAttributesCommand = new SetTopicAttributesCommand({ @@ -189,3 +193,35 @@ export async function getTopicArnByName(snsClient: SNSClient, topicName?: string */ export const calculateOutgoingMessageSize = (message: unknown) => sqsCalculateOutgoingMessageSize(message) + +const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) => + !!error && + isError(error) && + 'Error' in error && + !!error.Error && + typeof error.Error === 'object' && + 'Code' in error.Error && + 'Message' in error.Error && + typeof error.Error.Message === 'string' && + error.Error.Code === 'InvalidParameter' && + error.Error.Message.includes('already exists with different tags') + +/** + * Manually builds the ARN of a topic based on the current AWS account and the topic name. + * It follows the following pattern: arn:aws:sns::: + * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + */ +const buildTopicArn = async (client: SNSClient, topicName: string) => { + const region = + typeof client.config.region === 'string' ? client.config.region : await client.config.region() + + const stsClient = new STSClient({ + endpoint: client.config.endpoint, + region, + credentials: client.config.credentials, + endpointProvider: client.config.endpointProvider, + }) + const identityResponse = await stsClient.send(new GetCallerIdentityCommand({})) + + return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` +} diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 36c92345..a88bb424 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -140,38 +140,13 @@ describe('SnsPermissionPublisher', () => { expect(updateCall).toBeDefined() const postTags = await getTags(arn) - expect(postTags.Tags).toEqual([...newTags, { Key: 'leftover', Value: 'some-leftover' }]) + const tags = postTags.Tags + expect(tags).toHaveLength(4) + expect(postTags.Tags).toEqual( + expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) }) }) - - // TESTING HOW SNS TAGS UPDATE WORKS - it.skip('to be removed', async () => { - await deleteTopic(snsClient, topicNome) - const arn = await assertTopic(snsClient, { - Name: topicNome, - Tags: [ - { Key: 'hello', Value: 'world' }, - { Key: 'goodbye', Value: 'world' }, - ], - }) - - const command = new ListTagsForResourceCommand({ ResourceArn: arn }) - const res = await snsClient.send(command) - console.log(res.Tags) - - const updateCommand = new TagResourceCommand({ - ResourceArn: arn, - Tags: [ - { Key: 'hello', Value: 'friend' }, - { Key: 'goodbye', Value: 'world' }, - ], - }) - await snsClient.send(updateCommand) - - const command2 = new ListTagsForResourceCommand({ ResourceArn: arn }) - const res2 = await snsClient.send(command2) - console.log(res2.Tags) - }) }) describe('publish', () => { From 33a1cd0eb885066bc82f1723094d9ad4d91c4a99 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 16:30:58 +0200 Subject: [PATCH 09/23] Adding stsClient as a new dependency to avoid creation --- packages/sns/lib/sns/AbstractSnsService.ts | 13 +++++- .../sns/lib/sns/AbstractSnsSqsConsumer.ts | 6 +++ packages/sns/lib/sns/SnsPublisherManager.ts | 1 + packages/sns/lib/utils/snsAttributeUtils.ts | 16 ++++++++ packages/sns/lib/utils/snsInitter.ts | 13 ++++-- packages/sns/lib/utils/snsSubscriber.ts | 14 +++++-- packages/sns/lib/utils/snsUtils.ts | 41 +++++-------------- 7 files changed, 66 insertions(+), 38 deletions(-) diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index fcfb6137..13b1bbcf 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient, Tag } from '@aws-sdk/client-sn import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core' import { AbstractQueueService } from '@message-queue-toolkit/core' +import type { STSClient } from '@aws-sdk/client-sts' import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes' import { deleteSns, initSns } from '../utils/snsInitter' @@ -10,6 +11,7 @@ export const SNS_MESSAGE_MAX_SIZE = 256 * 1024 // 256KB export type SNSDependencies = QueueDependencies & { snsClient: SNSClient + stsClient: STSClient } export type SNSTopicAWSConfig = CreateTopicCommandInput @@ -60,6 +62,7 @@ export abstract class AbstractSnsService< SNSOptionsType > { protected readonly snsClient: SNSClient + protected readonly stsClient: STSClient // @ts-ignore protected topicArn: string @@ -67,14 +70,20 @@ export abstract class AbstractSnsService< super(dependencies, options) this.snsClient = dependencies.snsClient + this.stsClient = dependencies.stsClient } public async init() { if (this.deletionConfig && this.creationConfig) { - await deleteSns(this.snsClient, this.deletionConfig, this.creationConfig) + await deleteSns(this.snsClient, this.stsClient, this.deletionConfig, this.creationConfig) } - const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig) + const initResult = await initSns( + this.snsClient, + this.stsClient, + this.locatorConfig, + this.creationConfig, + ) this.topicArn = initResult.topicArn this.isInitted = true } diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 69c16c12..f180c068 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -12,10 +12,12 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter' import { readSnsMessage } from '../utils/snsMessageReader' import type { SNSSubscriptionOptions } from '../utils/snsSubscriber' +import type { STSClient } from '@aws-sdk/client-sts' import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService' export type SNSSQSConsumerDependencies = SQSConsumerDependencies & { snsClient: SNSClient + stsClient: STSClient } export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig @@ -53,6 +55,7 @@ export abstract class AbstractSnsSqsConsumer< > { private readonly subscriptionConfig?: SNSSubscriptionOptions private readonly snsClient: SNSClient + private readonly stsClient: STSClient // @ts-ignore protected topicArn: string @@ -74,6 +77,7 @@ export abstract class AbstractSnsSqsConsumer< this.subscriptionConfig = options.subscriptionConfig this.snsClient = dependencies.snsClient + this.stsClient = dependencies.stsClient } override async init(): Promise { @@ -81,6 +85,7 @@ export abstract class AbstractSnsSqsConsumer< await deleteSnsSqs( this.sqsClient, this.snsClient, + this.stsClient, this.deletionConfig, this.creationConfig.queue, this.creationConfig.topic, @@ -95,6 +100,7 @@ export abstract class AbstractSnsSqsConsumer< const initSnsSqsResult = await initSnsSqs( this.sqsClient, this.snsClient, + this.stsClient, this.locatorConfig, this.creationConfig, this.subscriptionConfig, diff --git a/packages/sns/lib/sns/SnsPublisherManager.ts b/packages/sns/lib/sns/SnsPublisherManager.ts index 572cc4fb..b20a5ff8 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.ts @@ -79,6 +79,7 @@ export class SnsPublisherManager< newPublisherOptions: options.newPublisherOptions, publisherDependencies: { snsClient: dependencies.snsClient, + stsClient: dependencies.stsClient, logger: dependencies.logger, errorReporter: dependencies.errorReporter, }, diff --git a/packages/sns/lib/utils/snsAttributeUtils.ts b/packages/sns/lib/utils/snsAttributeUtils.ts index 98b8369d..78acb84e 100644 --- a/packages/sns/lib/utils/snsAttributeUtils.ts +++ b/packages/sns/lib/utils/snsAttributeUtils.ts @@ -1,3 +1,4 @@ +import { GetCallerIdentityCommand, type STSClient } from '@aws-sdk/client-sts' import type { ZodSchema } from 'zod' // See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html @@ -61,3 +62,18 @@ export function generateFilterAttributes( FilterPolicyScope: 'MessageBody', } } + +/** + * Manually builds the ARN of a topic based on the current AWS account and the topic name. + * It follows the following pattern: arn:aws:sns::: + * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + * + * // TODO: add tests + */ +export const buildTopicArn = async (client: STSClient, topicName: string) => { + const identityResponse = await client.send(new GetCallerIdentityCommand({})) + const region = + typeof client.config.region === 'string' ? client.config.region : await client.config.region() + + return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` +} diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index a5a7e8c0..ebaad77c 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -8,6 +8,7 @@ import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService' import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer' +import type { STSClient } from '@aws-sdk/client-sts' import type { Either } from '@lokalise/node-core' import { type TopicResolutionOptions, isCreateTopicCommand } from '../types/TopicTypes' import type { SNSSubscriptionOptions } from './snsSubscriber' @@ -24,6 +25,7 @@ import { export async function initSnsSqs( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, locatorConfig?: SNSSQSQueueLocatorType, creationConfig?: SNSCreationConfig & SQSCreationConfig, subscriptionConfig?: SNSSubscriptionOptions, @@ -59,6 +61,7 @@ export async function initSnsSqs( const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( sqsClient, snsClient, + stsClient, creationConfig.queue, topicResolutionOptions, subscriptionConfig, @@ -132,6 +135,7 @@ export async function initSnsSqs( export async function deleteSnsSqs( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, deletionConfig: DeletionConfig, queueConfiguration: CreateQueueCommandInput, topicConfiguration: CreateTopicCommandInput | undefined, @@ -152,6 +156,7 @@ export async function deleteSnsSqs( const { subscriptionArn } = await subscribeToTopic( sqsClient, snsClient, + stsClient, queueConfiguration, topicConfiguration ?? topicLocator!, subscriptionConfiguration, @@ -176,13 +181,14 @@ export async function deleteSnsSqs( if (!topicName) { throw new Error('Failed to resolve topic name') } - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) } await deleteSubscription(snsClient, subscriptionArn) } export async function deleteSns( snsClient: SNSClient, + stsClient: STSClient, deletionConfig: DeletionConfig, creationConfig: SNSCreationConfig, ) { @@ -200,11 +206,12 @@ export async function deleteSns( throw new Error('topic.Name must be set for automatic deletion') } - await deleteTopic(snsClient, creationConfig.topic.Name) + await deleteTopic(snsClient, stsClient, creationConfig.topic.Name) } export async function initSns( snsClient: SNSClient, + stsClient: STSClient, locatorConfig?: SNSTopicLocatorType, creationConfig?: SNSCreationConfig, ) { @@ -227,7 +234,7 @@ export async function initSns( 'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory', ) } - const topicArn = await assertTopic(snsClient, creationConfig.topic!, { + const topicArn = await assertTopic(snsClient, stsClient, creationConfig.topic!, { queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: creationConfig.allowedSourceOwner, forceTagUpdate: creationConfig.forceTagUpdate, diff --git a/packages/sns/lib/utils/snsSubscriber.ts b/packages/sns/lib/utils/snsSubscriber.ts index d53f1dde..1d711977 100644 --- a/packages/sns/lib/utils/snsSubscriber.ts +++ b/packages/sns/lib/utils/snsSubscriber.ts @@ -8,6 +8,7 @@ import { assertQueue } from '@message-queue-toolkit/sqs' import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' +import type { STSClient } from '@aws-sdk/client-sts' import { type TopicResolutionOptions, isCreateTopicCommand, @@ -21,8 +22,9 @@ export type SNSSubscriptionOptions = Omit< > & { updateAttributesIfExists: boolean } async function resolveTopicArnToSubscribeTo( - topicConfiguration: TopicResolutionOptions, snsClient: SNSClient, + stsClient: STSClient, + topicConfiguration: TopicResolutionOptions, extraParams: (ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams) | undefined, ) { //If topicArn is present, let's use it and return early. @@ -32,7 +34,7 @@ async function resolveTopicArnToSubscribeTo( //If input configuration is capable of creating a topic, let's create it and return its ARN. if (isCreateTopicCommand(topicConfiguration)) { - return await assertTopic(snsClient, topicConfiguration, { + return await assertTopic(snsClient, stsClient, topicConfiguration, { queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: extraParams?.allowedSourceOwner, }) @@ -45,12 +47,18 @@ async function resolveTopicArnToSubscribeTo( export async function subscribeToTopic( sqsClient: SQSClient, snsClient: SNSClient, + stsClient: STSClient, queueConfiguration: CreateQueueCommandInput, topicConfiguration: TopicResolutionOptions, subscriptionConfiguration: SNSSubscriptionOptions, extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams, ) { - const topicArn = await resolveTopicArnToSubscribeTo(topicConfiguration, snsClient, extraParams) + const topicArn = await resolveTopicArnToSubscribeTo( + snsClient, + stsClient, + topicConfiguration, + extraParams, + ) const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, { topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix, diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index f8fde8ae..4e93136a 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -18,8 +18,8 @@ import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' -import { GetCallerIdentityCommand, STSClient } from '@aws-sdk/client-sts' -import { generateTopicSubscriptionPolicy } from './snsAttributeUtils' +import type { STSClient } from '@aws-sdk/client-sts' +import { buildTopicArn, generateTopicSubscriptionPolicy } from './snsAttributeUtils' type AttributesResult = { attributes?: Record @@ -81,6 +81,7 @@ export async function getSubscriptionAttributes( export async function assertTopic( snsClient: SNSClient, + stsClient: STSClient, topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { @@ -95,7 +96,7 @@ export async function assertTopic( if (!extraParams?.forceTagUpdate) throw err // To build ARN we need topic name and error should be "topic already exist with different tags" if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err - topicArn = await buildTopicArn(snsClient, topicOptions.Name) + topicArn = await buildTopicArn(stsClient, topicOptions.Name) } if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) { @@ -121,17 +122,17 @@ export async function assertTopic( return topicArn } -export async function deleteTopic(client: SNSClient, topicName: string) { +export async function deleteTopic(snsClient: SNSClient, stsClient: STSClient, topicName: string) { try { - const topicArn = await assertTopic(client, { + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName, }) - const command = new DeleteTopicCommand({ - TopicArn: topicArn, - }) - - await client.send(command) + await snsClient.send( + new DeleteTopicCommand({ + TopicArn: topicArn, + }), + ) } catch (_) { // we don't care it operation has failed } @@ -205,23 +206,3 @@ const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) => typeof error.Error.Message === 'string' && error.Error.Code === 'InvalidParameter' && error.Error.Message.includes('already exists with different tags') - -/** - * Manually builds the ARN of a topic based on the current AWS account and the topic name. - * It follows the following pattern: arn:aws:sns::: - * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html - */ -const buildTopicArn = async (client: SNSClient, topicName: string) => { - const region = - typeof client.config.region === 'string' ? client.config.region : await client.config.region() - - const stsClient = new STSClient({ - endpoint: client.config.endpoint, - region, - credentials: client.config.credentials, - endpointProvider: client.config.endpointProvider, - }) - const identityResponse = await stsClient.send(new GetCallerIdentityCommand({})) - - return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` -} From 2557b24b8d607dd694e11248d7b8c32350b94e8c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 16:55:28 +0200 Subject: [PATCH 10/23] Tests fixes --- .../CreateLocateConfigMixConsumer.spec.ts | 7 +++++-- ...sPermissionConsumer.deadLetterQueue.spec.ts | 7 +++++-- .../consumers/SnsSqsPermissionConsumer.spec.ts | 11 +++++++---- ...rmissionPublisher.payloadOffloading.spec.ts | 6 +++++- .../publishers/SnsPermissionPublisher.spec.ts | 18 ++++++++++++++---- packages/sns/test/utils/snsSubscriber.spec.ts | 9 ++++++++- packages/sns/test/utils/testContext.ts | 10 ++++++++++ 7 files changed, 54 insertions(+), 14 deletions(-) diff --git a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts index f42f7981..8b54b9ec 100644 --- a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts +++ b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.spec.ts @@ -1,5 +1,6 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' +import type { STSClient } from '@aws-sdk/client-sts' import { deleteQueue } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' import { beforeAll, beforeEach, describe, it } from 'vitest' @@ -11,20 +12,22 @@ describe('CreateLocateConfigMixConsumer', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient beforeAll(async () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient }) beforeEach(async () => { await deleteQueue(sqsClient, CreateLocateConfigMixConsumer.CONSUMED_QUEUE_NAME) - await deleteTopic(snsClient, CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME) + await deleteTopic(snsClient, stsClient, CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME) }) it('accepts mixed config of create and locate', async () => { - await assertTopic(snsClient, { + await assertTopic(snsClient, stsClient, { Name: CreateLocateConfigMixConsumer.SUBSCRIBED_TOPIC_NAME, }) const consumer = new CreateLocateConfigMixConsumer(diContainer.cradle) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts index 183f9820..6cee19a0 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts @@ -16,6 +16,7 @@ import type { SnsPermissionPublisher } from '../publishers/SnsPermissionPublishe import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' import type { PERMISSIONS_REMOVE_MESSAGE_TYPE } from './userConsumerSchemas' @@ -28,6 +29,7 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient let publisher: SnsPermissionPublisher let consumer: SnsSqsPermissionConsumer | undefined @@ -36,13 +38,14 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient publisher = diContainer.cradle.permissionPublisher }) beforeEach(async () => { await deleteQueue(sqsClient, queueName) await deleteQueue(sqsClient, deadLetterQueueName) - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) }) afterEach(async () => { @@ -61,7 +64,7 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { beforeEach(async () => { await deleteQueue(sqsClient, queueName) await deleteQueue(sqsClient, deadLetterQueueName) - await deleteTopic(snsClient, topicName) + await deleteTopic(snsClient, stsClient, topicName) }) it('creates a new dead letter queue', async () => { diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 3a09fe43..534eda04 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -13,6 +13,7 @@ import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' describe('SnsSqsPermissionConsumer', () => { @@ -22,11 +23,13 @@ describe('SnsSqsPermissionConsumer', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient beforeAll(async () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient }) beforeEach(async () => { await deleteQueue(sqsClient, queueName) @@ -54,7 +57,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, }) - const arn = await assertTopic(snsClient, { + const arn = await assertTopic(snsClient, stsClient, { Name: 'existingTopic', }) @@ -76,11 +79,11 @@ describe('SnsSqsPermissionConsumer', () => { expect(newConsumer.subscriptionProps.subscriptionArn).toBe( 'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395', ) - await deleteTopic(snsClient, 'existingTopic') + await deleteTopic(snsClient, stsClient, 'existingTopic') }) it('does not create a new topic when mixed locator is passed', async () => { - const arn = await assertTopic(snsClient, { + const arn = await assertTopic(snsClient, stsClient, { Name: 'existingTopic', }) @@ -104,7 +107,7 @@ describe('SnsSqsPermissionConsumer', () => { expect(newConsumer.subscriptionProps.subscriptionArn).toMatch( 'arn:aws:sns:eu-west-1:000000000000:existingTopic', ) - await deleteTopic(snsClient, 'existingTopic') + await deleteTopic(snsClient, stsClient, 'existingTopic') }) describe('tags update', () => { diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts index 552ea448..cad03160 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts @@ -25,6 +25,7 @@ import { assertBucket, getObjectContent } from '../utils/s3Utils' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsPermissionPublisher } from './SnsPermissionPublisher' const queueName = 'payloadOffloadingTestQueue' @@ -37,6 +38,7 @@ describe('SnsPermissionPublisher', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient let s3: S3 let payloadStoreConfig: PayloadStoreConfig @@ -51,6 +53,7 @@ describe('SnsPermissionPublisher', () => { }) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient s3 = diContainer.cradle.s3 await assertBucket(s3, s3BucketName) @@ -64,13 +67,14 @@ describe('SnsPermissionPublisher', () => { beforeEach(async () => { await deleteQueue(sqsClient, queueName) - await deleteTopic(snsClient, SnsPermissionPublisher.TOPIC_NAME) + await deleteTopic(snsClient, stsClient, SnsPermissionPublisher.TOPIC_NAME) const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName, }) await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName }, { Name: SnsPermissionPublisher.TOPIC_NAME, diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index a88bb424..e51c71cf 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -19,6 +19,7 @@ import { PERMISSIONS_ADD_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import type { STSClient } from '@aws-sdk/client-sts' import { SnsPermissionPublisher } from './SnsPermissionPublisher' describe('SnsPermissionPublisher', () => { @@ -27,13 +28,16 @@ describe('SnsPermissionPublisher', () => { let diContainer: AwilixContainer let snsClient: SNSClient + let stsClient: STSClient + beforeAll(async () => { diContainer = await registerDependencies() snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient }) beforeEach(async () => { - await deleteTopic(snsClient, topicNome) + await deleteTopic(snsClient, stsClient, topicNome) }) it('sets correct policy when policy fields are set', async () => { @@ -87,7 +91,7 @@ describe('SnsPermissionPublisher', () => { }) it('does not create a new queue when queue locator is passed', async () => { - const arn = await assertTopic(snsClient, { + const arn = await assertTopic(snsClient, stsClient, { Name: topicNome, }) @@ -117,7 +121,7 @@ describe('SnsPermissionPublisher', () => { { Key: 'cc', Value: 'some-cc' }, ] - const arn = await assertTopic(snsClient, { + const arn = await assertTopic(snsClient, stsClient, { Name: topicNome, Tags: initialTags, }) @@ -155,16 +159,19 @@ describe('SnsPermissionPublisher', () => { let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + let stsClient: STSClient + let consumer: Consumer beforeEach(async () => { diContainer = await registerDependencies() sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient + stsClient = diContainer.cradle.stsClient await diContainer.cradle.permissionConsumer.close() await deleteQueue(sqsClient, queueName) - await deleteTopic(snsClient, SnsPermissionPublisher.TOPIC_NAME) + await deleteTopic(snsClient, stsClient, SnsPermissionPublisher.TOPIC_NAME) }) afterEach(async () => { @@ -189,6 +196,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, @@ -251,6 +259,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, @@ -319,6 +328,7 @@ describe('SnsPermissionPublisher', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: queueName, }, diff --git a/packages/sns/test/utils/snsSubscriber.spec.ts b/packages/sns/test/utils/snsSubscriber.spec.ts index 29084578..ed196c02 100644 --- a/packages/sns/test/utils/snsSubscriber.spec.ts +++ b/packages/sns/test/utils/snsSubscriber.spec.ts @@ -12,6 +12,7 @@ import { } from '../../lib/utils/snsUtils' import { FakeLogger } from '../fakes/FakeLogger' +import type { STSClient } from '@aws-sdk/client-sts' import type { Dependencies } from './testContext' import { registerDependencies } from './testContext' @@ -22,11 +23,13 @@ describe('snsSubscriber', () => { let diContainer: AwilixContainer let snsClient: SNSClient let sqsClient: SQSClient + let stsClient: STSClient beforeEach(async () => { diContainer = await registerDependencies({}, false) snsClient = diContainer.cradle.snsClient sqsClient = diContainer.cradle.sqsClient + stsClient = diContainer.cradle.stsClient }) afterEach(async () => { @@ -34,7 +37,7 @@ describe('snsSubscriber', () => { await awilixManager.executeDispose() await diContainer.dispose() - await deleteTopic(snsClient, TOPIC_NAME) + await deleteTopic(snsClient, stsClient, TOPIC_NAME) await deleteQueue(sqsClient, QUEUE_NAME) }) @@ -44,6 +47,7 @@ describe('snsSubscriber', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -63,6 +67,7 @@ describe('snsSubscriber', () => { subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -95,6 +100,7 @@ describe('snsSubscriber', () => { const subscription = await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, @@ -113,6 +119,7 @@ describe('snsSubscriber', () => { await subscribeToTopic( sqsClient, snsClient, + stsClient, { QueueName: QUEUE_NAME, }, diff --git a/packages/sns/test/utils/testContext.ts b/packages/sns/test/utils/testContext.ts index e4c16e1d..bbc803ab 100644 --- a/packages/sns/test/utils/testContext.ts +++ b/packages/sns/test/utils/testContext.ts @@ -21,6 +21,7 @@ import { SnsPublisherManager } from '../../lib/sns/SnsPublisherManager' import { SnsSqsPermissionConsumer } from '../consumers/SnsSqsPermissionConsumer' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' +import { STSClient } from '@aws-sdk/client-sts' import { CreateLocateConfigMixPublisher } from '../publishers/CreateLocateConfigMixPublisher' import { TEST_AWS_CONFIG } from './testSnsConfig' @@ -97,6 +98,14 @@ export async function registerDependencies( lifetime: Lifetime.SINGLETON, }, ), + stsClient: asFunction( + () => { + return new STSClient(TEST_AWS_CONFIG) + }, + { + lifetime: Lifetime.SINGLETON, + }, + ), s3: asFunction(() => { return new S3(TEST_AWS_CONFIG) }), @@ -182,6 +191,7 @@ export interface Dependencies { logger: Logger sqsClient: SQSClient snsClient: SNSClient + stsClient: STSClient s3: S3 awilixManager: AwilixManager From 51f5dffd616d35be58f25c93a30015bb15adfd27 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 17:09:16 +0200 Subject: [PATCH 11/23] Adding stsUtils + tests --- packages/sns/lib/utils/snsAttributeUtils.ts | 16 ---------------- packages/sns/lib/utils/stsUtils.spec.ts | 20 ++++++++++++++++++++ packages/sns/lib/utils/stsUtils.ts | 14 ++++++++++++++ 3 files changed, 34 insertions(+), 16 deletions(-) create mode 100644 packages/sns/lib/utils/stsUtils.spec.ts create mode 100644 packages/sns/lib/utils/stsUtils.ts diff --git a/packages/sns/lib/utils/snsAttributeUtils.ts b/packages/sns/lib/utils/snsAttributeUtils.ts index 78acb84e..98b8369d 100644 --- a/packages/sns/lib/utils/snsAttributeUtils.ts +++ b/packages/sns/lib/utils/snsAttributeUtils.ts @@ -1,4 +1,3 @@ -import { GetCallerIdentityCommand, type STSClient } from '@aws-sdk/client-sts' import type { ZodSchema } from 'zod' // See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_version.html @@ -62,18 +61,3 @@ export function generateFilterAttributes( FilterPolicyScope: 'MessageBody', } } - -/** - * Manually builds the ARN of a topic based on the current AWS account and the topic name. - * It follows the following pattern: arn:aws:sns::: - * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html - * - * // TODO: add tests - */ -export const buildTopicArn = async (client: STSClient, topicName: string) => { - const identityResponse = await client.send(new GetCallerIdentityCommand({})) - const region = - typeof client.config.region === 'string' ? client.config.region : await client.config.region() - - return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` -} diff --git a/packages/sns/lib/utils/stsUtils.spec.ts b/packages/sns/lib/utils/stsUtils.spec.ts new file mode 100644 index 00000000..8aa0901f --- /dev/null +++ b/packages/sns/lib/utils/stsUtils.spec.ts @@ -0,0 +1,20 @@ +import type { STSClient } from '@aws-sdk/client-sts' +import { beforeAll, describe, expect, it } from 'vitest' +import { registerDependencies } from '../../test/utils/testContext' +import { buildTopicArn } from './stsUtils' + +describe('stsUtils', () => { + let stsClient: STSClient + + beforeAll(async () => { + const diContainer = await registerDependencies({}, false) + stsClient = diContainer.cradle.stsClient + }) + + describe('buildTopicArn', () => { + it('build ARN for topic', async () => { + const topicName = 'my-test-topic' + await expect(buildTopicArn(stsClient, topicName)).resolves.toMatchInlineSnapshot(`"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`) + }) + }) +}) diff --git a/packages/sns/lib/utils/stsUtils.ts b/packages/sns/lib/utils/stsUtils.ts new file mode 100644 index 00000000..908bda8e --- /dev/null +++ b/packages/sns/lib/utils/stsUtils.ts @@ -0,0 +1,14 @@ +import { GetCallerIdentityCommand, type STSClient } from '@aws-sdk/client-sts' + +/** + * Manually builds the ARN of a topic based on the current AWS account and the topic name. + * It follows the following pattern: arn:aws:sns::: + * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + */ +export const buildTopicArn = async (client: STSClient, topicName: string) => { + const identityResponse = await client.send(new GetCallerIdentityCommand({})) + const region = + typeof client.config.region === 'string' ? client.config.region : await client.config.region() + + return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` +} From 33e951e3e70d73b8c5ecf3c5259ce1c543c6e255 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 17:45:16 +0200 Subject: [PATCH 12/23] sns publisher tag update tests --- .../publishers/SnsPermissionPublisher.spec.ts | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index e51c71cf..0f21d877 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -109,7 +109,7 @@ describe('SnsPermissionPublisher', () => { const getTags = (arn: string) => snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) - it('updates existing queue tags when update is forced', async () => { + it('updates existing topic tags when update is forced', async () => { const initialTags = [ { Key: 'project', Value: 'some-project' }, { Key: 'service', Value: 'some-service' }, @@ -150,6 +150,30 @@ describe('SnsPermissionPublisher', () => { expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), ) }) + + it('should throw error if tags are different and force tag update is not true', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: [{ Key: 'example', Value: 'should fail' }] }, + }, + }) + + await expect(newPublisher.init()).rejects.toThrowError( + /Topic already exists with different tags/, + ) + }) }) }) From 3cce29d5f9dc2ee7f615663af124f08152ce7b2c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:00:00 +0200 Subject: [PATCH 13/23] Import error fix --- packages/sns/lib/utils/snsUtils.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index 4e93136a..bfb7cea1 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -19,7 +19,8 @@ import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' import type { STSClient } from '@aws-sdk/client-sts' -import { buildTopicArn, generateTopicSubscriptionPolicy } from './snsAttributeUtils' +import { generateTopicSubscriptionPolicy } from './snsAttributeUtils' +import { buildTopicArn } from './stsUtils' type AttributesResult = { attributes?: Record From b24adf8a93b9639818bd4a852fce2ab0ab5a1bdb Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:07:08 +0200 Subject: [PATCH 14/23] test improvement --- .../SnsSqsPermissionConsumer.spec.ts | 32 ++++++++++++------- .../consumers/SnsSqsPermissionConsumer.ts | 2 +- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 534eda04..f52ca5f0 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -414,13 +414,14 @@ describe('SnsSqsPermissionConsumer', () => { describe('preHandlers', () => { let diContainer: AwilixContainer let publisher: SnsPermissionPublisher - beforeEach(async () => { + + beforeAll(async () => { diContainer = await registerDependencies({}, false) publisher = diContainer.cradle.permissionPublisher await publisher.init() }) - afterEach(async () => { + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -525,16 +526,15 @@ describe('SnsSqsPermissionConsumer', () => { let diContainer: AwilixContainer let publisher: SnsPermissionPublisher let consumer: SnsSqsPermissionConsumer - beforeEach(async () => { + + beforeAll(async () => { diContainer = await registerDependencies() publisher = diContainer.cradle.permissionPublisher consumer = diContainer.cradle.permissionConsumer }) - afterEach(async () => { - const { awilixManager } = diContainer.cradle - - await awilixManager.executeDispose() + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -569,14 +569,19 @@ describe('SnsSqsPermissionConsumer', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer - beforeEach(async () => { + beforeAll(async () => { diContainer = await registerDependencies({ permissionConsumer: asValue(() => undefined), permissionPublisher: asValue(() => undefined), }) }) - afterEach(async () => { + beforeEach(async () => { + await deleteQueue(diContainer.cradle.sqsClient, queueName) + await deleteTopic(diContainer.cradle.snsClient, diContainer.cradle.stsClient, topicName) + }) + + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) @@ -644,14 +649,19 @@ describe('SnsSqsPermissionConsumer', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer - beforeEach(async () => { + beforeAll(async () => { diContainer = await registerDependencies({ permissionConsumer: asValue(() => undefined), permissionPublisher: asValue(() => undefined), }) }) - afterEach(async () => { + beforeEach(async () => { + await deleteQueue(diContainer.cradle.sqsClient, queueName) + await deleteTopic(diContainer.cradle.snsClient, diContainer.cradle.stsClient, topicName) + }) + + afterAll(async () => { await diContainer.cradle.awilixManager.executeDispose() await diContainer.dispose() }) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index a55b61b8..3a24cc51 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -141,7 +141,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< ) .build(), deletionConfig: options.deletionConfig ?? { - deleteIfExists: true, + deleteIfExists: false, }, payloadStoreConfig: options.payloadStoreConfig, consumerOverrides: options.consumerOverrides ?? { From d561fa2eee2858662edb9a7601398d847f7dacc4 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:09:38 +0200 Subject: [PATCH 15/23] SNS and sqs consumer is able to update tags --- packages/sns/lib/utils/snsSubscriber.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sns/lib/utils/snsSubscriber.ts b/packages/sns/lib/utils/snsSubscriber.ts index 1d711977..e09a594f 100644 --- a/packages/sns/lib/utils/snsSubscriber.ts +++ b/packages/sns/lib/utils/snsSubscriber.ts @@ -37,6 +37,7 @@ async function resolveTopicArnToSubscribeTo( return await assertTopic(snsClient, stsClient, topicConfiguration, { queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: extraParams?.allowedSourceOwner, + forceTagUpdate: extraParams?.forceTagUpdate, }) } From 5437f436a5e77c57a8146ef38c690759e52e3f94 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:19:52 +0200 Subject: [PATCH 16/23] sns sqs consumer tag update tests --- .../SnsSqsPermissionConsumer.spec.ts | 183 ++++++++++++++++-- 1 file changed, 164 insertions(+), 19 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index f52ca5f0..abc7a340 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -1,12 +1,10 @@ import { setTimeout } from 'node:timers/promises' - -import type { SNSClient } from '@aws-sdk/client-sns' +import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' -import type { AwilixContainer } from 'awilix' -import { asValue } from 'awilix' -import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { type AwilixContainer, asValue } from 'awilix' +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' @@ -19,6 +17,7 @@ import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' describe('SnsSqsPermissionConsumer', () => { describe('init', () => { const queueName = 'some-queue' + const topicNome = 'some-topic' let diContainer: AwilixContainer let sqsClient: SQSClient @@ -33,6 +32,7 @@ describe('SnsSqsPermissionConsumer', () => { }) beforeEach(async () => { await deleteQueue(sqsClient, queueName) + await deleteTopic(snsClient, stsClient, topicNome) }) // FixMe https://github.com/localstack/localstack/issues/9306 @@ -58,7 +58,7 @@ describe('SnsSqsPermissionConsumer', () => { }) const arn = await assertTopic(snsClient, stsClient, { - Name: 'existingTopic', + Name: topicNome, }) const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { @@ -79,41 +79,45 @@ describe('SnsSqsPermissionConsumer', () => { expect(newConsumer.subscriptionProps.subscriptionArn).toBe( 'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395', ) - await deleteTopic(snsClient, stsClient, 'existingTopic') }) it('does not create a new topic when mixed locator is passed', async () => { const arn = await assertTopic(snsClient, stsClient, { - Name: 'existingTopic', + Name: topicNome, }) const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { locatorConfig: { - topicName: 'existingTopic', + topicName: topicNome, }, creationConfig: { queue: { - QueueName: 'newQueue', + QueueName: queueName, }, }, }) await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/newQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) - expect(newConsumer.subscriptionProps.queueName).toBe('newQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) expect(newConsumer.subscriptionProps.topicArn).toEqual(arn) expect(newConsumer.subscriptionProps.subscriptionArn).toMatch( - 'arn:aws:sns:eu-west-1:000000000000:existingTopic', + `arn:aws:sns:eu-west-1:000000000000:${topicNome}:`, ) - await deleteTopic(snsClient, stsClient, 'existingTopic') + + await deleteTopic(snsClient, stsClient, topicNome) + await deleteQueue(sqsClient, queueName) }) describe('tags update', () => { - const getTags = (queueUrl: string) => + const getQueueTags = (queueUrl: string) => sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) + const getTopicTags = (arn: string) => + snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) + it('updates existing queue tags when update is forced', async () => { const initialTags = { project: 'some-project', @@ -129,7 +133,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, tags: initialTags, }) - const preTags = await getTags(assertResult.queueUrl) + const preTags = await getQueueTags(assertResult.queueUrl) expect(preTags.Tags).toEqual(initialTags) const sqsSpy = vi.spyOn(sqsClient, 'send') @@ -159,7 +163,7 @@ describe('SnsSqsPermissionConsumer', () => { }) expect(updateCall).toBeDefined() - const postTags = await getTags(assertResult.queueUrl) + const postTags = await getQueueTags(assertResult.queueUrl) expect(postTags.Tags).toEqual({ ...newTags, leftover: 'some-leftover', @@ -176,7 +180,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: queueName, tags: initialTags, }) - const preTags = await getTags(assertResult.queueUrl) + const preTags = await getQueueTags(assertResult.queueUrl) expect(preTags.Tags).toEqual(initialTags) const sqsSpy = vi.spyOn(sqsClient, 'send') @@ -205,9 +209,150 @@ describe('SnsSqsPermissionConsumer', () => { }) expect(updateCall).toBeUndefined() - const postTags = await getTags(assertResult.queueUrl) + const postTags = await getQueueTags(assertResult.queueUrl) expect(postTags.Tags).toEqual(initialTags) }) + + it('updates existing topic tags when update is forced', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTopicTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: newTags }, + queue: { QueueName: queueName }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + await consumer.init() + + const updateCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTopicTags(arn) + const tags = postTags.Tags + expect(tags).toHaveLength(4) + expect(postTags.Tags).toEqual( + expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) + }) + + it('should throw error if tags are different and force tag update is not true', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTopicTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: [{ Key: 'example', Value: 'should fail' }] }, + queue: { QueueName: queueName }, + }, + }) + + await expect(consumer.init()).rejects.toThrowError( + /Topic already exists with different tags/, + ) + }) + + it('updates existing queue and topic tags when update is forced', async () => { + const initialTopicTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTopicTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, stsClient, { + Name: topicNome, + Tags: initialTopicTags, + }) + const preTopicTags = await getTopicTags(arn) + expect(preTopicTags.Tags).toEqual(initialTopicTags) + + const initialQueueTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + const newQueueTags = { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + } + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialQueueTags, + }) + const preQueueTags = await getQueueTags(assertResult.queueUrl) + expect(preQueueTags.Tags).toEqual(initialQueueTags) + + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicNome, Tags: newTopicTags }, + queue: { QueueName: queueName, tags: newQueueTags }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') + await consumer.init() + + const updateTopicCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateTopicCall).toBeDefined() + + const postTopicTags = await getTopicTags(arn) + const tags = postTopicTags.Tags + expect(tags).toHaveLength(4) + expect(postTopicTags.Tags).toEqual( + expect.arrayContaining([...newTopicTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) + + const updateQueueCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateQueueCall).toBeDefined() + + const postQueueTags = await getQueueTags(assertResult.queueUrl) + expect(postQueueTags.Tags).toEqual({ + ...newQueueTags, + leftover: 'some-leftover', + }) + }) }) describe('attributes update', () => { From e8bd90e229d149a1203f2c77f8a36bd1da9e9b79 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:20:41 +0200 Subject: [PATCH 17/23] Release prepare --- packages/sns/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 1876523e..1763697f 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "17.3.0", + "version": "18.0.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", From 7bf729cca1477b835bb5d535eaa4e73e78604ae5 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:24:35 +0200 Subject: [PATCH 18/23] lint fix --- packages/sns/lib/utils/stsUtils.spec.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sns/lib/utils/stsUtils.spec.ts b/packages/sns/lib/utils/stsUtils.spec.ts index 8aa0901f..a1dba067 100644 --- a/packages/sns/lib/utils/stsUtils.spec.ts +++ b/packages/sns/lib/utils/stsUtils.spec.ts @@ -14,7 +14,9 @@ describe('stsUtils', () => { describe('buildTopicArn', () => { it('build ARN for topic', async () => { const topicName = 'my-test-topic' - await expect(buildTopicArn(stsClient, topicName)).resolves.toMatchInlineSnapshot(`"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`) + await expect(buildTopicArn(stsClient, topicName)).resolves.toMatchInlineSnapshot( + `"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`, + ) }) }) }) From 01307eba5d9e7d086a201852ee7d0409854098bd Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:34:34 +0200 Subject: [PATCH 19/23] Test fix --- packages/sns/lib/utils/snsUtils.ts | 2 +- packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index bfb7cea1..700ad5a5 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -112,7 +112,7 @@ export async function assertTopic( }) await snsClient.send(setTopicAttributesCommand) } - if (extraParams?.forceTagUpdate) { + if (extraParams?.forceTagUpdate && topicOptions.Tags) { const tagTopicCommand = new TagResourceCommand({ ResourceArn: topicArn, Tags: topicOptions.Tags, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index abc7a340..bf928bfd 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -106,9 +106,6 @@ describe('SnsSqsPermissionConsumer', () => { expect(newConsumer.subscriptionProps.subscriptionArn).toMatch( `arn:aws:sns:eu-west-1:000000000000:${topicNome}:`, ) - - await deleteTopic(snsClient, stsClient, topicNome) - await deleteQueue(sqsClient, queueName) }) describe('tags update', () => { @@ -141,7 +138,7 @@ describe('SnsSqsPermissionConsumer', () => { const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { creationConfig: { topic: { - Name: 'some-topic', + Name: topicNome, }, queue: { QueueName: queueName, From 729ca0ae1431fdf4961e22ca3616bd612a8f4b1c Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Wed, 23 Oct 2024 18:40:24 +0200 Subject: [PATCH 20/23] Test fix --- .../SnsSqsPermissionConsumer.payloadOffloading.spec.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts index c66b1e28..ef282149 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.payloadOffloading.spec.ts @@ -11,6 +11,8 @@ import { assertBucket, emptyBucket } from '../utils/s3Utils' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' +import { deleteQueue } from '@message-queue-toolkit/sqs' +import { deleteTopic } from '../../lib/utils/snsUtils' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' @@ -49,6 +51,14 @@ describe('SnsSqsPermissionConsumer', () => { publisher = new SnsPermissionPublisher(diContainer.cradle, { payloadStoreConfig, }) + + await deleteQueue(diContainer.cradle.sqsClient, SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME) + await deleteTopic( + diContainer.cradle.snsClient, + diContainer.cradle.stsClient, + SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME, + ) + await consumer.start() await publisher.init() }) From 5e5b28333452632a0f53ccb20ad47531cef2d6e6 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 28 Oct 2024 11:28:25 +0100 Subject: [PATCH 21/23] Improving tests --- packages/sns/lib/utils/stsUtils.spec.ts | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/sns/lib/utils/stsUtils.spec.ts b/packages/sns/lib/utils/stsUtils.spec.ts index a1dba067..69eb6615 100644 --- a/packages/sns/lib/utils/stsUtils.spec.ts +++ b/packages/sns/lib/utils/stsUtils.spec.ts @@ -1,22 +1,35 @@ +import type { SNSClient } from '@aws-sdk/client-sns' import type { STSClient } from '@aws-sdk/client-sts' -import { beforeAll, describe, expect, it } from 'vitest' +import { beforeAll, beforeEach, describe, expect, it } from 'vitest' import { registerDependencies } from '../../test/utils/testContext' -import { buildTopicArn } from './stsUtils' +import { assertTopic, deleteTopic } from './snsUtils' describe('stsUtils', () => { let stsClient: STSClient + let snsClient: SNSClient beforeAll(async () => { const diContainer = await registerDependencies({}, false) stsClient = diContainer.cradle.stsClient + snsClient = diContainer.cradle.snsClient }) describe('buildTopicArn', () => { + const topicName = 'my-test-topic' + + beforeEach(async () => { + await deleteTopic(snsClient, stsClient, topicName) + }) + it('build ARN for topic', async () => { - const topicName = 'my-test-topic' - await expect(buildTopicArn(stsClient, topicName)).resolves.toMatchInlineSnapshot( + const expectedTopicArn = `arn:aws:sns:eu-west-1:000000000000:${topicName}` + expect(expectedTopicArn).toMatchInlineSnapshot( `"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`, ) + + // creating real topic to make sure arn is correct + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + expect(topicArn).toBe(expectedTopicArn) }) }) }) From cf89e8963252c24c3b28c17792323b628614f39e Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 28 Oct 2024 12:07:54 +0100 Subject: [PATCH 22/23] Caching caller identity --- packages/sns/lib/utils/stsUtils.spec.ts | 29 +++++++++++++++++++--- packages/sns/lib/utils/stsUtils.ts | 33 +++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/packages/sns/lib/utils/stsUtils.spec.ts b/packages/sns/lib/utils/stsUtils.spec.ts index 69eb6615..e9584c91 100644 --- a/packages/sns/lib/utils/stsUtils.spec.ts +++ b/packages/sns/lib/utils/stsUtils.spec.ts @@ -1,8 +1,9 @@ import type { SNSClient } from '@aws-sdk/client-sns' import type { STSClient } from '@aws-sdk/client-sts' -import { beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { registerDependencies } from '../../test/utils/testContext' import { assertTopic, deleteTopic } from './snsUtils' +import { buildTopicArn, clearCachedCallerIdentity } from './stsUtils' describe('stsUtils', () => { let stsClient: STSClient @@ -19,17 +20,37 @@ describe('stsUtils', () => { beforeEach(async () => { await deleteTopic(snsClient, stsClient, topicName) + clearCachedCallerIdentity() }) it('build ARN for topic', async () => { - const expectedTopicArn = `arn:aws:sns:eu-west-1:000000000000:${topicName}` - expect(expectedTopicArn).toMatchInlineSnapshot( + const buildedTopicArn = await buildTopicArn(stsClient, topicName) + expect(buildedTopicArn).toMatchInlineSnapshot( `"arn:aws:sns:eu-west-1:000000000000:my-test-topic"`, ) // creating real topic to make sure arn is correct const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) - expect(topicArn).toBe(expectedTopicArn) + expect(topicArn).toBe(buildedTopicArn) + }) + + it('should be able to handle parallel calls getting caller identity only once', async () => { + const stsClientSpy = vi.spyOn(stsClient, 'send') + + const result = await Promise.all([ + buildTopicArn(stsClient, topicName), + buildTopicArn(stsClient, topicName), + buildTopicArn(stsClient, topicName), + ]) + + expect(result).toMatchInlineSnapshot(` + [ + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + "arn:aws:sns:eu-west-1:000000000000:my-test-topic", + ] + `) + expect(stsClientSpy).toHaveBeenCalledOnce() }) }) }) diff --git a/packages/sns/lib/utils/stsUtils.ts b/packages/sns/lib/utils/stsUtils.ts index 908bda8e..7d9a93fa 100644 --- a/packages/sns/lib/utils/stsUtils.ts +++ b/packages/sns/lib/utils/stsUtils.ts @@ -1,4 +1,11 @@ -import { GetCallerIdentityCommand, type STSClient } from '@aws-sdk/client-sts' +import { + GetCallerIdentityCommand, + type GetCallerIdentityCommandOutput, + type STSClient, +} from '@aws-sdk/client-sts' + +let callerIdentityPromise: Promise | undefined +let callerIdentityCached: GetCallerIdentityCommandOutput | undefined /** * Manually builds the ARN of a topic based on the current AWS account and the topic name. @@ -6,9 +13,31 @@ import { GetCallerIdentityCommand, type STSClient } from '@aws-sdk/client-sts' * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html */ export const buildTopicArn = async (client: STSClient, topicName: string) => { - const identityResponse = await client.send(new GetCallerIdentityCommand({})) + const identityResponse = await getAndCacheCallerIdentity(client) const region = typeof client.config.region === 'string' ? client.config.region : await client.config.region() return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` } + +export const clearCachedCallerIdentity = () => { + callerIdentityPromise = undefined + callerIdentityCached = undefined +} + +const getAndCacheCallerIdentity = async ( + client: STSClient, +): Promise => { + if (!callerIdentityCached) { + if (!callerIdentityPromise) { + callerIdentityPromise = client.send(new GetCallerIdentityCommand({})).then((response) => { + callerIdentityCached = response + }) + } + + await callerIdentityPromise + callerIdentityPromise = undefined + } + + return callerIdentityCached! +} From b594c80e5346dd1abd8ac4e2d4b276d0ae193666 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 28 Oct 2024 12:14:04 +0100 Subject: [PATCH 23/23] Exposing clearCachedCallerIdentity --- packages/sns/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sns/index.ts b/packages/sns/index.ts index ba79d820..604a9f08 100644 --- a/packages/sns/index.ts +++ b/packages/sns/index.ts @@ -33,6 +33,7 @@ export { findSubscriptionByTopicAndQueue, getSubscriptionAttributes, } from './lib/utils/snsUtils' +export { clearCachedCallerIdentity } from './lib/utils/stsUtils' export { subscribeToTopic } from './lib/utils/snsSubscriber' export { initSns, initSnsSqs } from './lib/utils/snsInitter'