From 16a6bd32c9adfacd5a278399cbda953f9e05a9dd Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Mon, 6 May 2024 10:44:10 +0300 Subject: [PATCH] Wrong key was used for stopping transactions --- packages/amqp/lib/AbstractAmqpConsumer.ts | 10 +-- packages/amqp/package.json | 2 +- packages/core/lib/types/MessageQueueTypes.ts | 6 +- packages/core/package.json | 2 +- packages/sns/lib/sns/fakes/FakeConsumer.ts | 2 +- packages/sns/package.json | 2 +- .../SnsSqsPermissionConsumer.spec.ts | 90 ++++++++++--------- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 10 +-- packages/sqs/package.json | 2 +- 9 files changed, 62 insertions(+), 64 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 8075b919..2384b232 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -136,11 +136,9 @@ export abstract class AbstractAmqpConsumer< deserializedMessage.result[this.messageTypeField] }` - this.transactionObservabilityManager?.start( - transactionSpanId, - // @ts-ignore - deserializedMessage.result[this.messageIdField], - ) + // @ts-ignore + const uniqueTransactionKey = deserializedMessage.result[this.messageIdField] + this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) if (this.logMessages) { const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType) this.logMessage(resolvedLogMessage) @@ -164,7 +162,7 @@ export abstract class AbstractAmqpConsumer< this.handleError(err) }) .finally(() => { - this.transactionObservabilityManager?.stop(transactionSpanId) + this.transactionObservabilityManager?.stop(uniqueTransactionKey) }) }) } diff --git a/packages/amqp/package.json b/packages/amqp/package.json index c13618ec..7223c09a 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.16.1", + "@lokalise/node-core": "^9.17.0", "zod": "^3.23.6" }, "peerDependencies": { diff --git a/packages/core/lib/types/MessageQueueTypes.ts b/packages/core/lib/types/MessageQueueTypes.ts index 5787f3d7..c7ea6732 100644 --- a/packages/core/lib/types/MessageQueueTypes.ts +++ b/packages/core/lib/types/MessageQueueTypes.ts @@ -1,3 +1,4 @@ +import type { TransactionObservabilityManager } from '@lokalise/node-core' import type { ZodSchema } from 'zod' export interface QueueConsumer { @@ -20,10 +21,7 @@ export interface AsyncPublisher { publish(message: MessagePayloadType, options: MessageOptions): Promise } -export type TransactionObservabilityManager = { - start: (transactionSpanId: string, uniqueTransactionKey: string) => unknown - stop: (transactionSpanId: string) => unknown -} +export { TransactionObservabilityManager } export type LogFn = { // eslint-disable-next-line @typescript-eslint/no-explicit-any diff --git a/packages/core/package.json b/packages/core/package.json index e046b1ac..5adfe79b 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.16.1", + "@lokalise/node-core": "^9.17.0", "fast-equals": "^5.0.1", "toad-cache": "^3.7.0", "zod": "^3.23.6" diff --git a/packages/sns/lib/sns/fakes/FakeConsumer.ts b/packages/sns/lib/sns/fakes/FakeConsumer.ts index 175a26ab..567dde71 100644 --- a/packages/sns/lib/sns/fakes/FakeConsumer.ts +++ b/packages/sns/lib/sns/fakes/FakeConsumer.ts @@ -2,7 +2,7 @@ import type { BaseMessageType } from '@message-queue-toolkit/core' import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' import type { ZodSchema } from 'zod' -import type { SNSSQSConsumerDependencies } from '../AbstractSnsSqsConsumer'; +import type { SNSSQSConsumerDependencies } from '../AbstractSnsSqsConsumer' import { AbstractSnsSqsConsumer } from '../AbstractSnsSqsConsumer' export class FakeConsumer extends AbstractSnsSqsConsumer { diff --git a/packages/sns/package.json b/packages/sns/package.json index 68a330c1..8aa6f680 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.16.1", + "@lokalise/node-core": "^9.17.0", "sqs-consumer": "^10.2.0", "zod": "^3.23.6" }, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 1462b6c9..441c9d94 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -465,52 +465,56 @@ describe('SnsSqsPermissionConsumer', () => { await diContainer.dispose() }) - it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => { - let consumer1IsProcessing = false - let consumer1Counter = 0 - let consumer2Counter = 0 - - const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, { - creationConfig: { - topic: { Name: topicName }, - queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } }, - }, - consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, - removeHandlerOverride: async () => { - consumer1IsProcessing = true - await setTimeout(3100) // Wait to the visibility timeout to expire - consumer1Counter++ - consumer1IsProcessing = false - return { result: 'success' } - }, - }) - await consumer1.start() + it.each([false, true])( + 'using 2 consumers with heartbeat -> %s', + async (heartbeatEnabled) => { + let consumer1IsProcessing = false + let consumer1Counter = 0 + let consumer2Counter = 0 + + const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicName }, + queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } }, + }, + consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, + removeHandlerOverride: async () => { + consumer1IsProcessing = true + await setTimeout(3100) // Wait to the visibility timeout to expire + consumer1Counter++ + consumer1IsProcessing = false + return { result: 'success' } + }, + }) + await consumer1.start() - const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, { - locatorConfig: { - queueUrl: consumer1.subscriptionProps.queueUrl, - topicArn: consumer1.subscriptionProps.topicArn, - subscriptionArn: consumer1.subscriptionProps.subscriptionArn, - }, - removeHandlerOverride: async () => { - consumer2Counter++ - return { result: 'success' } - }, - }) - const publisher = new SnsPermissionPublisher(diContainer.cradle, { - locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn }, - }) + const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { + queueUrl: consumer1.subscriptionProps.queueUrl, + topicArn: consumer1.subscriptionProps.topicArn, + subscriptionArn: consumer1.subscriptionProps.subscriptionArn, + }, + removeHandlerOverride: async () => { + consumer2Counter++ + return { result: 'success' } + }, + }) + const publisher = new SnsPermissionPublisher(diContainer.cradle, { + locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn }, + }) - await publisher.publish({ id: '10', messageType: 'remove' }) - // wait for consumer1 to start processing to start second consumer - await waitAndRetry(() => consumer1IsProcessing, 5, 5) - await consumer2.start() + await publisher.publish({ id: '10', messageType: 'remove' }) + // wait for consumer1 to start processing to start second consumer + await waitAndRetry(() => consumer1IsProcessing, 5, 5) + await consumer2.start() - // wait for both consumers to process message - await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) + // wait for both consumers to process message + await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) - expect(consumer1Counter).toBe(1) - expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) - }, 10000) + expect(consumer1Counter).toBe(1) + expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) + }, + 10000, + ) }) }) diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 052755c6..74773504 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -204,11 +204,9 @@ export abstract class AbstractSqsConsumer< const messageType = deserializedMessage.result[this.messageTypeField] const transactionSpanId = `queue_${this.queueName}:${messageType}` - this.transactionObservabilityManager?.start( - transactionSpanId, - // @ts-ignore - deserializedMessage.result[this.messageIdField], - ) + // @ts-ignore + const uniqueTransactionKey = deserializedMessage.result[this.messageIdField] + this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) if (this.logMessages) { const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType) this.logMessage(resolvedLogMessage) @@ -223,7 +221,7 @@ export abstract class AbstractSqsConsumer< return { error: err } }) .finally(() => { - this.transactionObservabilityManager?.stop(transactionSpanId) + this.transactionObservabilityManager?.stop(uniqueTransactionKey) }) // success diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 4ac86f3a..c221f215 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.16.1", + "@lokalise/node-core": "^9.17.0", "sqs-consumer": "^10.2.0", "zod": "^3.23.6" },