Skip to content

Commit

Permalink
Extra logging for subscription failures (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored Oct 6, 2023
1 parent f73fbf6 commit dcaaf2e
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 17 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- rabbit_data:/var/lib/rabbitmq

localstack:
image: localstack/localstack:2.1.0
image: localstack/localstack:2.3.2
network_mode: bridge
hostname: localstack
ports:
Expand All @@ -18,9 +18,9 @@ services:
- SERVICES=sns,sqs
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- LAMBDA_EXECUTOR=local
- DOCKER_HOST=unix:///var/run/docker.sock
- HOSTNAME_EXTERNAL=localstack
- LOCALSTACK_HOST=localstack
# - LOCALSTACK_API_KEY=someDummyKey
volumes:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
Expand Down
1 change: 1 addition & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type {
TransactionObservabilityManager,
Logger,
SchemaMap,
ExtraParams,
} from './lib/types/MessageQueueTypes'

export { AbstractQueueService } from './lib/queues/AbstractQueueService'
Expand Down
4 changes: 4 additions & 0 deletions packages/core/lib/types/MessageQueueTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export type LogFn = {
(msg: string, ...args: any[]): void
}

export type ExtraParams = {
logger?: Logger
}

export type Logger = {
error: LogFn
info: LogFn
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
localstack:
image: localstack/localstack:2.1.0
image: localstack/localstack:2.3.2
network_mode: bridge
hostname: localstack
ports:
Expand All @@ -10,9 +10,9 @@ services:
- SERVICES=sns,sqs
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- LAMBDA_EXECUTOR=local
- DOCKER_HOST=unix:///var/run/docker.sock
- HOSTNAME_EXTERNAL=localstack
- LOCALSTACK_HOST=localstack
# - LOCALSTACK_API_KEY=someDummyKey
volumes:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
Expand Down
6 changes: 6 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumerMonoSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ export abstract class AbstractSnsSqsConsumerMonoSchema<
this.creationConfig.queue,
this.creationConfig.topic,
this.subscriptionConfig,
{
logger: this.logger,
},
)
} else if (this.deletionConfig && this.creationConfig) {
await deleteSqs(this.sqsClient, this.deletionConfig, this.creationConfig)
Expand All @@ -123,6 +126,9 @@ export abstract class AbstractSnsSqsConsumerMonoSchema<
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
{
logger: this.logger,
},
)
this.queueUrl = initSnsSqsResult.queueUrl
this.topicArn = initSnsSqsResult.topicArn
Expand Down
3 changes: 3 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ export abstract class AbstractSnsSqsConsumerMultiSchema<
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
{
logger: this.logger,
},
)
this.queueUrl = initSnsSqsResult.queueUrl
this.topicArn = initSnsSqsResult.topicArn
Expand Down
6 changes: 5 additions & 1 deletion packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { SNSClient, CreateTopicCommandInput } from '@aws-sdk/client-sns'
import type { SQSClient, CreateQueueCommandInput } from '@aws-sdk/client-sqs'
import type { DeletionConfig } from '@message-queue-toolkit/core'
import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'
import type { SQSCreationConfig } from '@message-queue-toolkit/sqs'
import { deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
Expand All @@ -18,6 +18,7 @@ export async function initSnsSqs(
locatorConfig?: SNSSQSQueueLocatorType,
creationConfig?: SNSCreationConfig & SQSCreationConfig,
subscriptionConfig?: SNSSubscriptionOptions,
extraParams?: ExtraParams,
) {
if (!locatorConfig?.subscriptionArn) {
if (!creationConfig?.topic) {
Expand Down Expand Up @@ -46,6 +47,7 @@ export async function initSnsSqs(
queueUrlsWithSubscribePermissionsPrefix:
creationConfig.queueUrlsWithSubscribePermissionsPrefix,
topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix,
logger: extraParams?.logger,
},
)
if (!subscriptionArn) {
Expand Down Expand Up @@ -86,6 +88,7 @@ export async function deleteSnsSqs(
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraParams,
) {
if (!deletionConfig.deleteIfExists) {
return
Expand All @@ -103,6 +106,7 @@ export async function deleteSnsSqs(
queueConfiguration,
topicConfiguration,
subscriptionConfiguration,
extraParams,
)

if (!subscriptionArn) {
Expand Down
26 changes: 19 additions & 7 deletions packages/sns/lib/utils/snsSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns'
import { SubscribeCommand } from '@aws-sdk/client-sns'
import type { SubscribeCommandInput } from '@aws-sdk/client-sns/dist-types/commands/SubscribeCommand'
import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import type { ExtraParams } from '@message-queue-toolkit/core'
import type { ExtraSQSCreationParams } from '@message-queue-toolkit/sqs'
import { assertQueue } from '@message-queue-toolkit/sqs'

Expand All @@ -20,7 +21,7 @@ export async function subscribeToTopic(
queueConfiguration: CreateQueueCommandInput,
topicConfiguration: CreateTopicCommandInput,
subscriptionConfiguration: SNSSubscriptionOptions,
extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams,
extraParams?: ExtraSNSCreationParams & ExtraSQSCreationParams & ExtraParams,
) {
const topicArn = await assertTopic(snsClient, topicConfiguration, {
queueUrlsWithSubscribePermissionsPrefix: extraParams?.queueUrlsWithSubscribePermissionsPrefix,
Expand All @@ -37,11 +38,22 @@ export async function subscribeToTopic(
...subscriptionConfiguration,
})

const subscriptionResult = await snsClient.send(subscribeCommand)
return {
subscriptionArn: subscriptionResult.SubscriptionArn,
topicArn,
queueUrl,
queueArn,
try {
const subscriptionResult = await snsClient.send(subscribeCommand)
return {
subscriptionArn: subscriptionResult.SubscriptionArn,
topicArn,
queueUrl,
queueArn,
}
} catch (err) {
const logger = extraParams?.logger ?? console
// @ts-ignore
logger.error(
`Error while creating subscription for queue "${queueConfiguration.QueueName}", topic "${
topicConfiguration.Name
}": ${(err as Error).message}`,
)
throw err
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ describe('SNS PermissionsConsumer', () => {
)
})

it('throws an error when invalid queue locator is passed', async () => {
// FixMe https://github.com/localstack/localstack/issues/9306
it.skip('throws an error when invalid queue locator is passed', async () => {
await assertQueue(sqsClient, {
QueueName: 'existingQueue',
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ describe('SNS PermissionsConsumerMultiSchema', () => {
snsClient = diContainer.cradle.snsClient
})

it('throws an error when invalid queue locator is passed', async () => {
// FixMe https://github.com/localstack/localstack/issues/9306
it.skip('throws an error when invalid queue locator is passed', async () => {
await assertQueue(sqsClient, {
QueueName: 'existingQueue',
})
Expand Down
26 changes: 26 additions & 0 deletions packages/sns/test/fakes/FakeLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { Logger } from '@message-queue-toolkit/core'

export class FakeLogger implements Logger {
public readonly loggedMessages: unknown[] = []
public readonly loggedWarnings: unknown[] = []
public readonly loggedErrors: unknown[] = []

debug(obj: unknown) {
this.loggedMessages.push(obj)
}
error(obj: unknown) {
this.loggedErrors.push(obj)
}
fatal(obj: unknown) {
this.loggedErrors.push(obj)
}
info(obj: unknown) {
this.loggedMessages.push(obj)
}
trace(obj: unknown) {
this.loggedMessages.push(obj)
}
warn(obj: unknown) {
this.loggedWarnings.push(obj)
}
}
3 changes: 2 additions & 1 deletion packages/sns/test/publishers/SnsPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ describe('SNSPermissionPublisher', () => {
)
})

it('throws an error when invalid queue locator is passed', async () => {
// FixMe https://github.com/localstack/localstack/issues/9306
it.skip('throws an error when invalid queue locator is passed', async () => {
const newPublisher = new SnsPermissionPublisherMonoSchema(diContainer.cradle, {
locatorConfig: {
topicArn: 'dummy',
Expand Down
86 changes: 86 additions & 0 deletions packages/sns/test/utils/snsSubscriber.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import type { SNSClient } from '@aws-sdk/client-sns'
import type { SQSClient } from '@aws-sdk/client-sqs'
import { deleteQueue } from '@message-queue-toolkit/sqs'
import type { AwilixContainer } from 'awilix'
import { afterEach, describe, expect } from 'vitest'

import { subscribeToTopic } from '../../lib/utils/snsSubscriber'
import { deleteTopic } from '../../lib/utils/snsUtils'
import { FakeLogger } from '../fakes/FakeLogger'

import type { Dependencies } from './testContext'
import { registerDependencies } from './testContext'

const TOPIC_NAME = 'topic'
const QUEUE_NAME = 'queue'

describe('snsSubscriber', () => {
let diContainer: AwilixContainer<Dependencies>
let snsClient: SNSClient
let sqsClient: SQSClient
beforeEach(async () => {
diContainer = await registerDependencies({}, false)
snsClient = diContainer.cradle.snsClient
sqsClient = diContainer.cradle.sqsClient
})

afterEach(async () => {
const { awilixManager } = diContainer.cradle
await awilixManager.executeDispose()
await diContainer.dispose()

await deleteTopic(snsClient, TOPIC_NAME)
await deleteQueue(sqsClient, QUEUE_NAME)
})

describe('subscribeToTopic', () => {
it('logs queue in subscription error', async () => {
const logger = new FakeLogger()
await subscribeToTopic(
sqsClient,
snsClient,
{
QueueName: QUEUE_NAME,
},
{
Name: TOPIC_NAME,
},
{
Attributes: {
FilterPolicy: `{"type":["remove"]}`,
FilterPolicyScope: 'MessageAttributes',
},
},
)

await expect(
subscribeToTopic(
sqsClient,
snsClient,
{
QueueName: QUEUE_NAME,
},
{
Name: TOPIC_NAME,
},
{
Attributes: {
FilterPolicy: `{"type":["add"]}`,
FilterPolicyScope: 'MessageBody',
},
},
{
logger,
},
),
).rejects.toThrow(
/Invalid parameter: Attributes Reason: Subscription already exists with different attributes/,
)

expect(logger.loggedErrors).toHaveLength(1)
expect(logger.loggedErrors[0]).toBe(
'Error while creating subscription for queue "queue", topic "topic": Invalid parameter: Attributes Reason: Subscription already exists with different attributes',
)
})
})
})
4 changes: 2 additions & 2 deletions packages/sqs/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
localstack:
image: localstack/localstack:2.1.0
image: localstack/localstack:2.3.2
network_mode: bridge
hostname: localstack
ports:
Expand All @@ -10,9 +10,9 @@ services:
- SERVICES=sns
- DEBUG=0
- DATA_DIR=${DATA_DIR-}
- LAMBDA_EXECUTOR=local
- DOCKER_HOST=unix:///var/run/docker.sock
- HOSTNAME_EXTERNAL=localstack
- LOCALSTACK_HOST=localstack
# - LOCALSTACK_API_KEY=someDummyKey
volumes:
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
Expand Down

0 comments on commit dcaaf2e

Please sign in to comment.