Skip to content

Commit

Permalink
AP-5046 outbox-core package for transactional outbox pattern (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilwylegala authored Sep 16, 2024
1 parent ab8ce42 commit 1faf89b
Show file tree
Hide file tree
Showing 12 changed files with 793 additions and 0 deletions.
63 changes: 63 additions & 0 deletions packages/outbox-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# outbox-core

Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages.

## Installation

```bash
npm i -S @message-queue-toolkit/outbox-core
```

## Usage

To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class:

```typescript
import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core';

const job = new OutboxPeriodicJob(
//Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit
outboxStorage,
//Default available accumulator for gathering outbox entries as the process job is progressing.
new InMemoryOutboxAccumulator(),
//DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core
eventEmitter,
//See PeriodicJobDependencies from @lokalise/background-jobs-common
dependencies,
//Retry count, how many times outbox entries should be retried to be processed
3,
//emitBatchSize - how many outbox entries should be emitted at once
10,
//internalInMs - how often the job should be executed, e.g. below it runs every 1sec
1000
)
```

Job will take care of processing outbox entries emitted by:
```typescript
import {
type CommonEventDefinition,
enrichMessageSchemaWithBase,
} from '@message-queue-toolkit/schemas'

const MyEvents = {
created: {
...enrichMessageSchemaWithBase(
'entity.created',
z.object({
message: z.string(),
}),
),
},
} as const satisfies Record<string, CommonEventDefinition>

type MySupportedEvents = (typeof TestEvents)[keyof typeof TestEvents][]

const emitter = new OutboxEventEmitter<MySupportedEvents>(
//Same instance of outbox storage that is used by OutboxPeriodicJob
outboxStorage
)

//It pushes the entry to the storage, later will be picked up by the OutboxPeriodicJob
await emitter.emit(/* args */)
```
4 changes: 4 additions & 0 deletions packages/outbox-core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './lib/outbox'
export * from './lib/objects'
export * from './lib/accumulators'
export * from './lib/storage'
73 changes: 73 additions & 0 deletions packages/outbox-core/lib/accumulators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { OutboxEntry } from './objects.ts'

/**
* Accumulator is responsible for storing outbox entries in two cases:
* - successfully dispatched event
* - failed events
*
* Thanks to this, we can use aggregated result and persist in the storage in batches.
*/
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
/**
* Accumulates successfully dispatched event.
* @param outboxEntry
*/
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* Accumulates failed event.
* @param outboxEntry
*/
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* Returns all entries that should be persisted as successful ones.
*/
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* Returns all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented.
*/
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* After running clear(), no entries should be returned by getEntries() and getFailedEntries().
*
* clear() is always called after flush() in OutboxStorage.
*/
clear(): Promise<void>
}

export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
implements OutboxAccumulator<SupportedEvents>
{
private entries: OutboxEntry<SupportedEvents[number]>[] = []
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []

public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.entries.push(outboxEntry)

return Promise.resolve()
}

public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.failedEntries.push(outboxEntry)

return Promise.resolve()
}

getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.entries)
}

getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.failedEntries)
}

public clear(): Promise<void> {
this.entries = []
this.failedEntries = []
return Promise.resolve()
}
}
25 changes: 25 additions & 0 deletions packages/outbox-core/lib/objects.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageMetadataType,
} from '@message-queue-toolkit/schemas'

/**
* Status of the outbox entry.
* - CREATED - entry was created and is waiting to be processed to publish actual event
* - ACKED - entry was picked up by outbox job and is being processed
* - SUCCESS - entry was successfully processed, event was published
* - FAILED - entry processing failed, it will be retried
*/
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'

export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
id: string
event: SupportedEvent
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
status: OutboxEntryStatus
created: Date
updated?: Date
retryCount: number
}
150 changes: 150 additions & 0 deletions packages/outbox-core/lib/outbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common'
import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common'
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageMetadataType,
DomainEventEmitter,
} from '@message-queue-toolkit/core'
import { PromisePool } from '@supercharge/promise-pool'
import { uuidv7 } from 'uuidv7'
import type { OutboxAccumulator } from './accumulators'
import type { OutboxEntry } from './objects'
import type { OutboxStorage } from './storage'

export type OutboxDependencies<SupportedEvents extends CommonEventDefinition[]> = {
outboxStorage: OutboxStorage<SupportedEvents>
outboxAccumulator: OutboxAccumulator<SupportedEvents>
eventEmitter: DomainEventEmitter<SupportedEvents>
}

export type OutboxProcessorConfiguration = {
maxRetryCount: number
emitBatchSize: number
}

export type OutboxConfiguration = {
jobIntervalInMs: number
} & OutboxProcessorConfiguration

/**
* Main logic for handling outbox entries.
*
* If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up.
*/
export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
constructor(
private readonly outboxDependencies: OutboxDependencies<SupportedEvents>,
private readonly outboxProcessorConfiguration: OutboxProcessorConfiguration,
) {}

public async processOutboxEntries(context: JobExecutionContext) {
const { outboxStorage, eventEmitter, outboxAccumulator } = this.outboxDependencies

const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount)

const filteredEntries =
entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator)

await PromisePool.for(filteredEntries)
.withConcurrency(this.outboxProcessorConfiguration.emitBatchSize)
.process(async (entry) => {
try {
await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
await outboxAccumulator.add(entry)
} catch (e) {
context.logger.error({ error: e }, 'Failed to process outbox entry.')

await outboxAccumulator.addFailure(entry)
}
})

await outboxStorage.flush(outboxAccumulator)
await outboxAccumulator.clear()
}

private async getFilteredEntries(
entries: OutboxEntry<SupportedEvents[number]>[],
outboxAccumulator: OutboxAccumulator<SupportedEvents>,
) {
const currentEntriesInAccumulator = new Set(
(await outboxAccumulator.getEntries()).map((entry) => entry.id),
)
return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id))
}
}

/**
* Periodic job that processes outbox entries every "intervalInMs". If processing takes longer than defined interval, another subsequent job WILL NOT be started.
*
* When event is published, and then entry is accumulated into SUCCESS group. If processing fails, entry is accumulated as FAILED and will be retried.
*
* Max retry count is defined by the user.
*/
/* c8 ignore start */
export class OutboxPeriodicJob<
SupportedEvents extends CommonEventDefinition[],
> extends AbstractPeriodicJob {
private readonly outboxProcessor: OutboxProcessor<SupportedEvents>

constructor(
outboxDependencies: OutboxDependencies<SupportedEvents>,
outboxConfiguration: OutboxConfiguration,
dependencies: PeriodicJobDependencies,
) {
super(
{
jobId: 'OutboxJob',
schedule: {
intervalInMs: outboxConfiguration.jobIntervalInMs,
},
singleConsumerMode: {
enabled: true,
},
},
{
redis: dependencies.redis,
logger: dependencies.logger,
transactionObservabilityManager: dependencies.transactionObservabilityManager,
errorReporter: dependencies.errorReporter,
scheduler: dependencies.scheduler,
},
)

this.outboxProcessor = new OutboxProcessor<SupportedEvents>(
outboxDependencies,
outboxConfiguration,
)
}

protected async processInternal(context: JobExecutionContext): Promise<void> {
await this.outboxProcessor.processOutboxEntries(context)
}
}
/* c8 ignore stop */

export class OutboxEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
constructor(private storage: OutboxStorage<SupportedEvents>) {}

/**
* Persists outbox entry in persistence layer, later it will be picked up by outbox job.
* @param supportedEvent
* @param data
* @param precedingMessageMetadata
*/
public async emit<SupportedEvent extends SupportedEvents[number]>(
supportedEvent: SupportedEvent,
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
) {
await this.storage.createEntry({
id: uuidv7(),
event: supportedEvent,
data,
precedingMessageMetadata,
status: 'CREATED',
created: new Date(),
retryCount: 0,
})
}
}
32 changes: 32 additions & 0 deletions packages/outbox-core/lib/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { OutboxAccumulator } from './accumulators'
import type { OutboxEntry } from './objects'

/**
* Takes care of persisting and retrieving outbox entries.
*
* Implementation is required:
* - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction
* - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter)
* - returned entries should not include the ones with 'SUCCESS' status
*/
export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]> {
createEntry(
outboxEntry: OutboxEntry<SupportedEvents[number]>,
): Promise<OutboxEntry<SupportedEvents[number]>>

/**
* Responsible for taking all entries from the accumulator and persisting them in the storage.
*
* - Items that are in OutboxAccumulator::getEntries MUST be changed to SUCCESS status and `updatedAt` field needs to be set.
* - Items that are in OutboxAccumulator::getFailedEntries MUST be changed to FAILED status, `updatedAt` field needs to be set and retryCount needs to be incremented.
*/
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void>

/**
* Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times.
*
* For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned.
*/
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
}
Loading

0 comments on commit 1faf89b

Please sign in to comment.