Skip to content

Commit

Permalink
Use SQS ARN instead of SQS url for resource setting (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored Aug 16, 2023
1 parent 52bfa0a commit 1872aae
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 27 deletions.
17 changes: 3 additions & 14 deletions packages/sns/lib/utils/snsSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
import { SubscribeCommand } from '@aws-sdk/client-sns'
import type { SubscribeCommandInput } from '@aws-sdk/client-sns/dist-types/commands/SubscribeCommand'
import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import { GetQueueAttributesCommand } from '@aws-sdk/client-sqs'
import type { ExtraSQSCreationParams } from '@message-queue-toolkit/sqs'
import { assertQueue } from '@message-queue-toolkit/sqs'

Expand All @@ -26,24 +25,13 @@ export async function subscribeToTopic(
const topicArn = await assertTopic(snsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
})
const queueUrl = await assertQueue(sqsClient, queueConfiguration, {
const { queueUrl, queueArn } = await assertQueue(sqsClient, queueConfiguration, {
topicArnsWithPublishPermissionsPrefix: extraParams?.topicArnsWithPublishPermissionsPrefix,
})

const getQueueAttributesCommand = new GetQueueAttributesCommand({
QueueUrl: queueUrl,
AttributeNames: ['QueueArn'],
})
const queueAttributesResponse = await sqsClient.send(getQueueAttributesCommand)
const sqsArn = queueAttributesResponse.Attributes?.QueueArn

if (!sqsArn) {
throw new Error(`Queue ${queueUrl} ARN is not defined`)
}

const subscribeCommand = new SubscribeCommand({
TopicArn: topicArn,
Endpoint: sqsArn,
Endpoint: queueArn,
Protocol: 'sqs',
ReturnSubscriptionArn: true,
...subscriptionConfiguration,
Expand All @@ -54,5 +42,6 @@ export async function subscribeToTopic(
subscriptionArn: subscriptionResult.SubscriptionArn,
topicArn,
queueUrl,
queueArn,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('SNS PermissionsConsumer', () => {
const topic = await getTopicAttributes(snsClient, newConsumer.topicArn)

expect(queue.result?.attributes?.Policy).toBe(
`{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"http://s3.localhost.localstack.cloud:4566/000000000000/policy-queue","Condition":{"ArnLike":{"aws:SourceArn":"dummy*"}}}]}`,
`{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"arn:aws:sqs:eu-west-1:000000000000:policy-queue","Condition":{"ArnLike":{"aws:SourceArn":"dummy*"}}}]}`,
)
expect(topic.result?.attributes?.Policy).toBe(
`{"Version":"2012-10-17","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSQSSubscription","Effect":"Allow","Principal":{"AWS":"*"},"Action":["sns:Subscribe"],"Resource":"arn:aws:sns:eu-west-1:000000000000:policy-topic","Condition":{"StringLike":{"sns:Endpoint":"dummy*"}}}]}`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ describe('SNSPermissionPublisher', () => {
permissions: perms,
} satisfies PERMISSIONS_MESSAGE_TYPE

const queueUrl = await assertQueue(sqsClient, {
const { queueUrl } = await assertQueue(sqsClient, {
QueueName: queueName,
})

Expand Down
16 changes: 16 additions & 0 deletions packages/sqs/lib/errors/SqsConsumerErrorResolver.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { SqsConsumerErrorResolver } from './SqsConsumerErrorResolver'

describe('SqsConsumerErrorResolver', () => {
const resolver = new SqsConsumerErrorResolver()
it('Resolves error from standardized error', () => {
const error = resolver.processError({
message: 'someError',
code: 'ERROR_CODE',
})

expect(error).toMatchObject({
message: 'someError',
errorCode: 'ERROR_CODE',
})
})
})
5 changes: 4 additions & 1 deletion packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export abstract class AbstractSqsService<
public queueUrl: string
// @ts-ignore
public queueName: string
// @ts-ignore
public queueArn: string

constructor(dependencies: DependenciesType, options: SQSOptionsType) {
super(dependencies, options)
Expand All @@ -73,12 +75,13 @@ export abstract class AbstractSqsService<
if (this.deletionConfig && this.creationConfig) {
await deleteSqs(this.sqsClient, this.deletionConfig, this.creationConfig)
}
const { queueUrl, queueName } = await initSqs(
const { queueUrl, queueName, queueArn } = await initSqs(
this.sqsClient,
this.locatorConfig,
this.creationConfig,
)

this.queueArn = queueArn
this.queueUrl = queueUrl
this.queueName = queueName
}
Expand Down
4 changes: 2 additions & 2 deletions packages/sqs/lib/utils/sqsAttributeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
const POLICY_VERSION = '2012-10-17'

export function generateQueuePublishForTopicPolicy(
queueUrl: string,
queueArn: string,
supportedSnsTopicArnPrefix: string,
) {
return `{"Version":"${POLICY_VERSION}","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"${queueUrl}","Condition":{"ArnLike":{"aws:SourceArn":"${supportedSnsTopicArnPrefix}"}}}]}`
return `{"Version":"${POLICY_VERSION}","Id":"__default_policy_ID","Statement":[{"Sid":"AllowSNSPublish","Effect":"Allow","Principal":{"AWS":"*"},"Action":"sqs:SendMessage","Resource":"${queueArn}","Condition":{"ArnLike":{"aws:SourceArn":"${supportedSnsTopicArnPrefix}"}}}]}`
}

export function generateWildcardSqsArn(sqsQueueArnPrefix: string) {
Expand Down
11 changes: 9 additions & 2 deletions packages/sqs/lib/utils/sqsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,22 @@ export async function initSqs(
) {
// reuse existing queue only
if (locatorConfig) {
const checkResult = await getQueueAttributes(sqsClient, locatorConfig)
const checkResult = await getQueueAttributes(sqsClient, locatorConfig, ['QueueArn'])
if (checkResult.error === 'not_found') {
throw new Error(`Queue with queueUrl ${locatorConfig.queueUrl} does not exist.`)
}

const queueArn = checkResult.result?.attributes?.QueueArn
if (!queueArn) {
throw new Error('Queue ARN was not set')
}

const queueUrl = locatorConfig.queueUrl

const splitUrl = queueUrl.split('/')
const queueName = splitUrl[splitUrl.length - 1]
return {
queueArn,
queueUrl,
queueName,
}
Expand All @@ -56,13 +62,14 @@ export async function initSqs(
throw new Error('queueConfig.QueueName is mandatory when locator is not provided')
}

const queueUrl = await assertQueue(sqsClient, creationConfig.queue, {
const { queueUrl, queueArn } = await assertQueue(sqsClient, creationConfig.queue, {
topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix,
})
const queueName = creationConfig.queue.QueueName

return {
queueUrl,
queueArn,
queueName,
}
}
18 changes: 16 additions & 2 deletions packages/sqs/lib/utils/sqsUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,34 @@ export async function assertQueue(
throw new Error(`Queue ${queueConfig.QueueName ?? ''} was not created`)
}

const getQueueAttributesCommand = new GetQueueAttributesCommand({
QueueUrl: response.QueueUrl,
AttributeNames: ['QueueArn'],
})
const queueAttributesResponse = await sqsClient.send(getQueueAttributesCommand)
const queueArn = queueAttributesResponse.Attributes?.QueueArn

if (!queueArn) {
throw new Error('Queue ARN was not set')
}

if (extraParams?.topicArnsWithPublishPermissionsPrefix) {
const setTopicAttributesCommand = new SetQueueAttributesCommand({
QueueUrl: response.QueueUrl,
Attributes: {
Policy: generateQueuePublishForTopicPolicy(
response.QueueUrl,
queueArn,
extraParams.topicArnsWithPublishPermissionsPrefix,
),
},
})
await sqsClient.send(setTopicAttributesCommand)
}

return response.QueueUrl
return {
queueArn,
queueUrl: response.QueueUrl,
}
}

export async function deleteQueue(client: SQSClient, queueName: string) {
Expand Down
8 changes: 4 additions & 4 deletions packages/sqs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@
"devDependencies": {
"@aws-sdk/client-sqs": "^3.385.0",
"@message-queue-toolkit/core": "*",
"@types/node": "^20.4.1",
"@typescript-eslint/eslint-plugin": "^6.2.1",
"@typescript-eslint/parser": "^6.2.1",
"@types/node": "^20.5.0",
"@typescript-eslint/eslint-plugin": "^6.4.0",
"@typescript-eslint/parser": "^6.4.0",
"@vitest/coverage-v8": "^0.34.1",
"awilix": "^8.0.1",
"awilix-manager": "^2.0.0",
"del-cli": "^5.0.0",
"eslint": "^8.44.0",
"eslint": "^8.47.0",
"eslint-config-prettier": "^8.8.0",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^5.0.0",
Expand Down

0 comments on commit 1872aae

Please sign in to comment.