Skip to content

Commit

Permalink
Separate AbstractAmqpConsumer from AbstractAmqpQueueConsumer (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored May 19, 2024
1 parent 868a546 commit e62e2ca
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 62 deletions.
2 changes: 2 additions & 0 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export type { AMQPQueueConfig } from './lib/AbstractAmqpService'

export { AbstractAmqpQueueConsumer } from './lib/AbstractAmqpQueueConsumer'
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'

export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export type { AmqpConfig } from './lib/amqpConnectionResolver'
Expand Down
5 changes: 5 additions & 0 deletions packages/amqp/lib/AbstractAmqpExchangePublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ export abstract class AbstractAmqpExchangePublisher<
options.publishOptions,
)
}

protected createMissingEntities(): Promise<void> {
this.logger.warn('Missing entity creation is not implemented')
return Promise.resolve()
}
}
12 changes: 12 additions & 0 deletions packages/amqp/lib/AbstractAmqpQueueConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { AbstractAmqpConsumer } from './AbstractAmqpConsumer'
import { ensureAmqpQueue } from './utils/amqpQueueUtils'

export class AbstractAmqpQueueConsumer<
MessagePayloadType extends object,
ExecutionContext,
PrehandlerOutput = undefined,
> extends AbstractAmqpConsumer<MessagePayloadType, ExecutionContext, PrehandlerOutput> {
protected override createMissingEntities(): Promise<void> {
return ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig)
}
}
5 changes: 5 additions & 0 deletions packages/amqp/lib/AbstractAmqpQueuePublisher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Options } from 'amqplib/properties'

import { AbstractAmqpPublisher } from './AbstractAmqpPublisher'
import { ensureAmqpQueue } from './utils/amqpQueueUtils'

export type AmqpQueueMessageOptions = {
publishOptions: Options.Publish
Expand All @@ -20,4 +21,8 @@ export abstract class AbstractAmqpQueuePublisher<
publish(message: MessagePayloadType, options: AmqpQueueMessageOptions = NO_PARAMS) {
super.publish(message, options)
}

protected override createMissingEntities(): Promise<void> {
return ensureAmqpQueue(this.connection!, this.channel, this.creationConfig, this.locatorConfig)
}
}
27 changes: 4 additions & 23 deletions packages/amqp/lib/AbstractAmqpService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Channel, Connection, Message } from 'amqplib'
import type { Options } from 'amqplib/properties'

import type { AmqpConnectionManager, ConnectionReceiver } from './AmqpConnectionManager'
import { deleteAmqp } from './utils/amqpInitter'
import { deleteAmqpQueue } from './utils/amqpQueueUtils'

export type AMQPDependencies = QueueDependencies & {
amqpConnectionManager: AmqpConnectionManager
Expand Down Expand Up @@ -99,7 +99,7 @@ export abstract class AbstractAmqpService<
}

if (this.deletionConfig && this.creationConfig) {
await deleteAmqp(this.channel, this.deletionConfig, this.creationConfig)
await deleteAmqpQueue(this.channel, this.deletionConfig, this.creationConfig)
}

this.channel.on('close', () => {
Expand All @@ -115,29 +115,10 @@ export abstract class AbstractAmqpService<
this.handleError(err)
})

if (this.creationConfig) {
await this.channel.assertQueue(
this.creationConfig.queueName,
this.creationConfig.queueOptions,
)
} else {
await this.checkQueueExists()
}
await this.createMissingEntities()
}

private async checkQueueExists() {
// queue check breaks channel if not successful
const checkChannel = await this.connection!.createChannel()
checkChannel.on('error', () => {
// it's OK
})
try {
await checkChannel.checkQueue(this.locatorConfig!.queueName)
await checkChannel.close()
} catch (err) {
throw new Error(`Queue with queueName ${this.locatorConfig!.queueName} does not exist.`)
}
}
protected abstract createMissingEntities(): Promise<void>

private async destroyChannel(): Promise<void> {
if (this.channel) {
Expand Down
21 changes: 17 additions & 4 deletions packages/amqp/lib/AmqpExchangePublisherManager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type { MessageMetadataType } from '@message-queue-toolkit/core/lib/messages/baseMessageSchemas'
import type { TypeOf } from 'zod'
import type {
MessagePublishType,
MessageSchemaType,
MessageMetadataType,
} from '@message-queue-toolkit/core'
import type z from 'zod'

import type {
AbstractAmqpExchangePublisher,
AmqpExchangeMessageOptions,
AMQPExchangePublisherOptions,
} from './AbstractAmqpExchangePublisher'
import type { AmqpQueueMessageOptions } from './AbstractAmqpQueuePublisher'
Expand Down Expand Up @@ -37,7 +41,7 @@ export class AmqpExchangePublisherManager<
>,
SupportedEventDefinitions,
MetadataType,
z.infer<SupportedEventDefinitions[number]['publisherSchema']>
AmqpExchangeMessageOptions
> {
constructor(
dependencies: AmqpPublisherManagerDependencies<SupportedEventDefinitions>,
Expand Down Expand Up @@ -68,7 +72,7 @@ export class AmqpExchangePublisherManager<
exchange: string,
): Partial<
Omit<
AMQPExchangePublisherOptions<TypeOf<SupportedEventDefinitions[number]['publisherSchema']>>,
AMQPExchangePublisherOptions<z.infer<SupportedEventDefinitions[number]['publisherSchema']>>,
'messageSchemas' | 'locatorConfig'
>
> {
Expand All @@ -87,6 +91,15 @@ export class AmqpExchangePublisherManager<
}
}

publish(
eventTarget: NonNullable<SupportedEventDefinitions[number]['exchange']>,
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: MetadataType,
messageOptions?: AmqpExchangeMessageOptions,
): Promise<MessageSchemaType<SupportedEventDefinitions[number]>> {
return super.publish(eventTarget, message, precedingEventMetadata, messageOptions)
}

protected override resolveEventTarget(
event: AmqpAwareEventDefinition,
): NonNullable<SupportedEventDefinitions[number]['exchange']> | undefined {
Expand Down
17 changes: 15 additions & 2 deletions packages/amqp/lib/AmqpQueuePublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type {
EventRegistry,
MetadataFiller,
MessageMetadataType,
MessagePublishType,
MessageSchemaType,
} from '@message-queue-toolkit/core'
import { AbstractPublisherManager } from '@message-queue-toolkit/core'
import type z from 'zod'
Expand Down Expand Up @@ -62,7 +64,7 @@ export class AmqpQueuePublisherManager<
MetadataType = MessageMetadataType,
> extends AbstractPublisherManager<
AmqpAwareEventDefinition,
NonNullable<SupportedEventDefinitions[number]['exchange']>,
NonNullable<SupportedEventDefinitions[number]['queueName']>,
AbstractAmqpQueuePublisher<z.infer<SupportedEventDefinitions[number]['publisherSchema']>>,
AMQPDependencies,
AMQPCreationConfig,
Expand All @@ -74,7 +76,7 @@ export class AmqpQueuePublisherManager<
>,
SupportedEventDefinitions,
MetadataType,
z.infer<SupportedEventDefinitions[number]['publisherSchema']>
AmqpQueueMessageOptions
> {
constructor(
dependencies: AmqpPublisherManagerDependencies<SupportedEventDefinitions>,
Expand Down Expand Up @@ -111,6 +113,17 @@ export class AmqpQueuePublisherManager<
}
}

publish(
queue: NonNullable<SupportedEventDefinitions[number]['queueName']>,
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: MetadataType,
messageOptions?: AmqpQueueMessageOptions,
): Promise<MessageSchemaType<SupportedEventDefinitions[number]>> {
// Purpose of this override is to provide better name for the first argument
// For AMQP Queues it is going to be queue
return super.publish(queue, message, precedingEventMetadata, messageOptions)
}

protected override resolveEventTarget(
event: AmqpAwareEventDefinition,
): NonNullable<SupportedEventDefinitions[number]['queueName']> | undefined {
Expand Down
27 changes: 0 additions & 27 deletions packages/amqp/lib/utils/amqpInitter.ts

This file was deleted.

57 changes: 57 additions & 0 deletions packages/amqp/lib/utils/amqpQueueUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { DeletionConfig } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'
import type { Channel, Connection } from 'amqplib'

import type { AMQPCreationConfig, AMQPLocator } from '../AbstractAmqpService'

export async function checkQueueExists(connection: Connection, locatorConfig: AMQPLocator) {
// queue check breaks channel if not successful
const checkChannel = await connection.createChannel()
checkChannel.on('error', () => {
// it's OK
})
try {
await checkChannel.checkQueue(locatorConfig.queueName)
await checkChannel.close()
} catch (err) {
throw new Error(`Queue with queueName ${locatorConfig.queueName} does not exist.`)
}
}

export async function ensureAmqpQueue(
connection: Connection,
channel: Channel,
creationConfig?: AMQPCreationConfig,
locatorConfig?: AMQPLocator,
) {
if (creationConfig) {
await channel.assertQueue(creationConfig.queueName, creationConfig.queueOptions)
} else {
if (!locatorConfig) {
throw new Error('locatorConfig is mandatory when creationConfig is not set')
}
await checkQueueExists(connection, locatorConfig)
}
}

export async function deleteAmqpQueue(
channel: Channel,
deletionConfig: DeletionConfig,
creationConfig: AMQPCreationConfig,
) {
if (!deletionConfig.deleteIfExists) {
return
}

if (isProduction() && !deletionConfig.forceDeleteInProduction) {
throw new Error(
'You are running autodeletion in production. This can and probably will cause a loss of data. If you are absolutely sure you want to do this, please set deletionConfig.forceDeleteInProduction to true',
)
}

if (!creationConfig.queueName) {
throw new Error('QueueName must be set for automatic deletion')
}

await channel.deleteQueue(creationConfig.queueName)
}
4 changes: 2 additions & 2 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { BarrierResult, Prehandler, PreHandlingOutputs } from '@message-que
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

import type { AMQPConsumerOptions } from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'

import type {
Expand Down Expand Up @@ -36,7 +36,7 @@ type AmqpPermissionConsumerOptions = Pick<
removePreHandlers?: Prehandler<SupportedEvents, ExecutionContext, PrehandlerOutput>[]
}

export class AmqpPermissionConsumer extends AbstractAmqpConsumer<
export class AmqpPermissionConsumer extends AbstractAmqpQueueConsumer<
SupportedEvents,
ExecutionContext,
PrehandlerOutput
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/test/fakes/CustomFakeConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import type { BaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'

export class CustomFakeConsumer extends AbstractAmqpConsumer<BaseMessageType, unknown> {
export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<BaseMessageType, unknown> {
public static readonly QUEUE_NAME = 'dummy-queue'
constructor(dependencies: AMQPConsumerDependencies, schema: ZodSchema) {
super(
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/test/fakes/FakeConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { BaseMessageType } from '@message-queue-toolkit/core'
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'

import { AbstractAmqpConsumer } from '../../lib/AbstractAmqpConsumer'
import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService'
import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager'

export class FakeConsumer extends AbstractAmqpConsumer<BaseMessageType, unknown> {
export class FakeConsumer extends AbstractAmqpQueueConsumer<BaseMessageType, unknown> {
public static readonly QUEUE_NAME = 'dummy-queue'
constructor(dependencies: AMQPConsumerDependencies, eventDefinition: AmqpAwareEventDefinition) {
super(
Expand Down
13 changes: 13 additions & 0 deletions packages/sns/lib/sns/SnsPublisherManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type {
CommonEventDefinition,
EventRegistry,
MessagePublishType,
MessageSchemaType,
MetadataFiller,
PublisherBaseEventType,
} from '@message-queue-toolkit/core'
Expand Down Expand Up @@ -87,6 +89,17 @@ export class SnsPublisherManager<
})
}

publish(
topic: NonNullable<SupportedEventDefinitions[number]['snsTopic']>,
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: MetadataType,
messageOptions?: SNSMessageOptions,
): Promise<MessageSchemaType<SupportedEventDefinitions[number]>> {
// Purpose of this override is to provide better name for the first argument
// For SNS it is going to be topic
return super.publish(topic, message, precedingEventMetadata, messageOptions)
}

protected override resolveCreationConfig(
eventTarget: NonNullable<SupportedEventDefinitions[number]['snsTopic']>,
): SNSCreationConfig {
Expand Down

0 comments on commit e62e2ca

Please sign in to comment.