Skip to content

Commit

Permalink
SNS Tags update (#222)
Browse files Browse the repository at this point in the history
* Testing SNS tags update

* SNS publisher update topic tags

* Adding tags update

* Adding tests

* Adding aws-sdk sts

* Minor test changes

* Localstack enabling sts

* Implementing tags update + simple tests

* Adding stsClient as a new dependency to avoid creation

* Tests fixes

* Adding stsUtils + tests

* sns publisher tag update tests

* Import error fix

* test improvement

* SNS and sqs consumer is able to update tags

* sns sqs consumer tag update tests

* Release prepare

* lint fix

* Test fix

* Test fix

* Improving tests

* Caching caller identity

* Exposing clearCachedCallerIdentity
  • Loading branch information
CarlosGamero authored Oct 28, 2024
1 parent 8452a15 commit d5ecdf3
Show file tree
Hide file tree
Showing 21 changed files with 517 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
2 changes: 1 addition & 1 deletion packages/sns/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
1 change: 1 addition & 0 deletions packages/sns/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export {
findSubscriptionByTopicAndQueue,
getSubscriptionAttributes,
} from './lib/utils/snsUtils'
export { clearCachedCallerIdentity } from './lib/utils/stsUtils'

export { subscribeToTopic } from './lib/utils/snsSubscriber'
export { initSns, initSnsSqs } from './lib/utils/snsInitter'
Expand Down
14 changes: 12 additions & 2 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient, Tag } from '@aws-sdk/client-sn
import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes'
import { deleteSns, initSns } from '../utils/snsInitter'

Expand All @@ -10,6 +11,7 @@ export const SNS_MESSAGE_MAX_SIZE = 256 * 1024 // 256KB

export type SNSDependencies = QueueDependencies & {
snsClient: SNSClient
stsClient: STSClient
}

export type SNSTopicAWSConfig = CreateTopicCommandInput
Expand All @@ -31,6 +33,7 @@ export type SNSTopicConfig = {
export type ExtraSNSCreationParams = {
queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[]
allowedSourceOwner?: string
forceTagUpdate?: boolean
}

export type SNSCreationConfig = {
Expand Down Expand Up @@ -59,21 +62,28 @@ export abstract class AbstractSnsService<
SNSOptionsType
> {
protected readonly snsClient: SNSClient
protected readonly stsClient: STSClient
// @ts-ignore
protected topicArn: string

constructor(dependencies: DependenciesType, options: SNSOptionsType) {
super(dependencies, options)

this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

public async init() {
if (this.deletionConfig && this.creationConfig) {
await deleteSns(this.snsClient, this.deletionConfig, this.creationConfig)
await deleteSns(this.snsClient, this.stsClient, this.deletionConfig, this.creationConfig)
}

const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig)
const initResult = await initSns(
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
)
this.topicArn = initResult.topicArn
this.isInitted = true
}
Expand Down
6 changes: 6 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter'
import { readSnsMessage } from '../utils/snsMessageReader'
import type { SNSSubscriptionOptions } from '../utils/snsSubscriber'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService'

export type SNSSQSConsumerDependencies = SQSConsumerDependencies & {
snsClient: SNSClient
stsClient: STSClient
}
export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig

Expand Down Expand Up @@ -53,6 +55,7 @@ export abstract class AbstractSnsSqsConsumer<
> {
private readonly subscriptionConfig?: SNSSubscriptionOptions
private readonly snsClient: SNSClient
private readonly stsClient: STSClient

// @ts-ignore
protected topicArn: string
Expand All @@ -74,13 +77,15 @@ export abstract class AbstractSnsSqsConsumer<

this.subscriptionConfig = options.subscriptionConfig
this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

override async init(): Promise<void> {
if (this.deletionConfig && this.creationConfig && this.subscriptionConfig) {
await deleteSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.deletionConfig,
this.creationConfig.queue,
this.creationConfig.topic,
Expand All @@ -95,6 +100,7 @@ export abstract class AbstractSnsSqsConsumer<
const initSnsSqsResult = await initSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
Expand Down
1 change: 1 addition & 0 deletions packages/sns/lib/sns/SnsPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class SnsPublisherManager<
newPublisherOptions: options.newPublisherOptions,
publisherDependencies: {
snsClient: dependencies.snsClient,
stsClient: dependencies.stsClient,
logger: dependencies.logger,
errorReporter: dependencies.errorReporter,
},
Expand Down
14 changes: 11 additions & 3 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer'

import type { STSClient } from '@aws-sdk/client-sts'
import type { Either } from '@lokalise/node-core'
import { type TopicResolutionOptions, isCreateTopicCommand } from '../types/TopicTypes'
import type { SNSSubscriptionOptions } from './snsSubscriber'
Expand All @@ -24,6 +25,7 @@ import {
export async function initSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSSQSQueueLocatorType,
creationConfig?: SNSCreationConfig & SQSCreationConfig,
subscriptionConfig?: SNSSubscriptionOptions,
Expand Down Expand Up @@ -59,6 +61,7 @@ export async function initSnsSqs(
const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
creationConfig.queue,
topicResolutionOptions,
subscriptionConfig,
Expand Down Expand Up @@ -132,6 +135,7 @@ export async function initSnsSqs(
export async function deleteSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput | undefined,
Expand All @@ -152,6 +156,7 @@ export async function deleteSnsSqs(
const { subscriptionArn } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
queueConfiguration,
topicConfiguration ?? topicLocator!,
subscriptionConfiguration,
Expand All @@ -176,13 +181,14 @@ export async function deleteSnsSqs(
if (!topicName) {
throw new Error('Failed to resolve topic name')
}
await deleteTopic(snsClient, topicName)
await deleteTopic(snsClient, stsClient, topicName)
}
await deleteSubscription(snsClient, subscriptionArn)
}

export async function deleteSns(
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
creationConfig: SNSCreationConfig,
) {
Expand All @@ -200,11 +206,12 @@ export async function deleteSns(
throw new Error('topic.Name must be set for automatic deletion')
}

await deleteTopic(snsClient, creationConfig.topic.Name)
await deleteTopic(snsClient, stsClient, creationConfig.topic.Name)
}

export async function initSns(
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSTopicLocatorType,
creationConfig?: SNSCreationConfig,
) {
Expand All @@ -227,9 +234,10 @@ export async function initSns(
'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory',
)
}
const topicArn = await assertTopic(snsClient, creationConfig.topic!, {
const topicArn = await assertTopic(snsClient, stsClient, creationConfig.topic!, {
queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: creationConfig.allowedSourceOwner,
forceTagUpdate: creationConfig.forceTagUpdate,
})
return {
topicArn,
Expand Down
15 changes: 12 additions & 3 deletions packages/sns/lib/utils/snsSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { assertQueue } from '@message-queue-toolkit/sqs'

import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService'

import type { STSClient } from '@aws-sdk/client-sts'
import {
type TopicResolutionOptions,
isCreateTopicCommand,
Expand All @@ -21,8 +22,9 @@ export type SNSSubscriptionOptions = Omit<
> & { updateAttributesIfExists: boolean }

async function resolveTopicArnToSubscribeTo(
topicConfiguration: TopicResolutionOptions,
snsClient: SNSClient,
stsClient: STSClient,
topicConfiguration: TopicResolutionOptions,
extraParams: (ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams) | undefined,
) {
//If topicArn is present, let's use it and return early.
Expand All @@ -32,9 +34,10 @@ async function resolveTopicArnToSubscribeTo(

//If input configuration is capable of creating a topic, let's create it and return its ARN.
if (isCreateTopicCommand(topicConfiguration)) {
return await assertTopic(snsClient, topicConfiguration, {
return await assertTopic(snsClient, stsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: extraParams?.allowedSourceOwner,
forceTagUpdate: extraParams?.forceTagUpdate,
})
}

Expand All @@ -45,12 +48,18 @@ async function resolveTopicArnToSubscribeTo(
export async function subscribeToTopic(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: TopicResolutionOptions,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams,
) {
const topicArn = await resolveTopicArnToSubscribeTo(topicConfiguration, snsClient, extraParams)
const topicArn = await resolveTopicArnToSubscribeTo(
snsClient,
stsClient,
topicConfiguration,
extraParams,
)

const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, {
topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix,
Expand Down
57 changes: 43 additions & 14 deletions packages/sns/lib/utils/snsUtils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type CreateTopicCommandInput,
type SNSClient,
TagResourceCommand,
paginateListTopics,
} from '@aws-sdk/client-sns'
import {
Expand All @@ -12,12 +13,14 @@ import {
SetTopicAttributesCommand,
UnsubscribeCommand,
} from '@aws-sdk/client-sns'
import type { Either } from '@lokalise/node-core'
import { type Either, isError } from '@lokalise/node-core'
import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from '@message-queue-toolkit/sqs'

import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService'

import type { STSClient } from '@aws-sdk/client-sts'
import { generateTopicSubscriptionPolicy } from './snsAttributeUtils'
import { buildTopicArn } from './stsUtils'

type AttributesResult = {
attributes?: Record<string, string>
Expand Down Expand Up @@ -79,16 +82,23 @@ export async function getSubscriptionAttributes(

export async function assertTopic(
snsClient: SNSClient,
stsClient: STSClient,
topicOptions: CreateTopicCommandInput,
extraParams?: ExtraSNSCreationParams,
) {
const command = new CreateTopicCommand(topicOptions)
const response = await snsClient.send(command)

if (!response.TopicArn) {
throw new Error('No topic arn in response')
let topicArn: string
try {
const command = new CreateTopicCommand(topicOptions)
const response = await snsClient.send(command)
if (!response.TopicArn) throw new Error('No topic arn in response')
topicArn = response.TopicArn
} catch (err) {
// We only manually build ARN in case of tag update
if (!extraParams?.forceTagUpdate) throw err
// To build ARN we need topic name and error should be "topic already exist with different tags"
if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err
topicArn = await buildTopicArn(stsClient, topicOptions.Name)
}
const topicArn = response.TopicArn

if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) {
const setTopicAttributesCommand = new SetTopicAttributesCommand({
Expand All @@ -102,21 +112,28 @@ export async function assertTopic(
})
await snsClient.send(setTopicAttributesCommand)
}
if (extraParams?.forceTagUpdate && topicOptions.Tags) {
const tagTopicCommand = new TagResourceCommand({
ResourceArn: topicArn,
Tags: topicOptions.Tags,
})
await snsClient.send(tagTopicCommand)
}

return topicArn
}

export async function deleteTopic(client: SNSClient, topicName: string) {
export async function deleteTopic(snsClient: SNSClient, stsClient: STSClient, topicName: string) {
try {
const topicArn = await assertTopic(client, {
const topicArn = await assertTopic(snsClient, stsClient, {
Name: topicName,
})

const command = new DeleteTopicCommand({
TopicArn: topicArn,
})

await client.send(command)
await snsClient.send(
new DeleteTopicCommand({
TopicArn: topicArn,
}),
)
} catch (_) {
// we don't care it operation has failed
}
Expand Down Expand Up @@ -178,3 +195,15 @@ export async function getTopicArnByName(snsClient: SNSClient, topicName?: string
*/
export const calculateOutgoingMessageSize = (message: unknown) =>
sqsCalculateOutgoingMessageSize(message)

const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) =>
!!error &&
isError(error) &&
'Error' in error &&
!!error.Error &&
typeof error.Error === 'object' &&
'Code' in error.Error &&
'Message' in error.Error &&
typeof error.Error.Message === 'string' &&
error.Error.Code === 'InvalidParameter' &&
error.Error.Message.includes('already exists with different tags')
Loading

0 comments on commit d5ecdf3

Please sign in to comment.