Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNS Tags update #222

Merged
merged 24 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
2 changes: 1 addition & 1 deletion packages/sns/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
environment:
- SERVICES=sns,sqs,s3
- SERVICES=sns,sqs,s3,sts
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- DOCKER_HOST=unix:///var/run/docker.sock
Expand Down
1 change: 1 addition & 0 deletions packages/sns/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export {
findSubscriptionByTopicAndQueue,
getSubscriptionAttributes,
} from './lib/utils/snsUtils'
export { clearCachedCallerIdentity } from './lib/utils/stsUtils'

export { subscribeToTopic } from './lib/utils/snsSubscriber'
export { initSns, initSnsSqs } from './lib/utils/snsInitter'
Expand Down
14 changes: 12 additions & 2 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient, Tag } from '@aws-sdk/client-sn
import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNS_MESSAGE_BODY_TYPE } from '../types/MessageTypes'
import { deleteSns, initSns } from '../utils/snsInitter'

Expand All @@ -10,6 +11,7 @@ export const SNS_MESSAGE_MAX_SIZE = 256 * 1024 // 256KB

export type SNSDependencies = QueueDependencies & {
snsClient: SNSClient
stsClient: STSClient
Comment on lines 13 to +14
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new dependency as discussed

}

export type SNSTopicAWSConfig = CreateTopicCommandInput
Expand All @@ -31,6 +33,7 @@ export type SNSTopicConfig = {
export type ExtraSNSCreationParams = {
queueUrlsWithSubscribePermissionsPrefix?: string | readonly string[]
allowedSourceOwner?: string
forceTagUpdate?: boolean
}

export type SNSCreationConfig = {
Expand Down Expand Up @@ -59,21 +62,28 @@ export abstract class AbstractSnsService<
SNSOptionsType
> {
protected readonly snsClient: SNSClient
protected readonly stsClient: STSClient
// @ts-ignore
protected topicArn: string

constructor(dependencies: DependenciesType, options: SNSOptionsType) {
super(dependencies, options)

this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

public async init() {
if (this.deletionConfig && this.creationConfig) {
await deleteSns(this.snsClient, this.deletionConfig, this.creationConfig)
await deleteSns(this.snsClient, this.stsClient, this.deletionConfig, this.creationConfig)
}

const initResult = await initSns(this.snsClient, this.locatorConfig, this.creationConfig)
const initResult = await initSns(
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
)
this.topicArn = initResult.topicArn
this.isInitted = true
}
Expand Down
6 changes: 6 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter'
import { readSnsMessage } from '../utils/snsMessageReader'
import type { SNSSubscriptionOptions } from '../utils/snsSubscriber'

import type { STSClient } from '@aws-sdk/client-sts'
import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService'

export type SNSSQSConsumerDependencies = SQSConsumerDependencies & {
snsClient: SNSClient
stsClient: STSClient
}
export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig

Expand Down Expand Up @@ -53,6 +55,7 @@ export abstract class AbstractSnsSqsConsumer<
> {
private readonly subscriptionConfig?: SNSSubscriptionOptions
private readonly snsClient: SNSClient
private readonly stsClient: STSClient

// @ts-ignore
protected topicArn: string
Expand All @@ -74,13 +77,15 @@ export abstract class AbstractSnsSqsConsumer<

this.subscriptionConfig = options.subscriptionConfig
this.snsClient = dependencies.snsClient
this.stsClient = dependencies.stsClient
}

override async init(): Promise<void> {
if (this.deletionConfig && this.creationConfig && this.subscriptionConfig) {
await deleteSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.deletionConfig,
this.creationConfig.queue,
this.creationConfig.topic,
Expand All @@ -95,6 +100,7 @@ export abstract class AbstractSnsSqsConsumer<
const initSnsSqsResult = await initSnsSqs(
this.sqsClient,
this.snsClient,
this.stsClient,
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
Expand Down
1 change: 1 addition & 0 deletions packages/sns/lib/sns/SnsPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export class SnsPublisherManager<
newPublisherOptions: options.newPublisherOptions,
publisherDependencies: {
snsClient: dependencies.snsClient,
stsClient: dependencies.stsClient,
logger: dependencies.logger,
errorReporter: dependencies.errorReporter,
},
Expand Down
14 changes: 11 additions & 3 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer'

import type { STSClient } from '@aws-sdk/client-sts'
import type { Either } from '@lokalise/node-core'
import { type TopicResolutionOptions, isCreateTopicCommand } from '../types/TopicTypes'
import type { SNSSubscriptionOptions } from './snsSubscriber'
Expand All @@ -24,6 +25,7 @@ import {
export async function initSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSSQSQueueLocatorType,
creationConfig?: SNSCreationConfig & SQSCreationConfig,
subscriptionConfig?: SNSSubscriptionOptions,
Expand Down Expand Up @@ -59,6 +61,7 @@ export async function initSnsSqs(
const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
creationConfig.queue,
topicResolutionOptions,
subscriptionConfig,
Expand Down Expand Up @@ -132,6 +135,7 @@ export async function initSnsSqs(
export async function deleteSnsSqs(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput | undefined,
Expand All @@ -152,6 +156,7 @@ export async function deleteSnsSqs(
const { subscriptionArn } = await subscribeToTopic(
sqsClient,
snsClient,
stsClient,
queueConfiguration,
topicConfiguration ?? topicLocator!,
subscriptionConfiguration,
Expand All @@ -176,13 +181,14 @@ export async function deleteSnsSqs(
if (!topicName) {
throw new Error('Failed to resolve topic name')
}
await deleteTopic(snsClient, topicName)
await deleteTopic(snsClient, stsClient, topicName)
}
await deleteSubscription(snsClient, subscriptionArn)
}

export async function deleteSns(
snsClient: SNSClient,
stsClient: STSClient,
deletionConfig: DeletionConfig,
creationConfig: SNSCreationConfig,
) {
Expand All @@ -200,11 +206,12 @@ export async function deleteSns(
throw new Error('topic.Name must be set for automatic deletion')
}

await deleteTopic(snsClient, creationConfig.topic.Name)
await deleteTopic(snsClient, stsClient, creationConfig.topic.Name)
}

export async function initSns(
snsClient: SNSClient,
stsClient: STSClient,
locatorConfig?: SNSTopicLocatorType,
creationConfig?: SNSCreationConfig,
) {
Expand All @@ -227,9 +234,10 @@ export async function initSns(
'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory',
)
}
const topicArn = await assertTopic(snsClient, creationConfig.topic!, {
const topicArn = await assertTopic(snsClient, stsClient, creationConfig.topic!, {
queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: creationConfig.allowedSourceOwner,
forceTagUpdate: creationConfig.forceTagUpdate,
})
return {
topicArn,
Expand Down
15 changes: 12 additions & 3 deletions packages/sns/lib/utils/snsSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { assertQueue } from '@message-queue-toolkit/sqs'

import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService'

import type { STSClient } from '@aws-sdk/client-sts'
import {
type TopicResolutionOptions,
isCreateTopicCommand,
Expand All @@ -21,8 +22,9 @@ export type SNSSubscriptionOptions = Omit<
> & { updateAttributesIfExists: boolean }

async function resolveTopicArnToSubscribeTo(
topicConfiguration: TopicResolutionOptions,
snsClient: SNSClient,
stsClient: STSClient,
topicConfiguration: TopicResolutionOptions,
extraParams: (ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams) | undefined,
) {
//If topicArn is present, let's use it and return early.
Expand All @@ -32,9 +34,10 @@ async function resolveTopicArnToSubscribeTo(

//If input configuration is capable of creating a topic, let's create it and return its ARN.
if (isCreateTopicCommand(topicConfiguration)) {
return await assertTopic(snsClient, topicConfiguration, {
return await assertTopic(snsClient, stsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: extraParams?.allowedSourceOwner,
forceTagUpdate: extraParams?.forceTagUpdate,
})
}

Expand All @@ -45,12 +48,18 @@ async function resolveTopicArnToSubscribeTo(
export async function subscribeToTopic(
sqsClient: SQSClient,
snsClient: SNSClient,
stsClient: STSClient,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: TopicResolutionOptions,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams,
) {
const topicArn = await resolveTopicArnToSubscribeTo(topicConfiguration, snsClient, extraParams)
const topicArn = await resolveTopicArnToSubscribeTo(
snsClient,
stsClient,
topicConfiguration,
extraParams,
)

const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, {
topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix,
Expand Down
57 changes: 43 additions & 14 deletions packages/sns/lib/utils/snsUtils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type CreateTopicCommandInput,
type SNSClient,
TagResourceCommand,
paginateListTopics,
} from '@aws-sdk/client-sns'
import {
Expand All @@ -12,12 +13,14 @@ import {
SetTopicAttributesCommand,
UnsubscribeCommand,
} from '@aws-sdk/client-sns'
import type { Either } from '@lokalise/node-core'
import { type Either, isError } from '@lokalise/node-core'
import { calculateOutgoingMessageSize as sqsCalculateOutgoingMessageSize } from '@message-queue-toolkit/sqs'

import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService'

import type { STSClient } from '@aws-sdk/client-sts'
import { generateTopicSubscriptionPolicy } from './snsAttributeUtils'
import { buildTopicArn } from './stsUtils'

type AttributesResult = {
attributes?: Record<string, string>
Expand Down Expand Up @@ -79,16 +82,23 @@ export async function getSubscriptionAttributes(

export async function assertTopic(
snsClient: SNSClient,
stsClient: STSClient,
topicOptions: CreateTopicCommandInput,
extraParams?: ExtraSNSCreationParams,
) {
const command = new CreateTopicCommand(topicOptions)
const response = await snsClient.send(command)

if (!response.TopicArn) {
throw new Error('No topic arn in response')
let topicArn: string
try {
const command = new CreateTopicCommand(topicOptions)
const response = await snsClient.send(command)
if (!response.TopicArn) throw new Error('No topic arn in response')
topicArn = response.TopicArn
} catch (err) {
// We only manually build ARN in case of tag update
if (!extraParams?.forceTagUpdate) throw err
// To build ARN we need topic name and error should be "topic already exist with different tags"
if (!topicOptions.Name || !isTopicAlreadyExistWithDifferentTagsError(err)) throw err
topicArn = await buildTopicArn(stsClient, topicOptions.Name)
}
const topicArn = response.TopicArn

if (extraParams?.queueUrlsWithSubscribePermissionsPrefix || extraParams?.allowedSourceOwner) {
const setTopicAttributesCommand = new SetTopicAttributesCommand({
Expand All @@ -102,21 +112,28 @@ export async function assertTopic(
})
await snsClient.send(setTopicAttributesCommand)
}
if (extraParams?.forceTagUpdate && topicOptions.Tags) {
const tagTopicCommand = new TagResourceCommand({
ResourceArn: topicArn,
Tags: topicOptions.Tags,
})
await snsClient.send(tagTopicCommand)
}

return topicArn
}

export async function deleteTopic(client: SNSClient, topicName: string) {
export async function deleteTopic(snsClient: SNSClient, stsClient: STSClient, topicName: string) {
try {
const topicArn = await assertTopic(client, {
const topicArn = await assertTopic(snsClient, stsClient, {
Name: topicName,
})

const command = new DeleteTopicCommand({
TopicArn: topicArn,
})

await client.send(command)
await snsClient.send(
new DeleteTopicCommand({
TopicArn: topicArn,
}),
)
} catch (_) {
// we don't care it operation has failed
}
Expand Down Expand Up @@ -178,3 +195,15 @@ export async function getTopicArnByName(snsClient: SNSClient, topicName?: string
*/
export const calculateOutgoingMessageSize = (message: unknown) =>
sqsCalculateOutgoingMessageSize(message)

const isTopicAlreadyExistWithDifferentTagsError = (error: unknown) =>
!!error &&
isError(error) &&
'Error' in error &&
!!error.Error &&
typeof error.Error === 'object' &&
'Code' in error.Error &&
'Message' in error.Error &&
typeof error.Error.Message === 'string' &&
error.Error.Code === 'InvalidParameter' &&
error.Error.Message.includes('already exists with different tags')
Loading
Loading