Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages barrier max time #141

Merged
merged 34 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
992f4bc
Different handling if barrier is not passing
CarlosGamero May 9, 2024
4886c0a
Adding message timestamp field
CarlosGamero May 9, 2024
f2d31a2
Publishers add a timestamp automatically
CarlosGamero May 9, 2024
f4cfef7
Fixing message deserializer to not remove properties
CarlosGamero May 9, 2024
aec91c2
Adding test
CarlosGamero May 9, 2024
8642419
Test fix
CarlosGamero May 9, 2024
56a48bf
Adding TODO and skipping test for now
CarlosGamero May 9, 2024
abd0a3e
parseMessage method adjusted
CarlosGamero May 9, 2024
a722bb2
adjusting all desereliazers
CarlosGamero May 9, 2024
9121581
Adding warning log
CarlosGamero May 9, 2024
3739077
SQS fixes for new parse method return type
CarlosGamero May 9, 2024
6577309
SNS fixes for new parse method
CarlosGamero May 9, 2024
89c714d
AMQP fixes
CarlosGamero May 10, 2024
8d6f786
Test fixes
CarlosGamero May 10, 2024
fb79e3e
SQS implementing last retry date
CarlosGamero May 10, 2024
cc64313
Minor error fixes + tests
CarlosGamero May 10, 2024
1d71b86
Extracting tryToExtractTimestamp to core
CarlosGamero May 10, 2024
446272e
SNS adjusted + tests
CarlosGamero May 10, 2024
e4c6325
Adding doc
CarlosGamero May 10, 2024
c08f81f
Lint fix
CarlosGamero May 10, 2024
1c0ee31
Minor improvement
CarlosGamero May 10, 2024
bae5f3e
maxRetryDuration implementation on AMQP
CarlosGamero May 10, 2024
c34865a
doc
CarlosGamero May 10, 2024
92c2120
Prepare minor release
CarlosGamero May 10, 2024
65102e5
Improving doc
CarlosGamero May 10, 2024
8bce202
Allowing timestamp as date object
CarlosGamero May 10, 2024
a11e22a
Extracting common part to dateUtils method
CarlosGamero May 10, 2024
422b2c5
Fix test
CarlosGamero May 10, 2024
af057fa
Improving coverage
CarlosGamero May 10, 2024
69b2234
Update README.md
CarlosGamero May 10, 2024
dd1444a
Adding ToDo
CarlosGamero May 10, 2024
830e38e
Merge branch 'feat/messages_barrier_max_time' of https://github.com/k…
CarlosGamero May 10, 2024
0eae787
Update README.md
CarlosGamero May 10, 2024
d731aef
Update README.md
CarlosGamero May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ They implement the following public methods:
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `messageTimestampField` - which field in the message contain the message creation date. This field needs to be ISO-8601 date string and by default it is `timestamp`, if your messages doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
Expand All @@ -54,6 +55,8 @@ Multi-schema consumers support multiple message types via handler configs. They
* `options`, composed by
* `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler;
* `messageTimestampField` - which field in the message contain the message creation date. This field needs to be ISO-8601 date string and by default it is `timestamp`;
* `maxRetryDuration` - how long (in seconds) the message should be retried due to the `retryLater` result before marking it as consumed (avoid infinite loops). Default is 4 days;
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
Expand Down
42 changes: 30 additions & 12 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Either, ErrorResolver } from '@lokalise/node-core'
import type {
BarrierResult,
DeadLetterQueueOptions,
ParseMessageResult,
Prehandler,
PreHandlingOutputs,
QueueConsumer,
Expand All @@ -25,6 +26,7 @@ import { AbstractAmqpService } from './AbstractAmqpService'
import { readAmqpMessage } from './amqpMessageReader'

const ABORT_EARLY_EITHER: Either<'abort', never> = { error: 'abort' }
const DEFAULT_MAX_RETRY_DURATION = 4 * 24 * 60 * 60

export type AMQPConsumerOptions<
MessagePayloadType extends object,
Expand Down Expand Up @@ -60,6 +62,7 @@ export abstract class AbstractAmqpConsumer<
AMQPLocator,
NonNullable<unknown>
>
private readonly maxRetryDuration: number

private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
private readonly handlerContainer: HandlerContainer<
Expand All @@ -78,6 +81,7 @@ export abstract class AbstractAmqpConsumer<
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
this.errorResolver = dependencies.consumerErrorResolver
this.deadLetterQueueOptions = options.deadLetterQueue
this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION

const messageSchemas = options.handlers.map((entry) => entry.schema)
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
Expand Down Expand Up @@ -128,37 +132,48 @@ export abstract class AbstractAmqpConsumer<
this.handleMessageProcessed(null, 'invalid_message', messageId.result)
return
}
const { originalMessage, parsedMessage } = deserializedMessage.result

// @ts-ignore
const messageType = deserializedMessage.result[this.messageTypeField]
const messageType = parsedMessage[this.messageTypeField]
const transactionSpanId = `queue_${this.queueName}:${
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
deserializedMessage.result[this.messageTypeField]
parsedMessage[this.messageTypeField]
}`

// @ts-ignore
const uniqueTransactionKey = deserializedMessage.result[this.messageIdField]
const uniqueTransactionKey = parsedMessage[this.messageIdField]
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType)
const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType)
this.logMessage(resolvedLogMessage)
}
this.internalProcessMessage(deserializedMessage.result, messageType)
this.internalProcessMessage(parsedMessage, messageType)
.then((result) => {
if (result.error === 'retryLater') {
this.channel.nack(message, false, true)
this.handleMessageProcessed(deserializedMessage.result, 'retryLater')
}
if (result.result === 'success') {
this.channel.ack(message)
this.handleMessageProcessed(deserializedMessage.result, 'consumed')
this.handleMessageProcessed(originalMessage, 'consumed')
return
}

// retryLater
const timestamp = this.tryToExtractTimestamp(originalMessage) ?? new Date()
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
const lastRetryDate = new Date(timestamp.getTime() + this.maxRetryDuration * 1000)
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
if (lastRetryDate > new Date()) {
this.channel.nack(message, false, true)
this.handleMessageProcessed(originalMessage, 'retryLater')
} else {
this.channel.ack(message)
this.handleMessageProcessed(originalMessage, 'error')
CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
}
})
.catch((err) => {
// ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter
// If we fail due to unknown reason, let's retry
this.channel.nack(message, false, true)
this.handleMessageProcessed(deserializedMessage.result, 'retryLater')
this.handleMessageProcessed(originalMessage, 'retryLater')
this.handleError(err)
})
.finally(() => {
Expand All @@ -180,6 +195,7 @@ export abstract class AbstractAmqpConsumer<
barrierOutput: barrierResult.output,
})
}

return { error: 'retryLater' }
}

Expand Down Expand Up @@ -245,7 +261,9 @@ export abstract class AbstractAmqpConsumer<
)
}

private deserializeMessage(message: Message): Either<'abort', MessagePayloadType> {
private deserializeMessage(
message: Message,
): Either<'abort', ParseMessageResult<MessagePayloadType>> {
const resolveMessageResult = this.resolveMessage(message)
if (isMessageError(resolveMessageResult.error)) {
this.handleError(resolveMessageResult.error)
Expand Down
10 changes: 10 additions & 0 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}
resolveSchemaResult.result.parse(message)

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
message[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
Expand Down
13 changes: 11 additions & 2 deletions packages/amqp/lib/amqpMessageDeserializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import { AmqpConsumerErrorResolver } from './errors/AmqpConsumerErrorResolver'

describe('messageDeserializer', () => {
it('deserializes valid JSON', () => {
const messagePayload: PERMISSIONS_MESSAGE_TYPE = {
const messagePayload = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
nonSchemaField: 'nonSchemaField',
}
const message: Message = {
content: Buffer.from(JSON.stringify(messagePayload)),
Expand All @@ -26,7 +27,15 @@ describe('messageDeserializer', () => {
errorProcessor,
)

expect(deserializedPayload.result).toMatchObject(messagePayload)
expect(deserializedPayload.result).toMatchObject({
originalMessage: messagePayload,
parsedMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
},
})
})

it('throws an error on invalid JSON', () => {
Expand Down
13 changes: 8 additions & 5 deletions packages/amqp/lib/amqpMessageDeserializer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import type { Either } from '@lokalise/node-core'
import type { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core'
import type {
MessageInvalidFormatError,
MessageValidationError,
ParseMessageResult,
} from '@message-queue-toolkit/core'
import { parseMessage } from '@message-queue-toolkit/core'
import type { Message } from 'amqplib'
import type { ZodType } from 'zod'

Expand All @@ -9,11 +14,9 @@ export const deserializeAmqpMessage = <T extends object>(
message: Message,
type: ZodType<T>,
errorProcessor: AmqpConsumerErrorResolver,
): Either<MessageInvalidFormatError | MessageValidationError, T> => {
): Either<MessageInvalidFormatError | MessageValidationError, ParseMessageResult<T>> => {
try {
return {
result: type.parse(JSON.parse(message.content.toString())),
}
return parseMessage(JSON.parse(message.content.toString()), type, errorProcessor)
} catch (exception) {
return {
error: errorProcessor.processError(exception),
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/amqp",
"version": "14.0.0",
"version": "14.1.0",
"private": false,
"license": "MIT",
"description": "AMQP adapter for message-queue-toolkit",
Expand Down Expand Up @@ -29,7 +29,7 @@
"zod": "^3.23.6"
},
"peerDependencies": {
"@message-queue-toolkit/core": "^12.0.0",
"@message-queue-toolkit/core": "^12.1.0",
"amqplib": "^0.10.3"
},
"devDependencies": {
Expand Down
102 changes: 84 additions & 18 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { Dependencies } from '../utils/testContext'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'

import { AmqpPermissionConsumer } from './AmqpPermissionConsumer'
import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas'

describe('AmqpPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -86,24 +87,24 @@ describe('AmqpPermissionConsumer', () => {

await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

expect(logger.loggedMessages.length).toBe(4)
expect(logger.loggedMessages).toMatchInlineSnapshot(`
[
"Propagating new connection across 0 receivers",
{
"id": "1",
"messageType": "add",
},
{
"id": "1",
"messageType": "add",
},
{
"messageId": "1",
"processingResult": "consumed",
},
]
`)
expect(logger.loggedMessages.length).toBe(5)
expect(logger.loggedMessages).toEqual([
'Propagating new connection across 0 receivers',
'timestamp not defined, adding it automatically',
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
},
{
id: '1',
messageType: 'add',
},
{
messageId: '1',
processingResult: 'consumed',
},
])
})
})

Expand Down Expand Up @@ -380,4 +381,69 @@ describe('AmqpPermissionConsumer', () => {
await consumer.close()
})
})

describe('messages stuck on retryLater', () => {
let diContainer: AwilixContainer<Dependencies>
let publisher: AmqpPermissionPublisher
beforeEach(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG, undefined, false)
publisher = diContainer.cradle.permissionPublisher
await publisher.init()
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('stuck on barrier', async () => {
let counter = 0
const consumer = new AmqpPermissionConsumer(diContainer.cradle, {
addPreHandlerBarrier: async () => {
counter++
return { isPassing: false }
},
maxRetryDuration: 3,
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)

const jobSpy = await consumer.handlerSpy.waitForMessageWithId('1', 'error')
expect(jobSpy.message).toEqual(message)
expect(counter).toBeGreaterThan(2)
})

it('stuck on handler', async () => {
let counter = 0
const consumer = new AmqpPermissionConsumer(diContainer.cradle, {
removeHandlerOverride: async () => {
counter++
return { error: 'retryLater' }
},
maxRetryDuration: 3,
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
id: '1',
messageType: 'remove',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)

const jobSpy = await consumer.handlerSpy.waitForMessageWithId('1', 'error')
expect(jobSpy.message).toEqual(message)
expect(counter).toBeGreaterThan(2)
})
})
})
7 changes: 3 additions & 4 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type PrehandlerOutput = {

type AmqpPermissionConsumerOptions = Pick<
AMQPConsumerOptions<SupportedEvents, ExecutionContext, PrehandlerOutput>,
'creationConfig' | 'locatorConfig' | 'logMessages' | 'deadLetterQueue'
'creationConfig' | 'locatorConfig' | 'logMessages' | 'deadLetterQueue' | 'maxRetryDuration'
> & {
addPreHandlerBarrier?: (message: SupportedEvents) => Promise<BarrierResult<number>>
removeHandlerOverride?: (
Expand Down Expand Up @@ -76,9 +76,8 @@ export class AmqpPermissionConsumer extends AbstractAmqpConsumer<
logMessages: options?.logMessages,
handlerSpy: true,
messageTypeField: 'messageType',
deletionConfig: {
deleteIfExists: true,
},
deletionConfig: { deleteIfExists: true },
maxRetryDuration: options?.maxRetryDuration,
handlers: new MessageHandlerConfigBuilder<
SupportedEvents,
ExecutionContext,
Expand Down
1 change: 1 addition & 0 deletions packages/amqp/test/consumers/userConsumerSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({
messageType: z.enum(['add', 'remove']),
userIds: z.array(z.number()).describe('User IDs'),
permissions: z.array(z.string()).nonempty().describe('List of user permissions'),
timestamp: z.string().optional(),
})

export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({
Expand Down
Loading
Loading