From 92f999baf76669ddcffce1de38453fb405481106 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Sun, 13 Oct 2024 00:18:57 +0300 Subject: [PATCH 01/17] Add support for forcing tag update --- packages/sns/lib/utils/snsSubscriber.ts | 1 + packages/sqs/lib/sqs/AbstractSqsService.ts | 2 + packages/sqs/lib/utils/sqsInitter.ts | 15 +++- packages/sqs/lib/utils/sqsUtils.ts | 6 +- .../consumers/SqsPermissionConsumer.spec.ts | 81 +++++++++++++++++++ 5 files changed, 103 insertions(+), 2 deletions(-) diff --git a/packages/sns/lib/utils/snsSubscriber.ts b/packages/sns/lib/utils/snsSubscriber.ts index 03e2527c..d53f1dde 100644 --- a/packages/sns/lib/utils/snsSubscriber.ts +++ b/packages/sns/lib/utils/snsSubscriber.ts @@ -55,6 +55,7 @@ export async function subscribeToTopic( const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, { topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix, updateAttributesIfExists: extraParams?.updateAttributesIfExists, + forceTagUpdate: extraParams?.forceTagUpdate, }) const subscribeCommand = new SubscribeCommand({ diff --git a/packages/sqs/lib/sqs/AbstractSqsService.ts b/packages/sqs/lib/sqs/AbstractSqsService.ts index 09eb450d..72adc048 100644 --- a/packages/sqs/lib/sqs/AbstractSqsService.ts +++ b/packages/sqs/lib/sqs/AbstractSqsService.ts @@ -15,11 +15,13 @@ export type SQSDependencies = QueueDependencies & { export type ExtraSQSCreationParams = { topicArnsWithPublishPermissionsPrefix?: string updateAttributesIfExists?: boolean + forceTagUpdate?: boolean } export type SQSCreationConfig = { queue: CreateQueueRequest updateAttributesIfExists?: boolean + forceTagUpdate?: boolean } & ExtraSQSCreationParams export type SQSQueueLocatorType = { diff --git a/packages/sqs/lib/utils/sqsInitter.ts b/packages/sqs/lib/utils/sqsInitter.ts index 11c321f4..425f425e 100644 --- a/packages/sqs/lib/utils/sqsInitter.ts +++ b/packages/sqs/lib/utils/sqsInitter.ts @@ -1,4 +1,4 @@ -import { SetQueueAttributesCommand } from '@aws-sdk/client-sqs' +import { SetQueueAttributesCommand, TagQueueCommand } from '@aws-sdk/client-sqs' import type { QueueAttributeName, SQSClient } from '@aws-sdk/client-sqs' import type { DeletionConfig } from '@message-queue-toolkit/core' import { isProduction } from '@message-queue-toolkit/core' @@ -45,6 +45,18 @@ export async function updateQueueAttributes( await sqsClient.send(command) } +export async function updateQueueTags( + sqsClient: SQSClient, + queueUrl: string, + tags: Record | undefined = {}, +) { + const command = new TagQueueCommand({ + QueueUrl: queueUrl, + Tags: tags, + }) + await sqsClient.send(command) +} + export async function initSqs( sqsClient: SQSClient, locatorConfig?: Partial, @@ -82,6 +94,7 @@ export async function initSqs( const { queueUrl, queueArn } = await assertQueue(sqsClient, creationConfig.queue, { topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, updateAttributesIfExists: creationConfig.updateAttributesIfExists, + forceTagUpdate: creationConfig.forceTagUpdate, }) const queueName = creationConfig.queue.QueueName diff --git a/packages/sqs/lib/utils/sqsUtils.ts b/packages/sqs/lib/utils/sqsUtils.ts index 7be06c90..d5740d94 100644 --- a/packages/sqs/lib/utils/sqsUtils.ts +++ b/packages/sqs/lib/utils/sqsUtils.ts @@ -19,7 +19,7 @@ import { isShallowSubset, waitAndRetry } from '@message-queue-toolkit/core' import type { ExtraSQSCreationParams } from '../sqs/AbstractSqsService' import { generateQueuePublishForTopicPolicy } from './sqsAttributeUtils' -import { updateQueueAttributes } from './sqsInitter' +import { updateQueueAttributes, updateQueueTags } from './sqsInitter' const AWS_QUEUE_DOES_NOT_EXIST_ERROR_NAME = 'QueueDoesNotExist' @@ -122,6 +122,10 @@ async function updateExistingQueue( } } + if (extraParams?.forceTagUpdate) { + await updateQueueTags(sqsClient, queueUrl, queueConfig.tags) + } + return { queueUrl, queueArn, diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index e74b53bf..f8d3bd6c 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -108,6 +108,87 @@ describe('SqsPermissionConsumer', () => { }) }) + it('updates existing queue when update is forced', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + }, + }) + + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + }, + }, + forceTagUpdate: true, + }, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) + + const sqsSpy = vi.spyOn(sqsClient, 'send') + + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) + + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeDefined() + }) + + it('updates existing queue when update is not forced', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + }, + }) + + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + }, + }, + }, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) + + const sqsSpy = vi.spyOn(sqsClient, 'send') + + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) + + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeUndefined() + }) + it('does not update existing queue when attributes did not change', async () => { await assertQueue(sqsClient, { QueueName: queueName, From 1c743a1ed2a7284d19e06ed809e6d4a0b758fcca Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:04:23 +0200 Subject: [PATCH 02/17] Finishing SQS tag update tests --- .../consumers/SqsPermissionConsumer.spec.ts | 185 +++++++++++------- 1 file changed, 111 insertions(+), 74 deletions(-) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index f8d3bd6c..a5b3e8d3 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -1,6 +1,11 @@ import { setTimeout } from 'node:timers/promises' -import type { SQSClient, SendMessageCommandInput } from '@aws-sdk/client-sqs' +import { + ListQueueTagsCommand, + ListQueuesCommand, + SQSClient, + SendMessageCommandInput, +} from '@aws-sdk/client-sqs' import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import type { BarrierResult } from '@message-queue-toolkit/core' @@ -108,13 +113,11 @@ describe('SqsPermissionConsumer', () => { }) }) - it('updates existing queue when update is forced', async () => { + it('does not update existing queue when attributes did not change', async () => { await assertQueue(sqsClient, { QueueName: queueName, - tags: { - project: 'some-project', - service: 'some-service', - leftover: 'some-leftover', + Attributes: { + KmsMasterKeyId: 'somevalue', }, }) @@ -122,13 +125,11 @@ describe('SqsPermissionConsumer', () => { creationConfig: { queue: { QueueName: queueName, - tags: { - project: 'some-project', - service: 'changed-service', - cc: 'some-cc', + Attributes: { + KmsMasterKeyId: 'somevalue', }, }, - forceTagUpdate: true, + updateAttributesIfExists: true, }, deletionConfig: { deleteIfExists: false, @@ -144,90 +145,126 @@ describe('SqsPermissionConsumer', () => { ) const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'TagQueueCommand' + return entry[0].constructor.name === 'SetQueueAttributesCommand' }) - expect(updateCall).toBeDefined() + expect(updateCall).toBeUndefined() + + const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) + + expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') }) - it('updates existing queue when update is not forced', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - tags: { + describe('tags update', () => { + const getTags = (queueUrl: string) => + sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) + + it('updates existing queue tags when update is forced', async () => { + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + }, + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual({ project: 'some-project', service: 'some-service', leftover: 'some-leftover', - }, - }) + }) - const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - tags: { - project: 'some-project', - service: 'changed-service', - cc: 'some-cc', + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + }, }, + forceTagUpdate: true, }, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, - }) + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') - await newConsumer.init() - expect(newConsumer.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'TagQueueCommand' - }) - expect(updateCall).toBeUndefined() - }) + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeDefined() - it('does not update existing queue when attributes did not change', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual({ + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + leftover: 'some-leftover', + }) }) - const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, + it('not updates existing queue tags when update is not forced', async () => { + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, - }) + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual({ + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + }, + }, + }, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - await newConsumer.init() - expect(newConsumer.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + const sqsSpy = vi.spyOn(sqsClient, 'send') - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'SetQueueAttributesCommand' - }) - expect(updateCall).toBeUndefined() + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeUndefined() - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual({ + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + }) + }) }) }) From 4e99ff1f6eb33daedaed393209ee43ca6679543d Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:05:53 +0200 Subject: [PATCH 03/17] Minor test improvement --- .../consumers/SqsPermissionConsumer.spec.ts | 139 +++++++++--------- 1 file changed, 70 insertions(+), 69 deletions(-) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index a5b3e8d3..a94a00e3 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -2,9 +2,8 @@ import { setTimeout } from 'node:timers/promises' import { ListQueueTagsCommand, - ListQueuesCommand, - SQSClient, - SendMessageCommandInput, + type SQSClient, + type SendMessageCommandInput, } from '@aws-sdk/client-sqs' import { ReceiveMessageCommand, SendMessageCommand } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' @@ -68,90 +67,92 @@ describe('SqsPermissionConsumer', () => { ) }) - it('updates existing queue when one with different attributes exist', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, - }) + describe('attributes update', () => { + it('updates existing queue when one with different attributes exist', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }) - const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'othervalue', - VisibilityTimeout: '10', + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }, }, + updateAttributesIfExists: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, - }) + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') - await newConsumer.init() - expect(newConsumer.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'SetQueueAttributesCommand' - }) - expect(updateCall).toBeDefined() + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'SetQueueAttributesCommand' + }) + expect(updateCall).toBeDefined() - const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) + const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) - expect(attributes.result?.attributes).toMatchObject({ - KmsMasterKeyId: 'othervalue', - VisibilityTimeout: '10', + expect(attributes.result?.attributes).toMatchObject({ + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }) }) - }) - it('does not update existing queue when attributes did not change', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, - }) + it('does not update existing queue when attributes did not change', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }) - const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', + const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, }, + updateAttributesIfExists: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, - }) + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') - await newConsumer.init() - expect(newConsumer.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + await newConsumer.init() + expect(newConsumer.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'SetQueueAttributesCommand' - }) - expect(updateCall).toBeUndefined() + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'SetQueueAttributesCommand' + }) + expect(updateCall).toBeUndefined() - const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) + const attributes = await getQueueAttributes(sqsClient, newConsumer.queueProps.url) - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') + expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') + }) }) describe('tags update', () => { From 88370fbe827392fe1062a674968dd55cf4ddfeaf Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:07:47 +0200 Subject: [PATCH 04/17] lint fix --- packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 1c4180b0..bb39f9df 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -242,7 +242,7 @@ describe('SnsSqsPermissionConsumer', () => { expect(attributes.result?.attributes).toMatchObject({ RedrivePolicy: JSON.stringify({ - deadLetterTargetArn: `arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue`, + deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', maxReceiveCount: 3, }), }) @@ -279,7 +279,7 @@ describe('SnsSqsPermissionConsumer', () => { expect(attributes.result?.attributes).toMatchObject({ RedrivePolicy: JSON.stringify({ - deadLetterTargetArn: `arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue`, + deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', maxReceiveCount: 3, }), }) From ff57c4be5856d753de6d4a26dd0e0219ee570175 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:22:51 +0200 Subject: [PATCH 05/17] Fixing missing piece on SNS --- packages/sns/lib/utils/snsInitter.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 0c61edcb..2ef2be97 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -69,6 +69,7 @@ export async function initSnsSqs( allowedSourceOwner: creationConfig.allowedSourceOwner, topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, logger: extraParams?.logger, + forceTagUpdate: creationConfig.forceTagUpdate, }, ) if (!subscriptionArn) { From ed649a8c32109e974c720ce2d3cf705fe7b742bd Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:32:34 +0200 Subject: [PATCH 06/17] sns sqs consumer update tags tests --- .../SnsSqsPermissionConsumer.spec.ts | 407 ++++++++++++------ 1 file changed, 266 insertions(+), 141 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index bb39f9df..2e20cd8c 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -1,7 +1,7 @@ import { setTimeout } from 'node:timers/promises' import type { SNSClient } from '@aws-sdk/client-sns' -import type { SQSClient } from '@aws-sdk/client-sqs' +import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' @@ -104,184 +104,309 @@ describe('SnsSqsPermissionConsumer', () => { await deleteTopic(snsClient, 'existingTopic') }) - it('updates existing queue when one with different attributes exist', async () => { - await assertQueue(sqsClient, { - QueueName: 'existingQueue', - Attributes: { - KmsMasterKeyId: 'somevalue', - }, + describe('tags update', () => { + const queueName = 'my-queue-with-tags' + + beforeEach(async () => { + await deleteQueue(sqsClient, queueName) }) - const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { - Name: 'sometopic', - }, - queue: { - QueueName: 'existingQueue', - Attributes: { - KmsMasterKeyId: 'othervalue', - VisibilityTimeout: '10', + const getTags = (queueUrl: string) => + sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) + + it('updates existing queue tags when update is forced', async () => { + const initialTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + const newTags = { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + } + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialTags, + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual(initialTags) + + const sqsSpy = vi.spyOn(sqsClient, 'send') + + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { + Name: 'some-topic', }, + queue: { + QueueName: queueName, + tags: newTags, + }, + forceTagUpdate: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - }) + deletionConfig: { deleteIfExists: false }, + }) - await newConsumer.init() - expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', - ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/my-queue-with-tags', + ) + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) - const attributes = await getQueueAttributes(sqsClient, newConsumer.subscriptionProps.queueUrl) + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeDefined() - expect(attributes.result?.attributes).toMatchObject({ - KmsMasterKeyId: 'othervalue', - VisibilityTimeout: '10', + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual({ + ...newTags, + leftover: 'some-leftover', + }) }) - }) - it('updates existing queue when one with different attributes exist and sets the policy', async () => { - await assertQueue(sqsClient, { - QueueName: 'existingQueue', - Attributes: { - KmsMasterKeyId: 'somevalue', - }, - }) + it('does not update existing queue tags when update is not forced', async () => { + const initialTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialTags, + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual(initialTags) - const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { - Name: 'sometopic', - }, - queue: { - QueueName: 'existingQueue', - Attributes: { - KmsMasterKeyId: 'othervalue', + const sqsSpy = vi.spyOn(sqsClient, 'send') + + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { + Name: 'some-topic', + }, + queue: { + QueueName: queueName, + tags: { service: 'changed-service' }, }, }, - updateAttributesIfExists: true, - topicArnsWithPublishPermissionsPrefix: 'someservice-', - }, - deletionConfig: { - deleteIfExists: false, - }, - }) + deletionConfig: { deleteIfExists: false }, + }) - await newConsumer.init() - expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', - ) + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/my-queue-with-tags', + ) + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) - const attributes = await getQueueAttributes(sqsClient, newConsumer.subscriptionProps.queueUrl) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeUndefined() - expect(attributes.result?.attributes!.Policy).toBe( - '{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:existingQueue","Condition":{"ArnLike":{"aws:SourceArn":"someservice-"}}}]}', - ) + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual(initialTags) + }) }) - it('does not attempt to update non-existing queue when passing update param', async () => { - const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { - Name: 'sometopic', + describe('attributes update', () => { + it('updates existing queue when one with different attributes exist', async () => { + await assertQueue(sqsClient, { + QueueName: 'existingQueue', + Attributes: { + KmsMasterKeyId: 'somevalue', }, - queue: { - QueueName: 'existingQueue', - Attributes: { - KmsMasterKeyId: 'othervalue', + }) + + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { + Name: 'sometopic', + }, + queue: { + QueueName: 'existingQueue', + Attributes: { + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }, }, + updateAttributesIfExists: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - }) + deletionConfig: { + deleteIfExists: false, + }, + }) - await newConsumer.init() - expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', - ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + ) + expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') - const attributes = await getQueueAttributes(sqsClient, newConsumer.subscriptionProps.queueUrl) + const attributes = await getQueueAttributes( + sqsClient, + newConsumer.subscriptionProps.queueUrl, + ) - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') - }) + expect(attributes.result?.attributes).toMatchObject({ + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }) + }) - it('creates a new dead letter queue', async () => { - const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { Name: 'sometopic' }, - queue: { QueueName: 'existingQueue' }, - updateAttributesIfExists: true, - }, - deadLetterQueue: { - redrivePolicy: { maxReceiveCount: 3 }, + it('updates existing queue when one with different attributes exist and sets the policy', async () => { + await assertQueue(sqsClient, { + QueueName: 'existingQueue', + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }) + + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { creationConfig: { - queue: { QueueName: 'deadLetterQueue' }, + topic: { + Name: 'sometopic', + }, + queue: { + QueueName: 'existingQueue', + Attributes: { + KmsMasterKeyId: 'othervalue', + }, + }, + updateAttributesIfExists: true, + topicArnsWithPublishPermissionsPrefix: 'someservice-', }, - }, - }) + deletionConfig: { + deleteIfExists: false, + }, + }) - await newConsumer.init() - expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', - ) - expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', - ) + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + ) - const attributes = await getQueueAttributes(sqsClient, newConsumer.subscriptionProps.queueUrl) + const attributes = await getQueueAttributes( + sqsClient, + newConsumer.subscriptionProps.queueUrl, + ) + expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') - expect(attributes.result?.attributes).toMatchObject({ - RedrivePolicy: JSON.stringify({ - deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', - maxReceiveCount: 3, - }), + expect(attributes.result?.attributes!.Policy).toBe( + '{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:existingQueue","Condition":{"ArnLike":{"aws:SourceArn":"someservice-"}}}]}', + ) }) - }) - it('using existing dead letter queue', async () => { - await assertQueue(sqsClient, { - QueueName: 'deadLetterQueue', + it('does not attempt to update non-existing queue when passing update param', async () => { + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { + Name: 'sometopic', + }, + queue: { + QueueName: 'existingQueue', + Attributes: { + KmsMasterKeyId: 'othervalue', + }, + }, + updateAttributesIfExists: true, + }, + deletionConfig: { + deleteIfExists: false, + }, + }) + + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + ) + expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + + const attributes = await getQueueAttributes( + sqsClient, + newConsumer.subscriptionProps.queueUrl, + ) + + expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') }) + }) - const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { Name: 'sometopic' }, - queue: { QueueName: 'existingQueue' }, - updateAttributesIfExists: true, - }, - deadLetterQueue: { - redrivePolicy: { maxReceiveCount: 3 }, - locatorConfig: { - queueUrl: 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', + describe('dead letter queue', () => { + it('creates a new dead letter queue', async () => { + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: 'sometopic' }, + queue: { QueueName: 'existingQueue' }, + updateAttributesIfExists: true, }, - }, + deadLetterQueue: { + redrivePolicy: { maxReceiveCount: 3 }, + creationConfig: { + queue: { QueueName: 'deadLetterQueue' }, + }, + }, + }) + + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + ) + expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', + ) + + const attributes = await getQueueAttributes( + sqsClient, + newConsumer.subscriptionProps.queueUrl, + ) + + expect(attributes.result?.attributes).toMatchObject({ + RedrivePolicy: JSON.stringify({ + deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', + maxReceiveCount: 3, + }), + }) }) - await newConsumer.init() - expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', - ) - expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', - ) + it('using existing dead letter queue', async () => { + await assertQueue(sqsClient, { + QueueName: 'deadLetterQueue', + }) - const attributes = await getQueueAttributes(sqsClient, newConsumer.subscriptionProps.queueUrl) + const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: 'sometopic' }, + queue: { QueueName: 'existingQueue' }, + updateAttributesIfExists: true, + }, + deadLetterQueue: { + redrivePolicy: { maxReceiveCount: 3 }, + locatorConfig: { + queueUrl: 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', + }, + }, + }) - expect(attributes.result?.attributes).toMatchObject({ - RedrivePolicy: JSON.stringify({ - deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', - maxReceiveCount: 3, - }), + await newConsumer.init() + expect(newConsumer.subscriptionProps.queueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + ) + expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( + 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', + ) + + const attributes = await getQueueAttributes( + sqsClient, + newConsumer.subscriptionProps.queueUrl, + ) + + expect(attributes.result?.attributes).toMatchObject({ + RedrivePolicy: JSON.stringify({ + deadLetterTargetArn: 'arn:aws:sqs:eu-west-1:000000000000:deadLetterQueue', + maxReceiveCount: 3, + }), + }) }) }) }) From 1dc024c9fcd51e75ad877f596aab1fde801b8f57 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:33:54 +0200 Subject: [PATCH 07/17] Minor improvements --- .../consumers/SqsPermissionConsumer.spec.ts | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index a94a00e3..1fca15c4 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -160,30 +160,29 @@ describe('SqsPermissionConsumer', () => { sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) it('updates existing queue tags when update is forced', async () => { - const assertResult = await assertQueue(sqsClient, { - QueueName: queueName, - tags: { - project: 'some-project', - service: 'some-service', - leftover: 'some-leftover', - }, - }) - const preTags = await getTags(assertResult.queueUrl) - expect(preTags.Tags).toEqual({ + const initialTags = { project: 'some-project', service: 'some-service', leftover: 'some-leftover', + } + const newTags = { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + } + + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialTags, }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual(initialTags) const newConsumer = new SqsPermissionConsumer(diContainer.cradle, { creationConfig: { queue: { QueueName: queueName, - tags: { - project: 'some-project', - service: 'changed-service', - cc: 'some-cc', - }, + tags: newTags, }, forceTagUpdate: true, }, @@ -207,14 +206,12 @@ describe('SqsPermissionConsumer', () => { const postTags = await getTags(assertResult.queueUrl) expect(postTags.Tags).toEqual({ - project: 'some-project', - service: 'changed-service', - cc: 'some-cc', + ...newTags, leftover: 'some-leftover', }) }) - it('not updates existing queue tags when update is not forced', async () => { + it('does not update existing queue tags when update is not forced', async () => { const assertResult = await assertQueue(sqsClient, { QueueName: queueName, tags: { From 563c17151608c9ea5708038c1209f28211c22a47 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:43:47 +0200 Subject: [PATCH 08/17] Minor test simplification --- .../SnsSqsPermissionConsumer.spec.ts | 61 +++++++++---------- 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 2e20cd8c..3a09fe43 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -17,27 +17,30 @@ import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' describe('SnsSqsPermissionConsumer', () => { describe('init', () => { + const queueName = 'some-queue' + let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient + beforeAll(async () => { diContainer = await registerDependencies({}, false) sqsClient = diContainer.cradle.sqsClient snsClient = diContainer.cradle.snsClient }) beforeEach(async () => { - await deleteQueue(sqsClient, 'existingQueue') + await deleteQueue(sqsClient, queueName) }) // FixMe https://github.com/localstack/localstack/issues/9306 it.skip('throws an error when invalid queue locator is passed', async () => { await assertQueue(sqsClient, { - QueueName: 'existingQueue', + QueueName: queueName, }) const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { locatorConfig: { - queueUrl: 'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue', + queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`, subscriptionArn: 'dummy', topicArn: 'dummy', }, @@ -48,7 +51,7 @@ describe('SnsSqsPermissionConsumer', () => { it('does not create a new queue when queue locator is passed', async () => { await assertQueue(sqsClient, { - QueueName: 'existingQueue', + QueueName: queueName, }) const arn = await assertTopic(snsClient, { @@ -58,7 +61,7 @@ describe('SnsSqsPermissionConsumer', () => { const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { locatorConfig: { topicArn: arn, - queueUrl: 'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue', + queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`, subscriptionArn: 'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395', }, @@ -66,9 +69,9 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue', + `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`, ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) expect(newConsumer.subscriptionProps.topicArn).toEqual(arn) expect(newConsumer.subscriptionProps.subscriptionArn).toBe( 'arn:aws:sns:eu-west-1:000000000000:user_permissions:bdf640a2-bedf-475a-98b8-758b88c87395', @@ -105,12 +108,6 @@ describe('SnsSqsPermissionConsumer', () => { }) describe('tags update', () => { - const queueName = 'my-queue-with-tags' - - beforeEach(async () => { - await deleteQueue(sqsClient, queueName) - }) - const getTags = (queueUrl: string) => sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) @@ -150,7 +147,7 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/my-queue-with-tags', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) expect(newConsumer.subscriptionProps.queueName).toBe(queueName) @@ -196,7 +193,7 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/my-queue-with-tags', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) expect(newConsumer.subscriptionProps.queueName).toBe(queueName) @@ -213,7 +210,7 @@ describe('SnsSqsPermissionConsumer', () => { describe('attributes update', () => { it('updates existing queue when one with different attributes exist', async () => { await assertQueue(sqsClient, { - QueueName: 'existingQueue', + QueueName: queueName, Attributes: { KmsMasterKeyId: 'somevalue', }, @@ -225,7 +222,7 @@ describe('SnsSqsPermissionConsumer', () => { Name: 'sometopic', }, queue: { - QueueName: 'existingQueue', + QueueName: queueName, Attributes: { KmsMasterKeyId: 'othervalue', VisibilityTimeout: '10', @@ -240,9 +237,9 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) const attributes = await getQueueAttributes( sqsClient, @@ -257,7 +254,7 @@ describe('SnsSqsPermissionConsumer', () => { it('updates existing queue when one with different attributes exist and sets the policy', async () => { await assertQueue(sqsClient, { - QueueName: 'existingQueue', + QueueName: queueName, Attributes: { KmsMasterKeyId: 'somevalue', }, @@ -269,7 +266,7 @@ describe('SnsSqsPermissionConsumer', () => { Name: 'sometopic', }, queue: { - QueueName: 'existingQueue', + QueueName: queueName, Attributes: { KmsMasterKeyId: 'othervalue', }, @@ -284,17 +281,17 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) const attributes = await getQueueAttributes( sqsClient, newConsumer.subscriptionProps.queueUrl, ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) - expect(attributes.result?.attributes!.Policy).toBe( - '{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:existingQueue","Condition":{"ArnLike":{"aws:SourceArn":"someservice-"}}}]}', + expect(attributes.result?.attributes!.Policy).toMatchInlineSnapshot( + `"{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:some-queue","Condition":{"ArnLike":{"aws:SourceArn":"someservice-"}}}]}"`, ) }) @@ -305,7 +302,7 @@ describe('SnsSqsPermissionConsumer', () => { Name: 'sometopic', }, queue: { - QueueName: 'existingQueue', + QueueName: queueName, Attributes: { KmsMasterKeyId: 'othervalue', }, @@ -319,9 +316,9 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) - expect(newConsumer.subscriptionProps.queueName).toBe('existingQueue') + expect(newConsumer.subscriptionProps.queueName).toBe(queueName) const attributes = await getQueueAttributes( sqsClient, @@ -337,7 +334,7 @@ describe('SnsSqsPermissionConsumer', () => { const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { creationConfig: { topic: { Name: 'sometopic' }, - queue: { QueueName: 'existingQueue' }, + queue: { QueueName: queueName }, updateAttributesIfExists: true, }, deadLetterQueue: { @@ -350,7 +347,7 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', @@ -377,7 +374,7 @@ describe('SnsSqsPermissionConsumer', () => { const newConsumer = new SnsSqsPermissionConsumer(diContainer.cradle, { creationConfig: { topic: { Name: 'sometopic' }, - queue: { QueueName: 'existingQueue' }, + queue: { QueueName: queueName }, updateAttributesIfExists: true, }, deadLetterQueue: { @@ -390,7 +387,7 @@ describe('SnsSqsPermissionConsumer', () => { await newConsumer.init() expect(newConsumer.subscriptionProps.queueUrl).toBe( - 'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue', + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, ) expect(newConsumer.subscriptionProps.deadLetterQueueUrl).toBe( 'http://sqs.eu-west-1.localstack:4566/000000000000/deadLetterQueue', From 4ad7c3bb20024b0e7c1db20582167bd65f6c4c51 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 19:50:47 +0200 Subject: [PATCH 09/17] SQS publisher tests --- .../publishers/SqsPermissionPublisher.spec.ts | 226 +++++++++++++----- 1 file changed, 164 insertions(+), 62 deletions(-) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts index 995c089e..c1299455 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts @@ -1,4 +1,4 @@ -import type { SQSClient } from '@aws-sdk/client-sqs' +import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import type { AwilixContainer } from 'awilix' import { Consumer } from 'sqs-consumer' @@ -59,86 +59,188 @@ describe('SqsPermissionPublisher', () => { ) }) - it('updates existing queue when one with different attributes exist', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, - }) - - const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'othervalue', + describe('attributes update', () => { + it('updates existing queue when one with different attributes exist', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }) + + const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'othervalue', + }, }, + updateAttributesIfExists: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, - }) + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + const sqsSpy = vi.spyOn(sqsClient, 'send') - await newPublisher.init() - expect(newPublisher.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + await newPublisher.init() + expect(newPublisher.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) + + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'SetQueueAttributesCommand' + }) + expect(updateCall).toBeDefined() - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'SetQueueAttributesCommand' + const attributes = await getQueueAttributes(sqsClient, newPublisher.queueProps.url) + + expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') }) - expect(updateCall).toBeDefined() - const attributes = await getQueueAttributes(sqsClient, newPublisher.queueProps.url) + it('does not update existing queue when attributes did not change', async () => { + await assertQueue(sqsClient, { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }) + + const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + Attributes: { + KmsMasterKeyId: 'somevalue', + }, + }, + updateAttributesIfExists: true, + }, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) + + const sqsSpy = vi.spyOn(sqsClient, 'send') - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') - }) + await newPublisher.init() + expect(newPublisher.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - it('does not update existing queue when attributes did not change', async () => { - await assertQueue(sqsClient, { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', - }, + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'SetQueueAttributesCommand' + }) + expect(updateCall).toBeUndefined() + + const attributes = await getQueueAttributes(sqsClient, newPublisher.queueProps.url) + + expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') }) + }) - const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { - creationConfig: { - queue: { - QueueName: queueName, - Attributes: { - KmsMasterKeyId: 'somevalue', + describe('tags update', () => { + const getTags = (queueUrl: string) => + sqsClient.send(new ListQueueTagsCommand({ QueueUrl: queueUrl })) + + it('updates existing queue tags when update is forced', async () => { + const initialTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + const newTags = { + project: 'some-project', + service: 'changed-service', + cc: 'some-cc', + } + + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialTags, + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: newTags, }, + forceTagUpdate: true, }, - updateAttributesIfExists: true, - }, - deletionConfig: { - deleteIfExists: false, - }, - logMessages: true, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) + + const sqsSpy = vi.spyOn(sqsClient, 'send') + + await newPublisher.init() + expect(newPublisher.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) + + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual({ + ...newTags, + leftover: 'some-leftover', + }) }) - const sqsSpy = vi.spyOn(sqsClient, 'send') + it('does not update existing queue tags when update is not forced', async () => { + const initialTags = { + project: 'some-project', + service: 'some-service', + leftover: 'some-leftover', + } + + const assertResult = await assertQueue(sqsClient, { + QueueName: queueName, + tags: initialTags, + }) + const preTags = await getTags(assertResult.queueUrl) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { + queue: { + QueueName: queueName, + tags: { service: 'changed-service' }, + }, + }, + deletionConfig: { + deleteIfExists: false, + }, + logMessages: true, + }) - await newPublisher.init() - expect(newPublisher.queueProps.url).toBe( - `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, - ) + const sqsSpy = vi.spyOn(sqsClient, 'send') - const updateCall = sqsSpy.mock.calls.find((entry) => { - return entry[0].constructor.name === 'SetQueueAttributesCommand' - }) - expect(updateCall).toBeUndefined() + await newPublisher.init() + expect(newPublisher.queueProps.url).toBe( + `http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`, + ) - const attributes = await getQueueAttributes(sqsClient, newPublisher.queueProps.url) + const updateCall = sqsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagQueueCommand' + }) + expect(updateCall).toBeUndefined() - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('somevalue') + const postTags = await getTags(assertResult.queueUrl) + expect(postTags.Tags).toEqual(initialTags) + }) }) }) From 3e2175bf33252f914d73d63a8098f6ddbfebb970 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 21:14:31 +0200 Subject: [PATCH 10/17] Testing SNS tags update --- .../publishers/SnsPermissionPublisher.spec.ts | 41 ++++++++++++++++--- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 37d3288b..87749b26 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,4 +1,4 @@ -import type { SNSClient } from '@aws-sdk/client-sns' +import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' @@ -21,10 +21,10 @@ import type { Dependencies } from '../utils/testContext' import { SnsPermissionPublisher } from './SnsPermissionPublisher' -const queueName = 'someQueue' - describe('SnsPermissionPublisher', () => { describe('init', () => { + const topicNome = 'existingTopic' + let diContainer: AwilixContainer let snsClient: SNSClient beforeAll(async () => { @@ -84,7 +84,7 @@ describe('SnsPermissionPublisher', () => { it('does not create a new queue when queue locator is passed', async () => { const arn = await assertTopic(snsClient, { - Name: 'existingTopic', + Name: topicNome, }) const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { @@ -95,11 +95,42 @@ describe('SnsPermissionPublisher', () => { await newPublisher.init() expect(newPublisher.topicArnProp).toEqual(arn) - await deleteTopic(snsClient, 'existingTopic') + await deleteTopic(snsClient, topicNome) + }) + + // TESTING HOW SNS TAGS UPDATE WORKS + it.skip('to be removed', async () => { + await deleteTopic(snsClient, topicNome) + const arn = await assertTopic(snsClient, { + Name: topicNome, + Tags: [ + { Key: 'hello', Value: 'world' }, + { Key: 'goodbye', Value: 'world' }, + ], + }) + + const command = new ListTagsForResourceCommand({ ResourceArn: arn }) + const res = await snsClient.send(command) + console.log(res.Tags) + + const updateCommand = new TagResourceCommand({ + ResourceArn: arn, + Tags: [ + { Key: 'hello', Value: 'friend' }, + { Key: 'goodbye', Value: 'world' }, + ], + }) + await snsClient.send(updateCommand) + + const command2 = new ListTagsForResourceCommand({ ResourceArn: arn }) + const res2 = await snsClient.send(command2) + console.log(res2.Tags) }) }) describe('publish', () => { + const queueName = 'someQueue' + let diContainer: AwilixContainer let sqsClient: SQSClient let snsClient: SNSClient From 45bf9127b90e5e4c59804590c51080103da052e5 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Mon, 14 Oct 2024 23:36:12 +0200 Subject: [PATCH 11/17] SNS publisher update topic tags --- packages/sns/lib/sns/AbstractSnsService.ts | 1 + packages/sns/lib/utils/snsUtils.ts | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/packages/sns/lib/sns/AbstractSnsService.ts b/packages/sns/lib/sns/AbstractSnsService.ts index cbae6ae1..fcfb6137 100644 --- a/packages/sns/lib/sns/AbstractSnsService.ts +++ b/packages/sns/lib/sns/AbstractSnsService.ts @@ -31,6 +31,7 @@ export type SNSTopicConfig = { export type ExtraSNSCreationParams = { queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[] allowedSourceOwner?: string + forceTagUpdate?: boolean } export type SNSCreationConfig = { diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index 925c06d6..f5285073 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,6 +1,7 @@ import { type CreateTopicCommandInput, type SNSClient, + TagResourceCommand, paginateListTopics, } from '@aws-sdk/client-sns' import { @@ -102,6 +103,13 @@ export async function assertTopic( }) await snsClient.send(setTopicAttributesCommand) } + if (extraParams?.forceTagUpdate) { + const tagTopicCommand = new TagResourceCommand({ + ResourceArn: topicArn, + Tags: topicOptions.Tags, + }) + await snsClient.send(tagTopicCommand) + } return topicArn } From 4772660c910d32334808e1e88866cefcea14af4f Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 15 Oct 2024 13:38:12 +0200 Subject: [PATCH 12/17] Adding tags update --- packages/sns/lib/utils/snsInitter.ts | 1 + packages/sns/lib/utils/snsUtils.ts | 3 +++ 2 files changed, 4 insertions(+) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 2ef2be97..a5a7e8c0 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -230,6 +230,7 @@ export async function initSns( const topicArn = await assertTopic(snsClient, creationConfig.topic!, { queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix, allowedSourceOwner: creationConfig.allowedSourceOwner, + forceTagUpdate: creationConfig.forceTagUpdate, }) return { topicArn, diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index f5285073..c3bfcb90 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,5 +1,7 @@ import { type CreateTopicCommandInput, + ListTagsForResourceCommand, + ListTopicsCommand, type SNSClient, TagResourceCommand, paginateListTopics, @@ -83,6 +85,7 @@ export async function assertTopic( topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { + 'aws:sns:region:account_id:topic_name' const command = new CreateTopicCommand(topicOptions) const response = await snsClient.send(command) From dda6b527f501bc6926fa0ed871741e461cced7ba Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Tue, 15 Oct 2024 13:40:06 +0200 Subject: [PATCH 13/17] Adding tests --- .../publishers/SnsPermissionPublisher.spec.ts | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 87749b26..d9be23ac 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,12 +1,12 @@ import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' -import type { SQSClient } from '@aws-sdk/client-sqs' +import { ListQueueTagsCommand, SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' import type { SQSMessage } from '@message-queue-toolkit/sqs' import { FakeConsumerErrorResolver, assertQueue, deleteQueue } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' import { Consumer } from 'sqs-consumer' -import { afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { deserializeSNSMessage } from '../../lib/utils/snsMessageDeserializer' import { subscribeToTopic } from '../../lib/utils/snsSubscriber' @@ -98,6 +98,52 @@ describe('SnsPermissionPublisher', () => { await deleteTopic(snsClient, topicNome) }) + describe('tags', () => { + const getTags = (arn: string) => + snsClient.send(new ListTagsForResourceCommand({ ResourceArn: arn })) + + it('updates existing queue tags when update is forced', async () => { + const initialTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'some-service' }, + { Key: 'leftover', Value: 'some-leftover' }, + ] + const newTags = [ + { Key: 'project', Value: 'some-project' }, + { Key: 'service', Value: 'changed-service' }, + { Key: 'cc', Value: 'some-cc' }, + ] + + const arn = await assertTopic(snsClient, { + Name: topicNome, + Tags: initialTags, + }) + const preTags = await getTags(arn) + expect(preTags.Tags).toEqual(initialTags) + + const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { + Name: topicNome, + Tags: newTags, + }, + forceTagUpdate: true, + }, + }) + + const snsSpy = vi.spyOn(snsClient, 'send') + await newPublisher.init() + + const updateCall = snsSpy.mock.calls.find((entry) => { + return entry[0].constructor.name === 'TagResourceCommand' + }) + expect(updateCall).toBeDefined() + + const postTags = await getTags(arn) + expect(postTags.Tags).toEqual([...newTags, { Key: 'leftover', Value: 'some-leftover' }]) + }) + }) + // TESTING HOW SNS TAGS UPDATE WORKS it.skip('to be removed', async () => { await deleteTopic(snsClient, topicNome) From bbdd9e27642871bb07a2f4e305fb8f44f054aa40 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:47:58 +0200 Subject: [PATCH 14/17] Adding aws-sdk sts --- packages/sns/package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 142e7d95..582dd396 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -32,6 +32,7 @@ "peerDependencies": { "@aws-sdk/client-sns": "^3.632.0", "@aws-sdk/client-sqs": "^3.632.0", + "@aws-sdk/client-sts": "^3.632.0", "@message-queue-toolkit/core": ">=15.0.0", "@message-queue-toolkit/schemas": ">=2.0.0", "@message-queue-toolkit/sqs": "^17.0.0" @@ -40,11 +41,11 @@ "@aws-sdk/client-s3": "^3.670.0", "@aws-sdk/client-sns": "^3.670.0", "@aws-sdk/client-sqs": "^3.670.0", + "@biomejs/biome": "1.9.3", + "@kibertoad/biome-config": "^1.2.1", "@message-queue-toolkit/core": "*", "@message-queue-toolkit/s3-payload-store": "*", "@message-queue-toolkit/sqs": "*", - "@biomejs/biome": "1.9.3", - "@kibertoad/biome-config": "^1.2.1", "@types/node": "^22.7.5", "@vitest/coverage-v8": "^2.1.2", "awilix": "^12.0.1", From edb47d1b42947ac7ac23ce1f60768ca2a7792c7f Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:53:51 +0200 Subject: [PATCH 15/17] Minor test changes --- .../test/publishers/SnsPermissionPublisher.spec.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index d9be23ac..36c92345 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -1,5 +1,5 @@ -import { ListTagsForResourceCommand, type SNSClient, TagResourceCommand } from '@aws-sdk/client-sns' -import { ListQueueTagsCommand, SQSClient } from '@aws-sdk/client-sqs' +import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' +import type { SQSClient } from '@aws-sdk/client-sqs' import type { InternalError } from '@lokalise/node-core' import { waitAndRetry } from '@lokalise/node-core' import type { SQSMessage } from '@message-queue-toolkit/sqs' @@ -32,6 +32,10 @@ describe('SnsPermissionPublisher', () => { snsClient = diContainer.cradle.snsClient }) + beforeEach(async () => { + await deleteTopic(snsClient, topicNome) + }) + it('sets correct policy when policy fields are set', async () => { const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { creationConfig: { @@ -95,7 +99,6 @@ describe('SnsPermissionPublisher', () => { await newPublisher.init() expect(newPublisher.topicArnProp).toEqual(arn) - await deleteTopic(snsClient, topicNome) }) describe('tags', () => { @@ -123,10 +126,7 @@ describe('SnsPermissionPublisher', () => { const newPublisher = new SnsPermissionPublisher(diContainer.cradle, { creationConfig: { - topic: { - Name: topicNome, - Tags: newTags, - }, + topic: { Name: topicNome, Tags: newTags }, forceTagUpdate: true, }, }) From 9213aae1bb6eacf89e3578298569f3757f9c13f2 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:54:00 +0200 Subject: [PATCH 16/17] Localstack enabling sts --- docker-compose.yml | 2 +- packages/sns/docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f89a99a4..fe1e669d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,7 +15,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock diff --git a/packages/sns/docker-compose.yml b/packages/sns/docker-compose.yml index 4b507c1d..6ec389d5 100644 --- a/packages/sns/docker-compose.yml +++ b/packages/sns/docker-compose.yml @@ -7,7 +7,7 @@ services: - '127.0.0.1:4566:4566' # LocalStack Gateway - '127.0.0.1:4510-4559:4510-4559' # external services port range environment: - - SERVICES=sns,sqs,s3 + - SERVICES=sns,sqs,s3,sts - DEBUG=0 - DATA_DIR=${DATA_DIR-} - DOCKER_HOST=unix:///var/run/docker.sock From 9473b607b38ed6a7efc1a11b31ae72dbdb324756 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 18 Oct 2024 20:54:43 +0200 Subject: [PATCH 17/17] Implementing tags update + simple tests --- packages/sns/lib/utils/snsUtils.ts | 56 +++++++++++++++---- .../publishers/SnsPermissionPublisher.spec.ts | 35 ++---------- 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/packages/sns/lib/utils/snsUtils.ts b/packages/sns/lib/utils/snsUtils.ts index c3bfcb90..f8fde8ae 100644 --- a/packages/sns/lib/utils/snsUtils.ts +++ b/packages/sns/lib/utils/snsUtils.ts @@ -1,7 +1,5 @@ import { type CreateTopicCommandInput, - ListTagsForResourceCommand, - ListTopicsCommand, type SNSClient, TagResourceCommand, paginateListTopics, @@ -15,11 +13,12 @@ import { SetTopicAttributesCommand, UnsubscribeCommand, } from '@aws-sdk/client-sns' -import type { Either } from '@lokalise/node-core' +import { type Either, isError } from '@lokalise/node-core' import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from '@message-queue-toolkit/sqs' import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService' +import { GetCallerIdentityCommand, STSClient } from '@aws-sdk/client-sts' import { generateTopicSubscriptionPolicy } from './snsAttributeUtils' type AttributesResult = { @@ -85,14 +84,19 @@ export async function assertTopic( topicOptions: CreateTopicCommandInput, extraParams?: ExtraSNSCreationParams, ) { - 'aws:sns:region:account_id:topic_name' - const command = new CreateTopicCommand(topicOptions) - const response = await snsClient.send(command) - - if (!response.TopicArn) { - throw new Error('No topic arn in response') + let topicArn: string + try { + const command = new CreateTopicCommand(topicOptions) + const response = await snsClient.send(command) + if (!response.TopicArn) throw new Error('No topic arn in response') + topicArn = response.TopicArn + } catch (err) { + // We only manually build ARN in case of tag update + if (!extraParams?.forceTagUpdate) throw err + // To build ARN we need topic name and error should be "topic already exist with different tags" + if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err + topicArn = await buildTopicArn(snsClient, topicOptions.Name) } - const topicArn = response.TopicArn if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) { const setTopicAttributesCommand = new SetTopicAttributesCommand({ @@ -189,3 +193,35 @@ export async function getTopicArnByName(snsClient: SNSClient, topicName?: string */ export const calculateOutgoingMessageSize = (message: unknown) => sqsCalculateOutgoingMessageSize(message) + +const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) => + !!error && + isError(error) && + 'Error' in error && + !!error.Error && + typeof error.Error === 'object' && + 'Code' in error.Error && + 'Message' in error.Error && + typeof error.Error.Message === 'string' && + error.Error.Code === 'InvalidParameter' && + error.Error.Message.includes('already exists with different tags') + +/** + * Manually builds the ARN of a topic based on the current AWS account and the topic name. + * It follows the following pattern: arn:aws:sns::: + * Doc -> https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html + */ +const buildTopicArn = async (client: SNSClient, topicName: string) => { + const region = + typeof client.config.region === 'string' ? client.config.region : await client.config.region() + + const stsClient = new STSClient({ + endpoint: client.config.endpoint, + region, + credentials: client.config.credentials, + endpointProvider: client.config.endpointProvider, + }) + const identityResponse = await stsClient.send(new GetCallerIdentityCommand({})) + + return `arn:aws:sns:${region}:${identityResponse.Account}:${topicName}` +} diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 36c92345..a88bb424 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -140,38 +140,13 @@ describe('SnsPermissionPublisher', () => { expect(updateCall).toBeDefined() const postTags = await getTags(arn) - expect(postTags.Tags).toEqual([...newTags, { Key: 'leftover', Value: 'some-leftover' }]) + const tags = postTags.Tags + expect(tags).toHaveLength(4) + expect(postTags.Tags).toEqual( + expect.arrayContaining([...newTags, { Key: 'leftover', Value: 'some-leftover' }]), + ) }) }) - - // TESTING HOW SNS TAGS UPDATE WORKS - it.skip('to be removed', async () => { - await deleteTopic(snsClient, topicNome) - const arn = await assertTopic(snsClient, { - Name: topicNome, - Tags: [ - { Key: 'hello', Value: 'world' }, - { Key: 'goodbye', Value: 'world' }, - ], - }) - - const command = new ListTagsForResourceCommand({ ResourceArn: arn }) - const res = await snsClient.send(command) - console.log(res.Tags) - - const updateCommand = new TagResourceCommand({ - ResourceArn: arn, - Tags: [ - { Key: 'hello', Value: 'friend' }, - { Key: 'goodbye', Value: 'world' }, - ], - }) - await snsClient.send(updateCommand) - - const command2 = new ListTagsForResourceCommand({ ResourceArn: arn }) - const res2 = await snsClient.send(command2) - console.log(res2.Tags) - }) }) describe('publish', () => {