Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement AMQP publisherManager #144

Merged
merged 8 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ export type { AMQPQueueConfig } from './lib/AbstractAmqpService'
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export { AbstractAmqpPublisher, AMQPPublisherOptions } from './lib/AbstractAmqpPublisher'

export type { AmqpConfig } from './lib/amqpConnectionResolver'

export { resolveAmqpConnection } from './lib/amqpConnectionResolver'
export { AmqpConnectionManager } from './lib/AmqpConnectionManager'
export type { ConnectionReceiver } from './lib/AmqpConnectionManager'
export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer'

export * from './lib/AbstractAmqpQueuePublisher'
export * from './lib/AbstractAmqpExchangePublisher'
export * from './lib/AmqpExchangePublisherManager'
export * from './lib/AmqpQueuePublisherManager'
50 changes: 50 additions & 0 deletions packages/amqp/lib/AbstractAmqpExchangePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import type * as Buffer from 'node:buffer'

import { objectToBuffer } from '@message-queue-toolkit/core'
import type { Options } from 'amqplib/properties'

import type { AMQPPublisherOptions } from './AbstractAmqpPublisher'
import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'
import type { AMQPDependencies } from './AbstractAmqpService'

export type AMQPExchangePublisherOptions<MessagePayloadType extends object> = Omit<
AMQPPublisherOptions<MessagePayloadType>,
'creationConfig'
> & {
exchange: string
}

export type AmqpExchangeMessageOptions = {
routingKey: string
publishOptions: Options.Publish
}

export abstract class AbstractAmqpExchangePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpPublisher<MessagePayloadType, AmqpExchangeMessageOptions> {
constructor(
dependencies: AMQPDependencies,
options: AMQPExchangePublisherOptions<MessagePayloadType>,
) {
super(dependencies, {
...options,
// FixMe exchange publisher doesn't need queue at all
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be addressed during auto-init rework, as we need to implement exchange and subscription support

creationConfig: {
queueName: 'dummy',
queueOptions: {},
updateAttributesIfExists: false,
},
exchange: options.exchange,
locatorConfig: undefined,
})
}

protected publishInternal(message: Buffer, options: AmqpExchangeMessageOptions): void {
this.channel.publish(
this.exchange!,
options.routingKey,
objectToBuffer(message),
options.publishOptions,
)
}
}
18 changes: 12 additions & 6 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ export type AMQPPublisherOptions<MessagePayloadType extends object> = QueuePubli
AMQPCreationConfig,
AMQPLocator,
MessagePayloadType
>
> & {
exchange?: string
}

export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
export abstract class AbstractAmqpPublisher<MessagePayloadType extends object, MessageOptionsType>
extends AbstractAmqpService<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType, MessageOptionsType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
protected readonly exchange?: string

private initPromise?: Promise<void>

Expand All @@ -35,9 +38,10 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.exchange = options.exchange
}

publish(message: MessagePayloadType): void {
publish(message: MessagePayloadType, options: MessageOptionsType): void {
const resolveSchemaResult = this.resolveSchema(message)
if (resolveSchemaResult.error) {
throw resolveSchemaResult.error
Expand All @@ -57,7 +61,7 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
*/
this.initPromise
.then(() => {
this.publish(message)
this.publish(message, options)
})
.catch((err) => {
this.handleError(err)
Expand All @@ -82,7 +86,7 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}

try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
this.publishInternal(objectToBuffer(message), options)
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
Expand All @@ -107,6 +111,8 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}
}

protected abstract publishInternal(message: Buffer, options: MessageOptionsType): void

protected override resolveSchema(
message: MessagePayloadType,
): Either<Error, ZodSchema<MessagePayloadType>> {
Expand Down
23 changes: 23 additions & 0 deletions packages/amqp/lib/AbstractAmqpQueuePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { Options } from 'amqplib/properties'

import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'

export type AmqpQueueMessageOptions = {
publishOptions: Options.Publish
}

const NO_PARAMS: AmqpQueueMessageOptions = {
publishOptions: {},
}

export abstract class AbstractAmqpQueuePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpPublisher<MessagePayloadType, AmqpQueueMessageOptions> {
protected publishInternal(message: Buffer, options: AmqpQueueMessageOptions): void {
this.channel.sendToQueue(this.queueName, message, options.publishOptions)
}

publish(message: MessagePayloadType, options: AmqpQueueMessageOptions = NO_PARAMS) {
super.publish(message, options)
}
}
5 changes: 5 additions & 0 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export type AMQPCreationConfig = {
updateAttributesIfExists?: boolean
}

export type AMQPSubscriptionConfig = {
exchange: string
routingKey: string
}

export type AMQPLocator = {
queueName: string
}
Expand Down
96 changes: 96 additions & 0 deletions packages/amqp/lib/AmqpExchangePublisherManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas'
import type { TypeOf } from 'zod'
import type z from 'zod'
import { util } from 'zod'

import type {
AbstractAmqpExchangePublisher,
AMQPExchangePublisherOptions,
} from './AbstractAmqpExchangePublisher'
import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher'
import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService'
import type {
AmqpAwareEventDefinition,
AmqpMessageSchemaType,
AmqpPublisherManagerDependencies,
AmqpPublisherManagerOptions,
} from './AmqpQueuePublisherManager'
import { CommonAmqpExchangePublisherFactory } from './CommonAmqpPublisherFactory'

import Omit = util.Omit
Copy link
Collaborator

@CarlosGamero CarlosGamero May 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 nit: Importing omit could cause confusing with TS Omit, I would prefer using util.Omit or even z.util.Omit (I think it is coming from zod, sorry if I misunderstood it) to make it more specific. But this is a very nitpicky comment, feel free to ignore

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, this was autoresolve by WebStorm, not intentional. I wanted to use TS Omit


export class AmqpExchangePublisherManager<
T extends AbstractAmqpExchangePublisher<z.infer<SupportedEventDefinitions[number]['schema']>>,
SupportedEventDefinitions extends AmqpAwareEventDefinition[],
MetadataType = MessageMetadataType,
> extends AbstractPublisherManager<
AmqpAwareEventDefinition,
NonNullable<SupportedEventDefinitions[number]['exchange']>,
AbstractAmqpExchangePublisher<z.infer<SupportedEventDefinitions[number]['schema']>>,
AMQPDependencies,
AMQPCreationConfig,
AMQPLocator,
AmqpMessageSchemaType<AmqpAwareEventDefinition>,
Omit<
AMQPExchangePublisherOptions<z.infer<SupportedEventDefinitions[number]['schema']>>,
'messageSchemas' | 'locatorConfig' | 'exchange'
>,
SupportedEventDefinitions,
MetadataType,
z.infer<SupportedEventDefinitions[number]['schema']>
> {
constructor(
dependencies: AmqpPublisherManagerDependencies<SupportedEventDefinitions>,
options: AmqpPublisherManagerOptions<
T,
AmqpQueueMessageOptions,
AMQPExchangePublisherOptions<z.infer<SupportedEventDefinitions[number]['schema']>>,
z.infer<SupportedEventDefinitions[number]['schema']>,
MetadataType
>,
) {
super({
isAsync: false,
eventRegistry: dependencies.eventRegistry,
metadataField: options.metadataField ?? 'metadata',
metadataFiller: options.metadataFiller,
newPublisherOptions: options.newPublisherOptions,
publisherDependencies: {
amqpConnectionManager: dependencies.amqpConnectionManager,
logger: dependencies.logger,
errorReporter: dependencies.errorReporter,
},
publisherFactory: options.publisherFactory ?? new CommonAmqpExchangePublisherFactory(),
})
}

protected resolvePublisherConfigOverrides(
exchange: string,
): Partial<
util.Omit<
AMQPExchangePublisherOptions<TypeOf<SupportedEventDefinitions[number]['schema']>>,
'messageSchemas' | 'locatorConfig'
>
> {
return {
exchange,
}
}

protected override resolveCreationConfig(
queueName: NonNullable<SupportedEventDefinitions[number]['exchange']>,
): AMQPCreationConfig {
return {
...this.newPublisherOptions,
queueOptions: {},
queueName,
}
}

protected override resolveEventTarget(
event: AmqpAwareEventDefinition,
): NonNullable<SupportedEventDefinitions[number]['exchange']> | undefined {
return event.queueName
}
}
34 changes: 34 additions & 0 deletions packages/amqp/lib/AmqpQueuePublisherManager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { AwilixContainer } from 'awilix'
import { beforeAll } from 'vitest'

import { FakeConsumer } from '../test/fakes/FakeConsumer'
import { TEST_AMQP_CONFIG } from '../test/utils/testAmqpConfig'
import { registerDependencies, TestEvents } from '../test/utils/testContext'
import type { Dependencies } from '../test/utils/testContext'

describe('AmqpQueuePublisherManager', () => {
describe('publish', () => {
let diContainer: AwilixContainer<Dependencies>
beforeAll(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG)
})

it('publishes to the correct queue', async () => {
const { queuePublisherManager } = diContainer.cradle
const fakeConsumer = new FakeConsumer(diContainer.cradle, TestEvents.updated)
await fakeConsumer.start()

const publishedMessage = await queuePublisherManager.publish(FakeConsumer.QUEUE_NAME, {
...queuePublisherManager.resolveBaseFields(),
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
})

const result = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id)

expect(result.processingResult).toBe('consumed')
})
})
})
Loading
Loading