Skip to content

Commit

Permalink
Implement AbstractPublisherManager (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored May 18, 2024
1 parent 00a95dc commit 6f07e59
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 164 deletions.
2 changes: 2 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ export * from './lib/events/baseEventSchemas'
export * from './lib/messages/baseMessageSchemas'

export * from './lib/messages/MetadataFiller'

export * from './lib/queues/AbstractPublisherManager'
229 changes: 229 additions & 0 deletions packages/core/lib/queues/AbstractPublisherManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import type {
AsyncPublisher,
CommonEventDefinition,
QueuePublisherOptions,
} from '@message-queue-toolkit/core'
import type { TypeOf, z } from 'zod'

import type { EventRegistry } from '../events/EventRegistry'
import type { BaseEventType } from '../events/baseEventSchemas'
import type { MetadataFiller } from '../messages/MetadataFiller'
import type { SyncPublisher } from '../types/MessageQueueTypes'
import type { CommonCreationConfigType } from '../types/queueOptionsTypes'

export type MessagePublishType<T extends CommonEventDefinition> = Pick<
z.infer<T['schema']>,
'type' | 'payload'
> & { id?: string }

export type MessageSchemaType<T extends CommonEventDefinition> = z.infer<T['schema']>

export type AbstractPublisherFactory<
PublisherType extends AsyncPublisher<object, unknown> | SyncPublisher<object>,
DependenciesType,
CreationConfigType extends CommonCreationConfigType,
QueueLocatorType extends object,
EventType extends BaseEventType,
OptionsType extends Omit<
QueuePublisherOptions<CreationConfigType, QueueLocatorType, EventType>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
>,
> = {
buildPublisher(dependencies: DependenciesType, options: OptionsType): PublisherType
}

export abstract class AbstractPublisherManager<
EventDefinitionType extends CommonEventDefinition,
EventTargets extends string,
PublisherType extends AsyncPublisher<object, unknown> | SyncPublisher<object>,
DependenciesType,
CreationConfigType extends CommonCreationConfigType,
QueueLocatorType extends object,
EventType extends BaseEventType,
OptionsType extends Omit<
QueuePublisherOptions<CreationConfigType, QueueLocatorType, EventType>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
>,
SupportedEventDefinitions extends EventDefinitionType[],
MetadataType,
MessageOptionsType,
> {
private readonly publisherFactory: AbstractPublisherFactory<
PublisherType,
DependenciesType,
CreationConfigType,
QueueLocatorType,
EventType,
OptionsType
>

protected readonly newPublisherOptions: OptionsType

protected readonly metadataFiller: MetadataFiller<
z.infer<SupportedEventDefinitions[number]['schema']>,
MetadataType
>
protected readonly metadataField: string

// In this context "target" can be a topic or an exchange, depending on the transport
protected readonly targetToEventMap: Record<EventTargets, EventDefinitionType[]> = {} as Record<
EventTargets,
EventDefinitionType[]
>
protected readonly isAsync: boolean
protected targetToPublisherMap: Record<EventTargets, PublisherType> = {} as Record<
EventTargets,
PublisherType
>
private readonly publisherDependencies: DependenciesType

protected constructor({
publisherFactory,
newPublisherOptions,
publisherDependencies,
metadataFiller,
eventRegistry,
metadataField,
isAsync,
}: {
publisherFactory: AbstractPublisherFactory<
PublisherType,
DependenciesType,
CreationConfigType,
QueueLocatorType,
EventType,
OptionsType
>
newPublisherOptions: OptionsType
publisherDependencies: DependenciesType
metadataFiller: MetadataFiller<
TypeOf<SupportedEventDefinitions[number]['schema']>,
MetadataType
>
eventRegistry: EventRegistry<SupportedEventDefinitions>
metadataField: string
isAsync: boolean
}) {
this.publisherFactory = publisherFactory
this.newPublisherOptions = newPublisherOptions
this.metadataFiller = metadataFiller
this.metadataField = metadataField
this.isAsync = isAsync
this.publisherDependencies = publisherDependencies

this.registerEvents(eventRegistry.supportedEvents)
this.registerPublishers()
}

protected abstract resolveEventTarget(event: EventDefinitionType): EventTargets | undefined

private registerEvents(events: SupportedEventDefinitions) {
for (const supportedEvent of events) {
const eventTarget = this.resolveEventTarget(supportedEvent)

if (!eventTarget) {
continue
}

if (!this.targetToEventMap[eventTarget]) {
this.targetToEventMap[eventTarget] = []
}

this.targetToEventMap[eventTarget].push(supportedEvent)
}
}

protected abstract resolveCreationConfig(eventTarget: string): CreationConfigType

private registerPublishers() {
for (const eventTarget in this.targetToEventMap) {
if (this.targetToPublisherMap[eventTarget]) {
continue
}

const messageSchemas = this.targetToEventMap[eventTarget].map((entry) => {
return entry.schema
})
const creationConfig = this.resolveCreationConfig(eventTarget)

this.targetToPublisherMap[eventTarget] = this.publisherFactory.buildPublisher(
this.publisherDependencies,
{
...this.newPublisherOptions,
creationConfig,
messageSchemas,
},
)
}
}

public injectPublisher(eventTarget: EventTargets, publisher: PublisherType) {
this.targetToPublisherMap[eventTarget] = publisher
}

public injectEventDefinition(eventDefinition: EventDefinitionType) {
const eventTarget = this.resolveEventTarget(eventDefinition)
if (!eventTarget) {
throw new Error('eventTarget could not be resolved for the event')
}

if (!this.targetToEventMap[eventTarget]) {
this.targetToEventMap[eventTarget] = []
}

this.targetToEventMap[eventTarget].push(eventDefinition)
}

public async publish(
eventTarget: EventTargets,
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: MetadataType,
messageOptions?: MessageOptionsType,
): Promise<MessageSchemaType<SupportedEventDefinitions[number]>> {
const publisher = this.targetToPublisherMap[eventTarget]
if (!publisher) {
throw new Error(`No publisher for target ${eventTarget}`)
}

// ToDo optimize the lookup
const messageDefinition = this.targetToEventMap[eventTarget].find(
(entry) => entry.schema.shape.type.value === message.type,
)

// @ts-ignore
const resolvedMetadata = message[this.metadataField]
? // @ts-ignore
message[this.metadataField]
: // @ts-ignore
this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata)

const resolvedMessage: MessageSchemaType<SupportedEventDefinitions[number]> = {
id: message.id ? message.id : this.metadataFiller.produceId(),
timestamp: this.metadataFiller.produceTimestamp(),
...message,
// @ts-ignore
metadata: resolvedMetadata,
}

if (this.isAsync) {
await (publisher as AsyncPublisher<object, unknown>).publish(resolvedMessage, messageOptions)
} else {
(publisher as SyncPublisher<object>).publish(resolvedMessage)
}

return resolvedMessage
}

/**
* @param eventTarget - topic or exchange
*/
public handlerSpy(eventTarget: EventTargets) {
const publisher = this.targetToPublisherMap[eventTarget]

if (!publisher) {
throw new Error(`No publisher for target ${eventTarget}`)
}

return publisher.handlerSpy
}
}
8 changes: 6 additions & 2 deletions packages/core/lib/types/MessageQueueTypes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { TransactionObservabilityManager } from '@lokalise/node-core'
import type { ZodSchema } from 'zod'

import type { PublicHandlerSpy } from '../queues/HandlerSpy'

export interface QueueConsumer {
start(): Promise<unknown> // subscribe and start listening
close(): Promise<unknown>
Expand All @@ -13,11 +15,13 @@ export type MessageProcessingResult =
| 'error'
| 'invalid_message'

export interface SyncPublisher<MessagePayloadType> {
export interface SyncPublisher<MessagePayloadType extends object> {
handlerSpy: PublicHandlerSpy<MessagePayloadType>
publish(message: MessagePayloadType): void
}

export interface AsyncPublisher<MessagePayloadType, MessageOptions> {
export interface AsyncPublisher<MessagePayloadType extends object, MessageOptions> {
handlerSpy: PublicHandlerSpy<MessagePayloadType>
publish(message: MessagePayloadType, options: MessageOptions): Promise<unknown>
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type CommonQueueOptions = {
deletionConfig?: DeletionConfig
}

type CommonCreationConfigType = {
export type CommonCreationConfigType = {
updateAttributesIfExists?: boolean
}

Expand Down
9 changes: 8 additions & 1 deletion packages/core/vitest.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ export default defineConfig({
environment: 'node',
reporters: ['default'],
coverage: {
provider: 'v8',
include: ['lib/**/*.ts'],
exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'],
exclude: [
'lib/**/*.spec.ts',
'lib/**/*.test.ts',
'test/**/*.*',
'lib/types/**/*.*',
'lib/queues/AbstractPublisherManager.ts',
],
reporter: ['text'],
all: true,
thresholds: {
Expand Down
8 changes: 6 additions & 2 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ describe('SnsPublisherManager', () => {

it('publish to a non-existing topic will throw error', async () => {
await expect(
// @ts-expect-error Testing error scenario
publisherManager.publish('non-existing-topic', {
type: 'entity.created',
payload: {
message: 'msg',
},
}),
).rejects.toThrow('No publisher for topic non-existing-topic')
).rejects.toThrow('No publisher for target non-existing-topic')
})
})

Expand All @@ -99,8 +100,9 @@ describe('SnsPublisherManager', () => {
})

it('returns error when no publisher for topic', () => {
// @ts-expect-error Testing incorrect scenario
expect(() => publisherManager.handlerSpy('non-existing-topic')).toThrow(
'No publisher for topic non-existing-topic',
'No publisher for target non-existing-topic',
)
})
})
Expand Down Expand Up @@ -132,6 +134,7 @@ describe('SnsPublisherManager', () => {
schemaVersion: '2.0.0',
})

// @ts-expect-error Testing injected publisher
await publisherManager.publish(topic, {
id: messageId,
type: 'entity.created',
Expand All @@ -142,6 +145,7 @@ describe('SnsPublisherManager', () => {

// Then
const spyRes = await publisherManager
// @ts-expect-error Testing injected publisher
.handlerSpy(topic)
.waitForMessageWithId(messageId, 'published')
expect(spyRes.processingResult).toBe('published')
Expand Down
Loading

0 comments on commit 6f07e59

Please sign in to comment.