Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-5046 outbox-core package for transactional outbox pattern #204

Merged
merged 31 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
64756c4
AP-5046 New package.
kamilwylegala Sep 3, 2024
cf87550
AP-5046 Implementation + interfaces snippets.
kamilwylegala Sep 4, 2024
047dc08
Tests WIP.
kamilwylegala Sep 4, 2024
9a9bf28
WIP tests
kamilwylegala Sep 4, 2024
c3b08ea
another test
kamilwylegala Sep 4, 2024
043c626
AP-5046 Test.
kamilwylegala Sep 5, 2024
67808d5
AP-5046 Comment.
kamilwylegala Sep 5, 2024
cae5c34
Configurable intervalInMs via constructor.
kamilwylegala Sep 5, 2024
9ac95fa
Fixed import.
kamilwylegala Sep 5, 2024
3dcc037
No locked until for now.
kamilwylegala Sep 5, 2024
4fe8053
Direct uuidv7 lib usage.
kamilwylegala Sep 5, 2024
eb6aa5e
Updated keywords.
kamilwylegala Sep 5, 2024
3333be5
AP-5046 PromisePool
kamilwylegala Sep 6, 2024
31522bb
Divided into files.
kamilwylegala Sep 6, 2024
be3f37b
Removed update method from storage.
kamilwylegala Sep 6, 2024
d460478
JS doc for flush method.
kamilwylegala Sep 6, 2024
1d0ef62
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
fffca57
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
d45a374
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
037c8f1
Updated comment.
kamilwylegala Sep 9, 2024
29bc350
AP-5046 config + dependencies object.
kamilwylegala Sep 9, 2024
94bbe85
AP-5046 OutboxProcessorConfig.
kamilwylegala Sep 9, 2024
db1e1e3
Prefilter entries if any exist in accumulator.
kamilwylegala Sep 9, 2024
34dff00
AP-5046 createEntry.
kamilwylegala Sep 9, 2024
e7a80de
Extra exports.
kamilwylegala Sep 9, 2024
b79bc83
Extended readme.
kamilwylegala Sep 11, 2024
e35c334
Updated comment.
kamilwylegala Sep 11, 2024
23d380c
Push to array.
kamilwylegala Sep 11, 2024
6792331
Dedicated directory for tests.
kamilwylegala Sep 11, 2024
9849e68
AP-5046 Skip filtering if empty entries.
kamilwylegala Sep 11, 2024
239914e
AP-5046 Updated package json + ignore job definition cc.
kamilwylegala Sep 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions packages/outbox-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# outbox-core

Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages.

## Installation

```bash
npm i -S @message-queue-toolkit/outbox-core
```

## Usage

To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class:

```typescript
import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core';

const job = new OutboxPeriodicJob(
//Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit
outboxStorage,
//Default available accumulator for gathering outbox entries as the process job is progressing.
new InMemoryOutboxAccumulator(),
//DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core
eventEmitter,
//See PeriodicJobDependencies from @lokalise/background-jobs-common
dependencies,
//Retry count, how many times outbox entries should be retried to be processed
3,
//emitBatchSize - how many outbox entries should be emitted at once
10,
//internalInMs - how often the job should be executed, e.g. below it runs every 1sec
1000
)
```

Job will take care of processing outbox entries emitted by:
kamilwylegala marked this conversation as resolved.
Show resolved Hide resolved
```typescript
import {
type CommonEventDefinition,
enrichMessageSchemaWithBase,
} from '@message-queue-toolkit/schemas'

const MyEvents = {
created: {
...enrichMessageSchemaWithBase(
'entity.created',
z.object({
message: z.string(),
}),
),
},
} as const satisfies Record<string, CommonEventDefinition>

type MySupportedEvents = (typeof TestEvents)[keyof typeof TestEvents][]

const emitter = new OutboxEventEmitter<MySupportedEvents>(
//Same instance of outbox storage that is used by OutboxPeriodicJob
outboxStorage
)

//It pushes the entry to the storage, later will be picked up by the OutboxPeriodicJob
await emitter.emit(/* args */)
```
4 changes: 4 additions & 0 deletions packages/outbox-core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './lib/outbox'
export * from './lib/objects'
export * from './lib/accumulators'
export * from './lib/storage'
73 changes: 73 additions & 0 deletions packages/outbox-core/lib/accumulators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { OutboxEntry } from './objects.ts'

/**
* Accumulator is responsible for storing outbox entries in two cases:
* - successfully dispatched event
* - failed events
*
* Thanks to this, we can use aggregated result and persist in the storage in batches.
*/
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
/**
* Accumulates successfully dispatched event.
* @param outboxEntry
*/
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* Accumulates failed event.
* @param outboxEntry
*/
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* Returns all entries that should be persisted as successful ones.
*/
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* Returns all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented.
*/
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* After running clear(), no entries should be returned by getEntries() and getFailedEntries().
*
* clear() is always called after flush() in OutboxStorage.
*/
clear(): Promise<void>
}

export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
implements OutboxAccumulator<SupportedEvents>
{
private entries: OutboxEntry<SupportedEvents[number]>[] = []
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []

public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.entries.push(outboxEntry)

return Promise.resolve()
}

public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.failedEntries.push(outboxEntry)

return Promise.resolve()
}

getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.entries)
}

getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.failedEntries)
}

public clear(): Promise<void> {
this.entries = []
this.failedEntries = []
return Promise.resolve()
}
}
25 changes: 25 additions & 0 deletions packages/outbox-core/lib/objects.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageMetadataType,
} from '@message-queue-toolkit/schemas'

/**
* Status of the outbox entry.
* - CREATED - entry was created and is waiting to be processed to publish actual event
* - ACKED - entry was picked up by outbox job and is being processed
* - SUCCESS - entry was successfully processed, event was published
* - FAILED - entry processing failed, it will be retried
*/
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'

export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
id: string
event: SupportedEvent
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
status: OutboxEntryStatus
created: Date
updated?: Date
retryCount: number
}
150 changes: 150 additions & 0 deletions packages/outbox-core/lib/outbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common'
import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common'
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageMetadataType,
DomainEventEmitter,
} from '@message-queue-toolkit/core'
import { PromisePool } from '@supercharge/promise-pool'
import { uuidv7 } from 'uuidv7'
import type { OutboxAccumulator } from './accumulators'
import type { OutboxEntry } from './objects'
import type { OutboxStorage } from './storage'

export type OutboxDependencies<SupportedEvents extends CommonEventDefinition[]> = {
outboxStorage: OutboxStorage<SupportedEvents>
outboxAccumulator: OutboxAccumulator<SupportedEvents>
eventEmitter: DomainEventEmitter<SupportedEvents>
}

export type OutboxProcessorConfiguration = {
maxRetryCount: number
emitBatchSize: number
}

export type OutboxConfiguration = {
jobIntervalInMs: number
} & OutboxProcessorConfiguration

/**
* Main logic for handling outbox entries.
*
* If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up.
*/
export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
constructor(
private readonly outboxDependencies: OutboxDependencies<SupportedEvents>,
private readonly outboxProcessorConfiguration: OutboxProcessorConfiguration,
) {}

public async processOutboxEntries(context: JobExecutionContext) {
const { outboxStorage, eventEmitter, outboxAccumulator } = this.outboxDependencies

const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount)

const filteredEntries =
entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator)

await PromisePool.for(filteredEntries)
.withConcurrency(this.outboxProcessorConfiguration.emitBatchSize)
.process(async (entry) => {
try {
await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
await outboxAccumulator.add(entry)
} catch (e) {
kamilwylegala marked this conversation as resolved.
Show resolved Hide resolved
context.logger.error({ error: e }, 'Failed to process outbox entry.')

await outboxAccumulator.addFailure(entry)
}
})

await outboxStorage.flush(outboxAccumulator)
await outboxAccumulator.clear()
}

private async getFilteredEntries(
entries: OutboxEntry<SupportedEvents[number]>[],
outboxAccumulator: OutboxAccumulator<SupportedEvents>,
) {
const currentEntriesInAccumulator = new Set(
(await outboxAccumulator.getEntries()).map((entry) => entry.id),
)
return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id))
}
}

/**
* Periodic job that processes outbox entries every "intervalInMs". If processing takes longer than defined interval, another subsequent job WILL NOT be started.
*
* When event is published, and then entry is accumulated into SUCCESS group. If processing fails, entry is accumulated as FAILED and will be retried.
*
* Max retry count is defined by the user.
*/
/* c8 ignore start */
export class OutboxPeriodicJob<
SupportedEvents extends CommonEventDefinition[],
> extends AbstractPeriodicJob {
private readonly outboxProcessor: OutboxProcessor<SupportedEvents>

constructor(
outboxDependencies: OutboxDependencies<SupportedEvents>,
outboxConfiguration: OutboxConfiguration,
dependencies: PeriodicJobDependencies,
) {
super(
{
jobId: 'OutboxJob',
schedule: {
intervalInMs: outboxConfiguration.jobIntervalInMs,
},
singleConsumerMode: {
enabled: true,
},
},
{
redis: dependencies.redis,
logger: dependencies.logger,
transactionObservabilityManager: dependencies.transactionObservabilityManager,
errorReporter: dependencies.errorReporter,
scheduler: dependencies.scheduler,
},
)

this.outboxProcessor = new OutboxProcessor<SupportedEvents>(
outboxDependencies,
outboxConfiguration,
)
}

protected async processInternal(context: JobExecutionContext): Promise<void> {
await this.outboxProcessor.processOutboxEntries(context)
}
}
/* c8 ignore stop */

export class OutboxEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
constructor(private storage: OutboxStorage<SupportedEvents>) {}

/**
* Persists outbox entry in persistence layer, later it will be picked up by outbox job.
* @param supportedEvent
* @param data
* @param precedingMessageMetadata
*/
public async emit<SupportedEvent extends SupportedEvents[number]>(
supportedEvent: SupportedEvent,
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
) {
await this.storage.createEntry({
id: uuidv7(),
event: supportedEvent,
data,
precedingMessageMetadata,
status: 'CREATED',
created: new Date(),
retryCount: 0,
})
}
}
32 changes: 32 additions & 0 deletions packages/outbox-core/lib/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { OutboxAccumulator } from './accumulators'
import type { OutboxEntry } from './objects'

/**
* Takes care of persisting and retrieving outbox entries.
*
* Implementation is required:
* - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction
* - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter)
* - returned entries should not include the ones with 'SUCCESS' status
*/
export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]> {
createEntry(
outboxEntry: OutboxEntry<SupportedEvents[number]>,
): Promise<OutboxEntry<SupportedEvents[number]>>

/**
* Responsible for taking all entries from the accumulator and persisting them in the storage.
*
* - Items that are in OutboxAccumulator::getEntries MUST be changed to SUCCESS status and `updatedAt` field needs to be set.
* - Items that are in OutboxAccumulator::getFailedEntries MUST be changed to FAILED status, `updatedAt` field needs to be set and retryCount needs to be incremented.
*/
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void>

/**
* Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times.
*
* For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned.
*/
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
kibertoad marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading