Skip to content

Commit

Permalink
Wrong key was used for stopping transactions (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored May 6, 2024
1 parent 101810d commit 4efd1e6
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 64 deletions.
10 changes: 4 additions & 6 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,9 @@ export abstract class AbstractAmqpConsumer<
deserializedMessage.result[this.messageTypeField]
}`

this.transactionObservabilityManager?.start(
transactionSpanId,
// @ts-ignore
deserializedMessage.result[this.messageIdField],
)
// @ts-ignore
const uniqueTransactionKey = deserializedMessage.result[this.messageIdField]
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType)
this.logMessage(resolvedLogMessage)
Expand All @@ -164,7 +162,7 @@ export abstract class AbstractAmqpConsumer<
this.handleError(err)
})
.finally(() => {
this.transactionObservabilityManager?.stop(transactionSpanId)
this.transactionObservabilityManager?.stop(uniqueTransactionKey)
})
})
}
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.16.1",
"@lokalise/node-core": "^9.17.0",
"zod": "^3.23.6"
},
"peerDependencies": {
Expand Down
6 changes: 2 additions & 4 deletions packages/core/lib/types/MessageQueueTypes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { TransactionObservabilityManager } from '@lokalise/node-core'
import type { ZodSchema } from 'zod'

export interface QueueConsumer {
Expand All @@ -20,10 +21,7 @@ export interface AsyncPublisher<MessagePayloadType, MessageOptions> {
publish(message: MessagePayloadType, options: MessageOptions): Promise<unknown>
}

export type TransactionObservabilityManager = {
start: (transactionSpanId: string, uniqueTransactionKey: string) => unknown
stop: (transactionSpanId: string) => unknown
}
export { TransactionObservabilityManager }

export type LogFn = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.16.1",
"@lokalise/node-core": "^9.17.0",
"fast-equals": "^5.0.1",
"toad-cache": "^3.7.0",
"zod": "^3.23.6"
Expand Down
2 changes: 1 addition & 1 deletion packages/sns/lib/sns/fakes/FakeConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { BaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { SNSSQSConsumerDependencies } from '../AbstractSnsSqsConsumer';
import type { SNSSQSConsumerDependencies } from '../AbstractSnsSqsConsumer'
import { AbstractSnsSqsConsumer } from '../AbstractSnsSqsConsumer'

export class FakeConsumer<T extends BaseMessageType> extends AbstractSnsSqsConsumer<T, unknown> {
Expand Down
2 changes: 1 addition & 1 deletion packages/sns/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.16.1",
"@lokalise/node-core": "^9.17.0",
"sqs-consumer": "^10.2.0",
"zod": "^3.23.6"
},
Expand Down
90 changes: 47 additions & 43 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,52 +465,56 @@ describe('SnsSqsPermissionConsumer', () => {
await diContainer.dispose()
})

it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
topic: { Name: topicName },
queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } },
},
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(3100) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()
it.each([false, true])(
'using 2 consumers with heartbeat -> %s',
async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
topic: { Name: topicName },
queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } },
},
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(3100) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()

const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: consumer1.subscriptionProps.queueUrl,
topicArn: consumer1.subscriptionProps.topicArn,
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
},
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
})
const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: consumer1.subscriptionProps.queueUrl,
topicArn: consumer1.subscriptionProps.topicArn,
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
},
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
})

await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()
await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()

// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)
// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)

expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
}, 10000)
expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
},
10000,
)
})
})
10 changes: 4 additions & 6 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,9 @@ export abstract class AbstractSqsConsumer<
const messageType = deserializedMessage.result[this.messageTypeField]
const transactionSpanId = `queue_${this.queueName}:${messageType}`

this.transactionObservabilityManager?.start(
transactionSpanId,
// @ts-ignore
deserializedMessage.result[this.messageIdField],
)
// @ts-ignore
const uniqueTransactionKey = deserializedMessage.result[this.messageIdField]
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType)
this.logMessage(resolvedLogMessage)
Expand All @@ -223,7 +221,7 @@ export abstract class AbstractSqsConsumer<
return { error: err }
})
.finally(() => {
this.transactionObservabilityManager?.stop(transactionSpanId)
this.transactionObservabilityManager?.stop(uniqueTransactionKey)
})

// success
Expand Down
2 changes: 1 addition & 1 deletion packages/sqs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.16.1",
"@lokalise/node-core": "^9.17.0",
"sqs-consumer": "^10.2.0",
"zod": "^3.23.6"
},
Expand Down

0 comments on commit 4efd1e6

Please sign in to comment.