Skip to content

Commit

Permalink
wip support mix of locator and creation configs
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Savin committed Sep 19, 2024
1 parent f93ca52 commit a74e306
Show file tree
Hide file tree
Showing 17 changed files with 155 additions and 94 deletions.
2 changes: 1 addition & 1 deletion packages/sns/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export type {
SNSTopicAWSConfig,
SNSTopicConfig,
SNSQueueLocatorType,
SNSTopicLocatorType,
SNSCreationConfig,
SNSDependencies,
} from './lib/sns/AbstractSnsService'
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { resolveOutgoingMessageAttributes } from '@message-queue-toolkit/sqs'

import { calculateOutgoingMessageSize } from '../utils/snsUtils'

import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService'
import type { SNSCreationConfig, SNSDependencies, SNSTopicLocatorType } from './AbstractSnsService'
import { AbstractSnsService } from './AbstractSnsService'

export type SNSMessageOptions = {
Expand All @@ -26,7 +26,7 @@ export type SNSMessageOptions = {

export type SNSPublisherOptions<MessagePayloadType extends object> = QueuePublisherOptions<
SNSCreationConfig,
SNSQueueLocatorType,
SNSTopicLocatorType,
MessagePayloadType
>

Expand Down
11 changes: 6 additions & 5 deletions packages/sns/lib/sns/AbstractSnsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ export type ExtraSNSCreationParams = {
}

export type SNSCreationConfig = {
topic: SNSTopicAWSConfig
topic?: SNSTopicAWSConfig
updateAttributesIfExists?: boolean
} & ExtraSNSCreationParams

export type SNSQueueLocatorType = {
topicArn: string
export type SNSTopicLocatorType = {
topicArn?: string
topicName?: string
}

export type SNSOptions = QueueOptions<SNSCreationConfig, SNSQueueLocatorType>
export type SNSOptions = QueueOptions<SNSCreationConfig, SNSTopicLocatorType>

export abstract class AbstractSnsService<
MessagePayloadType extends object,
Expand All @@ -54,7 +55,7 @@ export abstract class AbstractSnsService<
MessageEnvelopeType,
DependenciesType,
SNSCreationConfig,
SNSQueueLocatorType,
SNSTopicLocatorType,
SNSOptionsType
> {
protected readonly snsClient: SNSClient
Expand Down
8 changes: 4 additions & 4 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import { deleteSnsSqs, initSnsSqs } from '../utils/snsInitter'
import { readSnsMessage } from '../utils/snsMessageReader'
import type { SNSSubscriptionOptions } from '../utils/snsSubscriber'

import type { SNSCreationConfig, SNSOptions, SNSQueueLocatorType } from './AbstractSnsService'
import type { SNSCreationConfig, SNSOptions, SNSTopicLocatorType } from './AbstractSnsService'

export type SNSSQSConsumerDependencies = SQSConsumerDependencies & {
snsClient: SNSClient
}
export type SNSSQSCreationConfig = SQSCreationConfig & SNSCreationConfig

export type SNSSQSQueueLocatorType = SQSQueueLocatorType &
SNSQueueLocatorType & {
export type SNSSQSQueueLocatorType = Partial<SQSQueueLocatorType> &
SNSTopicLocatorType & {
subscriptionArn?: string
}

Expand Down Expand Up @@ -83,7 +83,7 @@ export abstract class AbstractSnsSqsConsumer<
this.snsClient,
this.deletionConfig,
this.creationConfig.queue,
this.creationConfig.topic,
this.creationConfig.topic ?? this.locatorConfig!,
this.subscriptionConfig,
)
} else if (this.deletionConfig && this.creationConfig) {
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/lib/sns/SnsPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type {
SNSMessageOptions,
SNSPublisherOptions,
} from './AbstractSnsPublisher'
import type { SNSCreationConfig, SNSDependencies, SNSQueueLocatorType } from './AbstractSnsService'
import type { SNSCreationConfig, SNSDependencies, SNSTopicLocatorType } from './AbstractSnsService'
import type { SnsPublisherFactory } from './CommonSnsPublisherFactory'
import { CommonSnsPublisherFactory } from './CommonSnsPublisherFactory'

Expand Down Expand Up @@ -53,7 +53,7 @@ export class SnsPublisherManager<
AbstractSnsPublisher<z.infer<SupportedEventDefinitions[number]['publisherSchema']>>,
SNSDependencies,
SNSCreationConfig,
SNSQueueLocatorType,
SNSTopicLocatorType,
SnsMessageSchemaType<SnsAwareEventDefinition>,
Omit<
SNSPublisherOptions<z.infer<SupportedEventDefinitions[number]['publisherSchema']>>,
Expand Down
12 changes: 12 additions & 0 deletions packages/sns/lib/types/TopicTypes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { CreateTopicCommandInput } from '@aws-sdk/client-sns'
import { SNSTopicLocatorType } from '../sns/AbstractSnsService'

export type TopicResolutionOptions = CreateTopicCommandInput | SNSTopicLocatorType

export function isCreateTopicCommand(value: unknown): value is CreateTopicCommandInput {
return !!(value as CreateTopicCommandInput).Name
}

export function isSNSTopicLocatorType(value: unknown): value is SNSTopicLocatorType {
return !!(value as SNSTopicLocatorType).topicArn || !!(value as SNSTopicLocatorType).topicName
}
62 changes: 41 additions & 21 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'
import type { SQSCreationConfig } from '@message-queue-toolkit/sqs'
import { SQSCreationConfig, SQSQueueLocatorType } from '@message-queue-toolkit/sqs'
import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'

import type { SNSCreationConfig, SNSQueueLocatorType } from '../sns/AbstractSnsService'
import type { SNSCreationConfig, SNSTopicLocatorType } from '../sns/AbstractSnsService'
import type { SNSSQSQueueLocatorType } from '../sns/AbstractSnsSqsConsumer'

import type { SNSSubscriptionOptions } from './snsSubscriber'
import { subscribeToTopic } from './snsSubscriber'
import { assertTopic, deleteSubscription, deleteTopic, getTopicAttributes } from './snsUtils'
import { assertTopic, deleteSubscription, deleteTopic, getTopicArnByName, getTopicAttributes } from './snsUtils'
import { isCreateTopicCommand, TopicResolutionOptions } from '../types/TopicTypes'

export async function initSnsSqs(
sqsClient: SQSClient,
Expand All @@ -21,9 +22,9 @@ export async function initSnsSqs(
extraParams?: ExtraParams,
) {
if (!locatorConfig?.subscriptionArn) {
if (!creationConfig?.topic) {
if (!creationConfig?.topic && !locatorConfig?.topicArn && !locatorConfig?.topicName) {
throw new Error(
'If locatorConfig.subscriptionArn is not specified, creationConfig.topic parameter is mandatory, as there will be an attempt to create the missing topic',
'If locatorConfig.subscriptionArn is not specified, creationConfig.topic, localtorConfig.name or locatorConfig.topicArn parameter is mandatory, as there will be an attempt to create the missing topic',
)
}
if (!creationConfig?.queue) {
Expand All @@ -42,11 +43,18 @@ export async function initSnsSqs(
)
}

let subscriptionTopicArn = locatorConfig ? locatorConfig.topicArn ?? await getTopicArnByName(snsClient, locatorConfig.topicName)
: undefined

const topicResolutionOptions: TopicResolutionOptions = subscriptionTopicArn ? {
topicArn: subscriptionTopicArn,
} : creationConfig.topic!

const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic(
sqsClient,
snsClient,
creationConfig.queue,
creationConfig.topic,
topicResolutionOptions,
subscriptionConfig,
{
updateAttributesIfExists: creationConfig.updateAttributesIfExists,
Expand All @@ -69,8 +77,10 @@ export async function initSnsSqs(
}

// Check for existing resources, using the locators
const queuePromise = getQueueAttributes(sqsClient, locatorConfig)
const topicPromise = getTopicAttributes(snsClient, locatorConfig.topicArn)
const queuePromise = getQueueAttributes(sqsClient, (locatorConfig as SQSQueueLocatorType).queueUrl)

let subscriptionTopicArn = locatorConfig.topicArn ?? await getTopicArnByName(snsClient, locatorConfig.topicName)
const topicPromise = getTopicAttributes(snsClient, subscriptionTopicArn)

const [queueCheckResult, topicCheckResult] = await Promise.all([queuePromise, topicPromise])

Expand All @@ -81,12 +91,15 @@ export async function initSnsSqs(
throw new Error(`Topic with topicArn ${locatorConfig.topicArn} does not exist.`)
}

const splitUrl = locatorConfig.queueUrl.split('/')
const splitUrl = (locatorConfig as SQSQueueLocatorType).queueUrl.split('/')
const queueName = splitUrl[splitUrl.length - 1]

if (!locatorConfig.queueUrl) {
throw new Error('queueUrl not set on locator')
}
return {
subscriptionArn: locatorConfig.subscriptionArn,
topicArn: locatorConfig.topicArn,
topicArn: subscriptionTopicArn,
queueUrl: locatorConfig.queueUrl,
queueName,
}
Expand All @@ -97,7 +110,7 @@ export async function deleteSnsSqs(
snsClient: SNSClient,
deletionConfig: DeletionConfig,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput,
topicConfiguration: TopicResolutionOptions,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraParams,
) {
Expand Down Expand Up @@ -130,8 +143,12 @@ export async function deleteSnsSqs(
queueConfiguration.QueueName!,
deletionConfig.waitForConfirmation !== false,
)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await deleteTopic(snsClient, topicConfiguration.Name!)

const topicName = isCreateTopicCommand(topicConfiguration) ? topicConfiguration.Name : topicConfiguration.topicName
if (!topicName) {
throw new Error('Failed to resolve topic name')
}
await deleteTopic(snsClient, topicName)
await deleteSubscription(snsClient, subscriptionArn)
}

Expand All @@ -150,26 +167,29 @@ export async function deleteSns(
)
}

if (!creationConfig.topic.Name) {
throw new Error('topic.Name must be set for automatic deletion')
}
if (creationConfig.topic) {
if (!creationConfig.topic.Name) {
throw new Error('topic.Name must be set for automatic deletion')
}

await deleteTopic(snsClient, creationConfig.topic.Name)
await deleteTopic(snsClient, creationConfig.topic.Name)
}
}

export async function initSns(
snsClient: SNSClient,
locatorConfig?: SNSQueueLocatorType,
locatorConfig?: SNSTopicLocatorType,
creationConfig?: SNSCreationConfig,
) {
if (locatorConfig) {
const checkResult = await getTopicAttributes(snsClient, locatorConfig.topicArn)
const topicArn = locatorConfig.topicArn ?? await getTopicArnByName(snsClient, locatorConfig.topicName)
const checkResult = await getTopicAttributes(snsClient, topicArn)
if (checkResult.error === 'not_found') {
throw new Error(`Topic with topicArn ${locatorConfig.topicArn} does not exist.`)
}

return {
topicArn: locatorConfig.topicArn,
topicArn,
}
}

Expand All @@ -179,7 +199,7 @@ export async function initSns(
'When locatorConfig for the topic is not specified, creationConfig of the topic is mandatory',
)
}
const topicArn = await assertTopic(snsClient, creationConfig.topic, {
const topicArn = await assertTopic(snsClient, creationConfig.topic!, {
queueUrlsWithSubscribePermissionsPrefix: creationConfig.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: creationConfig.allowedSourceOwner,
})
Expand Down
23 changes: 16 additions & 7 deletions packages/sns/lib/utils/snsSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { assertQueue } from '@message-queue-toolkit/sqs'

import type { ExtraSNSCreationParams } from '../sns/AbstractSnsService'

import { assertTopic, findSubscriptionByTopicAndQueue } from './snsUtils'
import { assertTopic, findSubscriptionByTopicAndQueue, getTopicArnByName } from './snsUtils'
import { isCreateTopicCommand, isSNSTopicLocatorType, TopicResolutionOptions } from '../types/TopicTypes'

export type SNSSubscriptionOptions = Omit<
SubscribeCommandInput,
Expand All @@ -19,14 +20,22 @@ export async function subscribeToTopic(
sqsClient: SQSClient,
snsClient: SNSClient,
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput,
topicConfiguration: TopicResolutionOptions,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams,
) {
const topicArn = await assertTopic(snsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: extraParams?.allowedSourceOwner,
})
let topicArn = isSNSTopicLocatorType(topicConfiguration) ? topicConfiguration.topicArn : undefined

if (!topicArn) {
if (isCreateTopicCommand(topicConfiguration)) {
topicArn = await assertTopic(snsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
allowedSourceOwner: extraParams?.allowedSourceOwner,
})
} else {
topicArn = await getTopicArnByName(snsClient, topicConfiguration.topicName)
}
}
const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, {
topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix,
updateAttributesIfExists: extraParams?.updateAttributesIfExists,
Expand All @@ -53,7 +62,7 @@ export async function subscribeToTopic(
// @ts-ignore
logger.error(
`Error while creating subscription for queue "${queueConfiguration.QueueName}", topic "${
topicConfiguration.Name
isCreateTopicCommand(topicConfiguration) ? topicConfiguration.Name : topicConfiguration.topicName
}": ${(err as Error).message}`,
)

Expand Down
19 changes: 18 additions & 1 deletion packages/sns/lib/utils/snsUtils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
import { CreateTopicCommandInput, ListTopicsCommand, paginateListTopics, SNSClient } from '@aws-sdk/client-sns'
import {
CreateTopicCommand,
DeleteTopicCommand,
Expand Down Expand Up @@ -144,6 +144,23 @@ export async function findSubscriptionByTopicAndQueue(
})
}

export async function getTopicArnByName(snsClient: SNSClient, topicName?: string): Promise<string> {
if (!topicName) {
throw new Error('topicName is not provided')
}

// Use paginator to automatically handle NextToken
const paginator = paginateListTopics({ client: snsClient }, {});
for await (const page of paginator) {
for (const topic of page.Topics || []) {
if (topic.TopicArn?.includes(topicName)) {
return topic.TopicArn;
}
}
}
throw new Error(`Failed to resolve topic by name ${topicName}`)
}

/**
* Calculates the size of an outgoing SNS message.
*
Expand Down
Loading

0 comments on commit a74e306

Please sign in to comment.