Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement AbstractPublisherManager #143

Merged
merged 3 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ export * from './lib/events/baseEventSchemas'
export * from './lib/messages/baseMessageSchemas'

export * from './lib/messages/MetadataFiller'

export * from './lib/queues/AbstractPublisherManager'
229 changes: 229 additions & 0 deletions packages/core/lib/queues/AbstractPublisherManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import type {
AsyncPublisher,
CommonEventDefinition,
QueuePublisherOptions,
} from '@message-queue-toolkit/core'
import type { TypeOf, z } from 'zod'

import type { EventRegistry } from '../events/EventRegistry'
import type { BaseEventType } from '../events/baseEventSchemas'
import type { MetadataFiller } from '../messages/MetadataFiller'
import type { SyncPublisher } from '../types/MessageQueueTypes'
import type { CommonCreationConfigType } from '../types/queueOptionsTypes'

export type MessagePublishType<T extends CommonEventDefinition> = Pick<
z.infer<T['schema']>,
'type' | 'payload'
> & { id?: string }

export type MessageSchemaType<T extends CommonEventDefinition> = z.infer<T['schema']>

export type AbstractPublisherFactory<
PublisherType extends AsyncPublisher<object, unknown> | SyncPublisher<object>,
DependenciesType,
CreationConfigType extends CommonCreationConfigType,
QueueLocatorType extends object,
EventType extends BaseEventType,
OptionsType extends Omit<
QueuePublisherOptions<CreationConfigType, QueueLocatorType, EventType>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
>,
> = {
buildPublisher(dependencies: DependenciesType, options: OptionsType): PublisherType
}

export abstract class AbstractPublisherManager<
EventDefinitionType extends CommonEventDefinition,
EventTargets extends string,
PublisherType extends AsyncPublisher<object, unknown> | SyncPublisher<object>,
DependenciesType,
CreationConfigType extends CommonCreationConfigType,
QueueLocatorType extends object,
EventType extends BaseEventType,
OptionsType extends Omit<
QueuePublisherOptions<CreationConfigType, QueueLocatorType, EventType>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
>,
SupportedEventDefinitions extends EventDefinitionType[],
MetadataType,
MessageOptionsType,
> {
private readonly publisherFactory: AbstractPublisherFactory<
PublisherType,
DependenciesType,
CreationConfigType,
QueueLocatorType,
EventType,
OptionsType
>

protected readonly newPublisherOptions: OptionsType

protected readonly metadataFiller: MetadataFiller<
z.infer<SupportedEventDefinitions[number]['schema']>,
MetadataType
>
protected readonly metadataField: string

// In this context "target" can be a topic or an exchange, depending on the transport
protected readonly targetToEventMap: Record<EventTargets, EventDefinitionType[]> = {} as Record<
EventTargets,
EventDefinitionType[]
>
protected readonly isAsync: boolean
protected targetToPublisherMap: Record<EventTargets, PublisherType> = {} as Record<
EventTargets,
PublisherType
>
private readonly publisherDependencies: DependenciesType

protected constructor({
publisherFactory,
newPublisherOptions,
publisherDependencies,
metadataFiller,
eventRegistry,
metadataField,
isAsync,
}: {
publisherFactory: AbstractPublisherFactory<
PublisherType,
DependenciesType,
CreationConfigType,
QueueLocatorType,
EventType,
OptionsType
>
newPublisherOptions: OptionsType
publisherDependencies: DependenciesType
metadataFiller: MetadataFiller<
TypeOf<SupportedEventDefinitions[number]['schema']>,
MetadataType
>
eventRegistry: EventRegistry<SupportedEventDefinitions>
metadataField: string
isAsync: boolean
}) {
this.publisherFactory = publisherFactory
this.newPublisherOptions = newPublisherOptions
this.metadataFiller = metadataFiller
this.metadataField = metadataField
this.isAsync = isAsync
this.publisherDependencies = publisherDependencies

this.registerEvents(eventRegistry.supportedEvents)
this.registerPublishers()
}

protected abstract resolveEventTarget(event: EventDefinitionType): EventTargets | undefined

private registerEvents(events: SupportedEventDefinitions) {
for (const supportedEvent of events) {
const eventTarget = this.resolveEventTarget(supportedEvent)

if (!eventTarget) {
continue
}

if (!this.targetToEventMap[eventTarget]) {
this.targetToEventMap[eventTarget] = []
}

this.targetToEventMap[eventTarget].push(supportedEvent)
}
}

protected abstract resolveCreationConfig(eventTarget: string): CreationConfigType

private registerPublishers() {
for (const eventTarget in this.targetToEventMap) {
if (this.targetToPublisherMap[eventTarget]) {
continue
}

const messageSchemas = this.targetToEventMap[eventTarget].map((entry) => {
return entry.schema
})
const creationConfig = this.resolveCreationConfig(eventTarget)

this.targetToPublisherMap[eventTarget] = this.publisherFactory.buildPublisher(
this.publisherDependencies,
{
...this.newPublisherOptions,
creationConfig,
messageSchemas,
},
)
}
}

public injectPublisher(eventTarget: EventTargets, publisher: PublisherType) {
this.targetToPublisherMap[eventTarget] = publisher
}

public injectEventDefinition(eventDefinition: EventDefinitionType) {
const eventTarget = this.resolveEventTarget(eventDefinition)
if (!eventTarget) {
throw new Error('eventTarget could not be resolved for the event')
}

if (!this.targetToEventMap[eventTarget]) {
this.targetToEventMap[eventTarget] = []
}

this.targetToEventMap[eventTarget].push(eventDefinition)
}

public async publish(
eventTarget: EventTargets,
message: MessagePublishType<SupportedEventDefinitions[number]>,
precedingEventMetadata?: MetadataType,
messageOptions?: MessageOptionsType,
): Promise<MessageSchemaType<SupportedEventDefinitions[number]>> {
const publisher = this.targetToPublisherMap[eventTarget]
if (!publisher) {
throw new Error(`No publisher for target ${eventTarget}`)
}

// ToDo optimize the lookup
const messageDefinition = this.targetToEventMap[eventTarget].find(
(entry) => entry.schema.shape.type.value === message.type,
)

// @ts-ignore
const resolvedMetadata = message[this.metadataField]
? // @ts-ignore
message[this.metadataField]
: // @ts-ignore
this.metadataFiller.produceMetadata(message, messageDefinition, precedingEventMetadata)

const resolvedMessage: MessageSchemaType<SupportedEventDefinitions[number]> = {
id: message.id ? message.id : this.metadataFiller.produceId(),
timestamp: this.metadataFiller.produceTimestamp(),
...message,
// @ts-ignore
metadata: resolvedMetadata,
}

if (this.isAsync) {
await (publisher as AsyncPublisher<object, unknown>).publish(resolvedMessage, messageOptions)
} else {
(publisher as SyncPublisher<object>).publish(resolvedMessage)
}

return resolvedMessage
}

/**
* @param eventTarget - topic or exchange
*/
public handlerSpy(eventTarget: EventTargets) {
const publisher = this.targetToPublisherMap[eventTarget]

if (!publisher) {
throw new Error(`No publisher for target ${eventTarget}`)
}

return publisher.handlerSpy
}
}
8 changes: 6 additions & 2 deletions packages/core/lib/types/MessageQueueTypes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { TransactionObservabilityManager } from '@lokalise/node-core'
import type { ZodSchema } from 'zod'

import type { PublicHandlerSpy } from '../queues/HandlerSpy'

export interface QueueConsumer {
start(): Promise<unknown> // subscribe and start listening
close(): Promise<unknown>
Expand All @@ -13,11 +15,13 @@ export type MessageProcessingResult =
| 'error'
| 'invalid_message'

export interface SyncPublisher<MessagePayloadType> {
export interface SyncPublisher<MessagePayloadType extends object> {
handlerSpy: PublicHandlerSpy<MessagePayloadType>
publish(message: MessagePayloadType): void
}

export interface AsyncPublisher<MessagePayloadType, MessageOptions> {
export interface AsyncPublisher<MessagePayloadType extends object, MessageOptions> {
handlerSpy: PublicHandlerSpy<MessagePayloadType>
publish(message: MessagePayloadType, options: MessageOptions): Promise<unknown>
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export type CommonQueueOptions = {
deletionConfig?: DeletionConfig
}

type CommonCreationConfigType = {
export type CommonCreationConfigType = {
updateAttributesIfExists?: boolean
}

Expand Down
9 changes: 8 additions & 1 deletion packages/core/vitest.config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ export default defineConfig({
environment: 'node',
reporters: ['default'],
coverage: {
provider: 'v8',
include: ['lib/**/*.ts'],
exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*', 'lib/types/**/*.*'],
exclude: [
'lib/**/*.spec.ts',
'lib/**/*.test.ts',
'test/**/*.*',
'lib/types/**/*.*',
'lib/queues/AbstractPublisherManager.ts',
],
reporter: ['text'],
all: true,
thresholds: {
Expand Down
8 changes: 6 additions & 2 deletions packages/sns/lib/sns/SnsPublisherManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ describe('SnsPublisherManager', () => {

it('publish to a non-existing topic will throw error', async () => {
await expect(
// @ts-expect-error Testing error scenario
publisherManager.publish('non-existing-topic', {
type: 'entity.created',
payload: {
message: 'msg',
},
}),
).rejects.toThrow('No publisher for topic non-existing-topic')
).rejects.toThrow('No publisher for target non-existing-topic')
})
})

Expand All @@ -99,8 +100,9 @@ describe('SnsPublisherManager', () => {
})

it('returns error when no publisher for topic', () => {
// @ts-expect-error Testing incorrect scenario
expect(() => publisherManager.handlerSpy('non-existing-topic')).toThrow(
'No publisher for topic non-existing-topic',
'No publisher for target non-existing-topic',
)
})
})
Expand Down Expand Up @@ -132,6 +134,7 @@ describe('SnsPublisherManager', () => {
schemaVersion: '2.0.0',
})

// @ts-expect-error Testing injected publisher
await publisherManager.publish(topic, {
id: messageId,
type: 'entity.created',
Expand All @@ -142,6 +145,7 @@ describe('SnsPublisherManager', () => {

// Then
const spyRes = await publisherManager
// @ts-expect-error Testing injected publisher
.handlerSpy(topic)
.waitForMessageWithId(messageId, 'published')
expect(spyRes.processingResult).toBe('published')
Expand Down
Loading
Loading