diff --git a/packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts b/packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts index dd908479..33fcf214 100644 --- a/packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts +++ b/packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts @@ -45,11 +45,12 @@ export abstract class AbstractAmqpConsumerMultiSchema< public override async processMessage( message: MessagePayloadType, messageType: string, + barrierOutput: unknown, ): Promise> { const handler = this.handlerContainer.resolveHandler(messageType) // @ts-ignore - return handler.handler(message, this) + return handler.handler(message, this, barrierOutput) } protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts b/packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts index 7bee589e..efe5359c 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts @@ -48,7 +48,10 @@ export class AmqpPermissionConsumerMultiSchema extends AbstractAmqpConsumerMulti >() .addConfig( PERMISSIONS_ADD_MESSAGE_SCHEMA, - async (_message, _context) => { + async (_message, _context, barrierOutput) => { + if (options?.addPreHandlerBarrier && !barrierOutput) { + return { error: 'retryLater' } + } this.addCounter++ return { result: 'success', diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumerMultiSchema.ts b/packages/sqs/lib/sqs/AbstractSqsConsumerMultiSchema.ts index c8cadf93..ed9598e7 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumerMultiSchema.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumerMultiSchema.ts @@ -72,10 +72,11 @@ export abstract class AbstractSqsConsumerMultiSchema< public override async processMessage( message: MessagePayloadType, messageType: string, + barrierOutput: unknown, ): Promise> { const handler = this.handlerContainer.resolveHandler(messageType) // @ts-ignore - return handler.handler(message, this) + return handler.handler(message, this, barrierOutput) } protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { diff --git a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts index 100e20e2..c8f5c1c4 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumerMultiSchema.ts @@ -70,7 +70,10 @@ export class SqsPermissionConsumerMultiSchema extends AbstractSqsConsumerMultiSc >() .addConfig( PERMISSIONS_ADD_MESSAGE_SCHEMA, - async (_message, _context, _barrierOutput) => { + async (_message, _context, barrierOutput) => { + if (options.addPreHandlerBarrier && !barrierOutput) { + return { error: 'retryLater' } + } this.addCounter++ return { result: 'success',