diff --git a/packages/sns/lib/utils/snsSubscriber.ts b/packages/sns/lib/utils/snsSubscriber.ts index 374cd546..aa78ce43 100644 --- a/packages/sns/lib/utils/snsSubscriber.ts +++ b/packages/sns/lib/utils/snsSubscriber.ts @@ -2,7 +2,6 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns' import { SubscribeCommand } from '@aws-sdk/client-sns' import type { SubscribeCommandInput } from '@aws-sdk/client-sns/dist-types/commands/SubscribeCommand' import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs' -import { GetQueueAttributesCommand } from '@aws-sdk/client-sqs' import type { ExtraSQSCreationParams } from '@message-queue-toolkit/sqs' import { assertQueue } from '@message-queue-toolkit/sqs' @@ -26,24 +25,13 @@ export async function subscribeToTopic( const topicArn = await assertTopic(snsClient, topicConfiguration, { queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix, }) - const queueUrl = await assertQueue(sqsClient, queueConfiguration, { + const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, { topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix, }) - const getQueueAttributesCommand = new GetQueueAttributesCommand({ - QueueUrl: queueUrl, - AttributeNames: ['QueueArn'], - }) - const queueAttributesResponse = await sqsClient.send(getQueueAttributesCommand) - const sqsArn = queueAttributesResponse.Attributes?.QueueArn - - if (!sqsArn) { - throw new Error(`Queue ${queueUrl} ARN is not defined`) - } - const subscribeCommand = new SubscribeCommand({ TopicArn: topicArn, - Endpoint: sqsArn, + Endpoint: queueArn, Protocol: 'sqs', ReturnSubscriptionArn: true, ...subscriptionConfiguration, @@ -54,5 +42,6 @@ export async function subscribeToTopic( subscriptionArn: subscriptionResult.SubscriptionArn, topicArn, queueUrl, + queueArn, } } diff --git a/packages/sns/test/consumers/SnsSqsPermissionsConsumerMonoSchema.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionsConsumerMonoSchema.spec.ts index 51759fbe..9092c8bc 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionsConsumerMonoSchema.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionsConsumerMonoSchema.spec.ts @@ -77,7 +77,7 @@ describe('SNS PermissionsConsumer', () => { const topic = await getTopicAttributes(snsClient, newConsumer.topicArn) expect(queue.result?.attributes?.Policy).toBe( - `{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"http://s3.localhost.localstack.cloud:4566/000000000000/policy-queue","Condition":{"ArnLike":{"aws:SourceArn":"dummy*"}}}]}`, + `{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:policy-queue","Condition":{"ArnLike":{"aws:SourceArn":"dummy*"}}}]}`, ) expect(topic.result?.attributes?.Policy).toBe( `{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSQSSubscription","Effect":"Allow","Principal":{"AWS":"*"},"Action":["sns:Subscribe"],"Resource":"arn:aws:sns:eu-west-1:000000000000:policy-topic","Condition":{"StringLike":{"sns:Endpoint":"dummy*"}}}]}`, diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index 97e7838d..26666eb6 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -113,7 +113,7 @@ describe('SNSPermissionPublisher', () => { permissions: perms, } satisfies PERMISSIONS_MESSAGE_TYPE - const queueUrl = await assertQueue(sqsClient, { + const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName, }) diff --git a/packages/sqs/lib/errors/SqsConsumerErrorResolver.spec.ts b/packages/sqs/lib/errors/SqsConsumerErrorResolver.spec.ts new file mode 100644 index 00000000..ccf1f6e1 --- /dev/null +++ b/packages/sqs/lib/errors/SqsConsumerErrorResolver.spec.ts @@ -0,0 +1,16 @@ +import { SqsConsumerErrorResolver } from './SqsConsumerErrorResolver' + +describe('SqsConsumerErrorResolver', () => { + const resolver = new SqsConsumerErrorResolver() + it('Resolves error from standardized error', () => { + const error = resolver.processError({ + message: 'someError', + code: 'ERROR_CODE', + }) + + expect(error).toMatchObject({ + message: 'someError', + errorCode: 'ERROR_CODE', + }) + }) +}) diff --git a/packages/sqs/lib/sqs/AbstractSqsService.ts b/packages/sqs/lib/sqs/AbstractSqsService.ts index d079f817..c2117a51 100644 --- a/packages/sqs/lib/sqs/AbstractSqsService.ts +++ b/packages/sqs/lib/sqs/AbstractSqsService.ts @@ -62,6 +62,8 @@ export abstract class AbstractSqsService< public queueUrl: string // @ts-ignore public queueName: string + // @ts-ignore + public queueArn: string constructor(dependencies: DependenciesType, options: SQSOptionsType) { super(dependencies, options) @@ -73,12 +75,13 @@ export abstract class AbstractSqsService< if (this.deletionConfig && this.creationConfig) { await deleteSqs(this.sqsClient, this.deletionConfig, this.creationConfig) } - const { queueUrl, queueName } = await initSqs( + const { queueUrl, queueName, queueArn } = await initSqs( this.sqsClient, this.locatorConfig, this.creationConfig, ) + this.queueArn = queueArn this.queueUrl = queueUrl this.queueName = queueName } diff --git a/packages/sqs/lib/utils/sqsAttributeUtils.ts b/packages/sqs/lib/utils/sqsAttributeUtils.ts index 2b5ab6e1..683d0b12 100644 --- a/packages/sqs/lib/utils/sqsAttributeUtils.ts +++ b/packages/sqs/lib/utils/sqsAttributeUtils.ts @@ -2,10 +2,10 @@ const POLICY_VERSION = '2012-10-17' export function generateQueuePublishForTopicPolicy( - queueUrl: string, + queueArn: string, supportedSnsTopicArnPrefix: string, ) { - return `{"Version":"${POLICY_VERSION}","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"${queueUrl}","Condition":{"ArnLike":{"aws:SourceArn":"${supportedSnsTopicArnPrefix}"}}}]}` + return `{"Version":"${POLICY_VERSION}","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"${queueArn}","Condition":{"ArnLike":{"aws:SourceArn":"${supportedSnsTopicArnPrefix}"}}}]}` } export function generateWildcardSqsArn(sqsQueueArnPrefix: string) { diff --git a/packages/sqs/lib/utils/sqsInitter.ts b/packages/sqs/lib/utils/sqsInitter.ts index 5ae8f570..ae30eac4 100644 --- a/packages/sqs/lib/utils/sqsInitter.ts +++ b/packages/sqs/lib/utils/sqsInitter.ts @@ -36,16 +36,22 @@ export async function initSqs( ) { // reuse existing queue only if (locatorConfig) { - const checkResult = await getQueueAttributes(sqsClient, locatorConfig) + const checkResult = await getQueueAttributes(sqsClient, locatorConfig, ['QueueArn']) if (checkResult.error === 'not_found') { throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`) } + const queueArn = checkResult.result?.attributes?.QueueArn + if (!queueArn) { + throw new Error('Queue ARN was not set') + } + const queueUrl = locatorConfig.queueUrl const splitUrl = queueUrl.split('/') const queueName = splitUrl[splitUrl.length - 1] return { + queueArn, queueUrl, queueName, } @@ -56,13 +62,14 @@ export async function initSqs( throw new Error('queueConfig.QueueName is mandatory when locator is not provided') } - const queueUrl = await assertQueue(sqsClient, creationConfig.queue, { + const { queueUrl, queueArn } = await assertQueue(sqsClient, creationConfig.queue, { topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, }) const queueName = creationConfig.queue.QueueName return { queueUrl, + queueArn, queueName, } } diff --git a/packages/sqs/lib/utils/sqsUtils.ts b/packages/sqs/lib/utils/sqsUtils.ts index a622bdd9..e469bfe1 100644 --- a/packages/sqs/lib/utils/sqsUtils.ts +++ b/packages/sqs/lib/utils/sqsUtils.ts @@ -64,12 +64,23 @@ export async function assertQueue( throw new Error(`Queue ${queueConfig.QueueName ?? ''} was not created`) } + const getQueueAttributesCommand = new GetQueueAttributesCommand({ + QueueUrl: response.QueueUrl, + AttributeNames: ['QueueArn'], + }) + const queueAttributesResponse = await sqsClient.send(getQueueAttributesCommand) + const queueArn = queueAttributesResponse.Attributes?.QueueArn + + if (!queueArn) { + throw new Error('Queue ARN was not set') + } + if (extraParams?.topicArnsWithPublishPermissionsPrefix) { const setTopicAttributesCommand = new SetQueueAttributesCommand({ QueueUrl: response.QueueUrl, Attributes: { Policy: generateQueuePublishForTopicPolicy( - response.QueueUrl, + queueArn, extraParams.topicArnsWithPublishPermissionsPrefix, ), }, @@ -77,7 +88,10 @@ export async function assertQueue( await sqsClient.send(setTopicAttributesCommand) } - return response.QueueUrl + return { + queueArn, + queueUrl: response.QueueUrl, + } } export async function deleteQueue(client: SQSClient, queueName: string) { diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 3ff34519..ff2d903e 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -37,14 +37,14 @@ "devDependencies": { "@aws-sdk/client-sqs": "^3.385.0", "@message-queue-toolkit/core": "*", - "@types/node": "^20.4.1", - "@typescript-eslint/eslint-plugin": "^6.2.1", - "@typescript-eslint/parser": "^6.2.1", + "@types/node": "^20.5.0", + "@typescript-eslint/eslint-plugin": "^6.4.0", + "@typescript-eslint/parser": "^6.4.0", "@vitest/coverage-v8": "^0.34.1", "awilix": "^8.0.1", "awilix-manager": "^2.0.0", "del-cli": "^5.0.0", - "eslint": "^8.44.0", + "eslint": "^8.47.0", "eslint-config-prettier": "^8.8.0", "eslint-plugin-import": "^2.27.5", "eslint-plugin-prettier": "^5.0.0",