Skip to content

Commit

Permalink
Implement AMQP publisherManager (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored May 18, 2024
1 parent 6f07e59 commit 55ae060
Show file tree
Hide file tree
Showing 24 changed files with 619 additions and 122 deletions.
7 changes: 5 additions & 2 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ export type { AMQPQueueConfig } from './lib/AbstractAmqpService'
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export { AbstractAmqpPublisher, AMQPPublisherOptions } from './lib/AbstractAmqpPublisher'

export type { AmqpConfig } from './lib/amqpConnectionResolver'

export { resolveAmqpConnection } from './lib/amqpConnectionResolver'
export { AmqpConnectionManager } from './lib/AmqpConnectionManager'
export type { ConnectionReceiver } from './lib/AmqpConnectionManager'
export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer'

export * from './lib/AbstractAmqpQueuePublisher'
export * from './lib/AbstractAmqpExchangePublisher'
export * from './lib/AmqpExchangePublisherManager'
export * from './lib/AmqpQueuePublisherManager'
50 changes: 50 additions & 0 deletions packages/amqp/lib/AbstractAmqpExchangePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import type * as Buffer from 'node:buffer'

import { objectToBuffer } from '@message-queue-toolkit/core'
import type { Options } from 'amqplib/properties'

import type { AMQPPublisherOptions } from './AbstractAmqpPublisher'
import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'
import type { AMQPDependencies } from './AbstractAmqpService'

export type AMQPExchangePublisherOptions<MessagePayloadType extends object> = Omit<
AMQPPublisherOptions<MessagePayloadType>,
'creationConfig'
> & {
exchange: string
}

export type AmqpExchangeMessageOptions = {
routingKey: string
publishOptions: Options.Publish
}

export abstract class AbstractAmqpExchangePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpPublisher<MessagePayloadType, AmqpExchangeMessageOptions> {
constructor(
dependencies: AMQPDependencies,
options: AMQPExchangePublisherOptions<MessagePayloadType>,
) {
super(dependencies, {
...options,
// FixMe exchange publisher doesn't need queue at all
creationConfig: {
queueName: 'dummy',
queueOptions: {},
updateAttributesIfExists: false,
},
exchange: options.exchange,
locatorConfig: undefined,
})
}

protected publishInternal(message: Buffer, options: AmqpExchangeMessageOptions): void {
this.channel.publish(
this.exchange!,
options.routingKey,
objectToBuffer(message),
options.publishOptions,
)
}
}
18 changes: 12 additions & 6 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ export type AMQPPublisherOptions<MessagePayloadType extends object> = QueuePubli
AMQPCreationConfig,
AMQPLocator,
MessagePayloadType
>
> & {
exchange?: string
}

export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
export abstract class AbstractAmqpPublisher<MessagePayloadType extends object, MessageOptionsType>
extends AbstractAmqpService<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType, MessageOptionsType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
protected readonly exchange?: string

private initPromise?: Promise<void>

Expand All @@ -35,9 +38,10 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
messageSchemas,
messageTypeField: options.messageTypeField,
})
this.exchange = options.exchange
}

publish(message: MessagePayloadType): void {
publish(message: MessagePayloadType, options: MessageOptionsType): void {
const resolveSchemaResult = this.resolveSchema(message)
if (resolveSchemaResult.error) {
throw resolveSchemaResult.error
Expand All @@ -57,7 +61,7 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
*/
this.initPromise
.then(() => {
this.publish(message)
this.publish(message, options)
})
.catch((err) => {
this.handleError(err)
Expand All @@ -82,7 +86,7 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}

try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
this.publishInternal(objectToBuffer(message), options)
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
Expand All @@ -107,6 +111,8 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
}
}

protected abstract publishInternal(message: Buffer, options: MessageOptionsType): void

protected override resolveSchema(
message: MessagePayloadType,
): Either<Error, ZodSchema<MessagePayloadType>> {
Expand Down
23 changes: 23 additions & 0 deletions packages/amqp/lib/AbstractAmqpQueuePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { Options } from 'amqplib/properties'

import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'

export type AmqpQueueMessageOptions = {
publishOptions: Options.Publish
}

const NO_PARAMS: AmqpQueueMessageOptions = {
publishOptions: {},
}

export abstract class AbstractAmqpQueuePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpPublisher<MessagePayloadType, AmqpQueueMessageOptions> {
protected publishInternal(message: Buffer, options: AmqpQueueMessageOptions): void {
this.channel.sendToQueue(this.queueName, message, options.publishOptions)
}

publish(message: MessagePayloadType, options: AmqpQueueMessageOptions = NO_PARAMS) {
super.publish(message, options)
}
}
5 changes: 5 additions & 0 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export type AMQPCreationConfig = {
updateAttributesIfExists?: boolean
}

export type AMQPSubscriptionConfig = {
exchange: string
routingKey: string
}

export type AMQPLocator = {
queueName: string
}
Expand Down
93 changes: 93 additions & 0 deletions packages/amqp/lib/AmqpExchangePublisherManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas'
import type { TypeOf } from 'zod'
import type z from 'zod'

import type {
AbstractAmqpExchangePublisher,
AMQPExchangePublisherOptions,
} from './AbstractAmqpExchangePublisher'
import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher'
import type { AMQPCreationConfig, AMQPDependencies, AMQPLocator } from './AbstractAmqpService'
import type {
AmqpAwareEventDefinition,
AmqpMessageSchemaType,
AmqpPublisherManagerDependencies,
AmqpPublisherManagerOptions,
} from './AmqpQueuePublisherManager'
import { CommonAmqpExchangePublisherFactory } from './CommonAmqpPublisherFactory'

export class AmqpExchangePublisherManager<
T extends AbstractAmqpExchangePublisher<z.infer<SupportedEventDefinitions[number]['schema']>>,
SupportedEventDefinitions extends AmqpAwareEventDefinition[],
MetadataType = MessageMetadataType,
> extends AbstractPublisherManager<
AmqpAwareEventDefinition,
NonNullable<SupportedEventDefinitions[number]['exchange']>,
AbstractAmqpExchangePublisher<z.infer<SupportedEventDefinitions[number]['schema']>>,
AMQPDependencies,
AMQPCreationConfig,
AMQPLocator,
AmqpMessageSchemaType<AmqpAwareEventDefinition>,
Omit<
AMQPExchangePublisherOptions<z.infer<SupportedEventDefinitions[number]['schema']>>,
'messageSchemas' | 'locatorConfig' | 'exchange'
>,
SupportedEventDefinitions,
MetadataType,
z.infer<SupportedEventDefinitions[number]['schema']>
> {
constructor(
dependencies: AmqpPublisherManagerDependencies<SupportedEventDefinitions>,
options: AmqpPublisherManagerOptions<
T,
AmqpQueueMessageOptions,
AMQPExchangePublisherOptions<z.infer<SupportedEventDefinitions[number]['schema']>>,
z.infer<SupportedEventDefinitions[number]['schema']>,
MetadataType
>,
) {
super({
isAsync: false,
eventRegistry: dependencies.eventRegistry,
metadataField: options.metadataField ?? 'metadata',
metadataFiller: options.metadataFiller,
newPublisherOptions: options.newPublisherOptions,
publisherDependencies: {
amqpConnectionManager: dependencies.amqpConnectionManager,
logger: dependencies.logger,
errorReporter: dependencies.errorReporter,
},
publisherFactory: options.publisherFactory ?? new CommonAmqpExchangePublisherFactory(),
})
}

protected resolvePublisherConfigOverrides(
exchange: string,
): Partial<
Omit<
AMQPExchangePublisherOptions<TypeOf<SupportedEventDefinitions[number]['schema']>>,
'messageSchemas' | 'locatorConfig'
>
> {
return {
exchange,
}
}

protected override resolveCreationConfig(
queueName: NonNullable<SupportedEventDefinitions[number]['exchange']>,
): AMQPCreationConfig {
return {
...this.newPublisherOptions,
queueOptions: {},
queueName,
}
}

protected override resolveEventTarget(
event: AmqpAwareEventDefinition,
): NonNullable<SupportedEventDefinitions[number]['exchange']> | undefined {
return event.queueName
}
}
34 changes: 34 additions & 0 deletions packages/amqp/lib/AmqpQueuePublisherManager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import type { AwilixContainer } from 'awilix'
import { beforeAll } from 'vitest'

import { FakeConsumer } from '../test/fakes/FakeConsumer'
import { TEST_AMQP_CONFIG } from '../test/utils/testAmqpConfig'
import { registerDependencies, TestEvents } from '../test/utils/testContext'
import type { Dependencies } from '../test/utils/testContext'

describe('AmqpQueuePublisherManager', () => {
describe('publish', () => {
let diContainer: AwilixContainer<Dependencies>
beforeAll(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG)
})

it('publishes to the correct queue', async () => {
const { queuePublisherManager } = diContainer.cradle
const fakeConsumer = new FakeConsumer(diContainer.cradle, TestEvents.updated)
await fakeConsumer.start()

const publishedMessage = await queuePublisherManager.publish(FakeConsumer.QUEUE_NAME, {
...queuePublisherManager.resolveBaseFields(),
type: 'entity.updated',
payload: {
updatedData: 'msg',
},
})

const result = await fakeConsumer.handlerSpy.waitForMessageWithId(publishedMessage.id)

expect(result.processingResult).toBe('consumed')
})
})
})
Loading

0 comments on commit 55ae060

Please sign in to comment.