Skip to content

Commit

Permalink
Explicitly set multi-schema consumer execution context (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored Oct 19, 2023
1 parent a0843e3 commit a7a0289
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 147 deletions.
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,26 +116,33 @@ Here is an example:

```ts
type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
type ExecutionContext = {
userService: UserService
}

export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
SupportedMessages,
TestConsumerMultiSchema
ExecutionContext
> {
constructor(
dependencies: SQSConsumerDependencies,
userService: UserService,
) {
super(dependencies, {
//
// rest of configuration skipped
//
handlers: new MessageHandlerConfigBuilder<
SupportedMessages,
SqsPermissionConsumerMultiSchema
ExecutionContext
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (message, context) => {
// process message
const users = await context.userService.getUsers()
//
// execute some domain logic here
//
return {
result: 'success',
}
Expand All @@ -149,12 +156,17 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA,
async (message, context) => {
// process message
const users = await context.userService.getUsers()
//
// execute some domain logic here
//
return {
result: 'success',
}
})
.build(),
}, {
userService,
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ export abstract class AbstractAmqpConsumerMultiSchema<
{
messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
handlerContainer: HandlerContainer<MessagePayloadType, ExecutionContext>
protected readonly executionContext: ExecutionContext

constructor(
dependencies: AMQPConsumerDependencies,
options: NewAMQPConsumerOptions &
MultiSchemaConsumerOptions<MessagePayloadType, ExecutionContext>,
executionContext: ExecutionContext,
) {
super(dependencies, options)
const messageSchemas = options.handlers.map((entry) => entry.schema)
Expand All @@ -36,6 +38,7 @@ export abstract class AbstractAmqpConsumerMultiSchema<
messageTypeField: this.messageTypeField,
messageHandlers: options.handlers,
})
this.executionContext = executionContext
}

protected override resolveSchema(message: MessagePayloadType) {
Expand All @@ -48,9 +51,7 @@ export abstract class AbstractAmqpConsumerMultiSchema<
barrierOutput: unknown,
): Promise<Either<'retryLater', 'success'>> {
const handler = this.handlerContainer.resolveHandler(messageType)

// @ts-ignore
return handler.handler(message, this, barrierOutput)
return handler.handler(message, this.executionContext, barrierOutput)
}

protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
Expand All @@ -64,8 +65,7 @@ export abstract class AbstractAmqpConsumerMultiSchema<
): Promise<BarrierResult<unknown>> {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.preHandlerBarrier
? // @ts-ignore
await handler.preHandlerBarrier(message, this)
? await handler.preHandlerBarrier(message, this.executionContext)
: {
isPassing: true,
output: undefined,
Expand Down
80 changes: 43 additions & 37 deletions packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import {
} from './userConsumerSchemas'

type SupportedEvents = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
type ExecutionContext = {
incrementAmount: number
}

export class AmqpPermissionConsumerMultiSchema extends AbstractAmqpConsumerMultiSchema<
SupportedEvents,
AmqpPermissionConsumerMultiSchema
ExecutionContext
> {
public static QUEUE_NAME = 'user_permissions_multi'

Expand All @@ -31,45 +34,48 @@ export class AmqpPermissionConsumerMultiSchema extends AbstractAmqpConsumerMulti
addPreHandlerBarrier?: (message: SupportedEvents) => Promise<BarrierResult<number>>
},
) {
super(dependencies, {
creationConfig: {
queueName: AmqpPermissionConsumerMultiSchema.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
super(
dependencies,
{
creationConfig: {
queueName: AmqpPermissionConsumerMultiSchema.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
},
deletionConfig: {
deleteIfExists: true,
},
handlers: new MessageHandlerConfigBuilder<
SupportedEvents,
AmqpPermissionConsumerMultiSchema
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, _context, barrierOutput) => {
if (options?.addPreHandlerBarrier && !barrierOutput) {
return { error: 'retryLater' }
}
this.addCounter++
deletionConfig: {
deleteIfExists: true,
},
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, context, barrierOutput) => {
if (options?.addPreHandlerBarrier && !barrierOutput) {
return { error: 'retryLater' }
}
this.addCounter += context.incrementAmount
return {
result: 'success',
}
},
{
preHandlerBarrier: options?.addPreHandlerBarrier,
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, context) => {
this.removeCounter += context.incrementAmount
return {
result: 'success',
}
},
{
preHandlerBarrier: options?.addPreHandlerBarrier,
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, _context) => {
this.removeCounter++
return {
result: 'success',
}
})
.build(),
messageTypeField: 'messageType',
...options,
})
})
.build(),
messageTypeField: 'messageType',
...options,
},
{
incrementAmount: 1,
},
)
}
}
35 changes: 20 additions & 15 deletions packages/amqp/test/fakes/FakeConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,34 @@ import { AbstractAmqpConsumerMultiSchema } from '../../lib/AbstractAmqpConsumerM
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'
import type { CommonMessage } from '../../lib/types/MessageTypes'

export class FakeConsumerMultiSchema extends AbstractAmqpConsumerMultiSchema<
export class FakeConsumerMultiSchema<ExecutionContext> extends AbstractAmqpConsumerMultiSchema<
CommonMessage,
FakeConsumerMultiSchema
ExecutionContext
> {
constructor(
dependencies: AMQPConsumerDependencies,
queueName = 'dummy',
handlers: MessageHandlerConfig<CommonMessage, FakeConsumerMultiSchema>[],
handlers: MessageHandlerConfig<CommonMessage, ExecutionContext>[],
executionContext: ExecutionContext,
) {
super(dependencies, {
creationConfig: {
queueName: queueName,
queueOptions: {
durable: true,
autoDelete: false,
super(
dependencies,
{
creationConfig: {
queueName: queueName,
queueOptions: {
durable: true,
autoDelete: false,
},
},
deletionConfig: {
deleteIfExists: true,
},
handlers,
messageTypeField: 'messageType',
},
deletionConfig: {
deleteIfExists: true,
},
handlers,
messageTypeField: 'messageType',
})
executionContext,
)
}

processMessage(): Promise<Either<'retryLater', 'success'>> {
Expand Down
11 changes: 8 additions & 3 deletions packages/sns/lib/sns/AbstractSnsSqsConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,15 @@ export abstract class AbstractSnsSqsConsumerMultiSchema<
options:
| NewSnsSqsConsumerOptionsMulti<MessagePayloadSchemas, ExecutionContext>
| ExistingSnsSqsConsumerOptionsMulti<MessagePayloadSchemas, ExecutionContext>,
executionContext: ExecutionContext,
) {
super(dependencies, {
...options,
})
super(
dependencies,
{
...options,
},
executionContext,
)

this.subscriptionConfig = options.subscriptionConfig
this.snsClient = dependencies.snsClient
Expand Down
94 changes: 50 additions & 44 deletions packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import {
} from './userConsumerSchemas'

type SupportedEvents = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
type ExecutionContext = {
incrementAmount: number
}

export class SnsSqsPermissionConsumerMultiSchema extends AbstractSnsSqsConsumerMultiSchema<
SupportedEvents,
SnsSqsPermissionConsumerMultiSchema
ExecutionContext
> {
public static CONSUMED_QUEUE_NAME = 'user_permissions_multi'
public static SUBSCRIBED_TOPIC_NAME = 'user_permissions_multi'
Expand All @@ -44,53 +47,56 @@ export class SnsSqsPermissionConsumerMultiSchema extends AbstractSnsSqsConsumerM
},
},
) {
super(dependencies, {
handlers: new MessageHandlerConfigBuilder<
SupportedEvents,
SnsSqsPermissionConsumerMultiSchema
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, _context, _barrierOutput: number) => {
this.addCounter++
return {
result: 'success',
}
},
{
preHandlerBarrier: async (_message, _context) => {
this.addBarrierCounter++
if (this.addBarrierCounter < 3) {
return {
isPassing: false,
}
}

super(
dependencies,
{
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, context, _barrierOutput: number) => {
this.addCounter += context.incrementAmount
return {
isPassing: true,
output: this.addBarrierCounter,
result: 'success',
}
},
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, _context) => {
this.removeCounter++
return {
result: 'success',
}
})
.build(),
messageTypeField: 'messageType',
deletionConfig: {
deleteIfExists: true,
},
consumerOverrides: {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
{
preHandlerBarrier: async (_message, context) => {
this.addBarrierCounter += context.incrementAmount
if (this.addBarrierCounter < 3) {
return {
isPassing: false,
}
}

return {
isPassing: true,
output: this.addBarrierCounter,
}
},
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, context) => {
this.removeCounter += context.incrementAmount
return {
result: 'success',
}
})
.build(),
messageTypeField: 'messageType',
deletionConfig: {
deleteIfExists: true,
},
consumerOverrides: {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
subscriptionConfig: {
updateAttributesIfExists: false,
},
...options,
},
subscriptionConfig: {
updateAttributesIfExists: false,
{
incrementAmount: 1,
},
...options,
})
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ describe('SNS PermissionsConsumerMultiSchema', () => {
() => {
return consumer.addCounter === 1 && consumer.removeCounter === 2
},
// Removing this makes test flaky
30,
20,
)
Expand Down
Loading

0 comments on commit a7a0289

Please sign in to comment.