Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-5046 outbox-core package for transactional outbox pattern #204

Merged
merged 31 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
64756c4
AP-5046 New package.
kamilwylegala Sep 3, 2024
cf87550
AP-5046 Implementation + interfaces snippets.
kamilwylegala Sep 4, 2024
047dc08
Tests WIP.
kamilwylegala Sep 4, 2024
9a9bf28
WIP tests
kamilwylegala Sep 4, 2024
c3b08ea
another test
kamilwylegala Sep 4, 2024
043c626
AP-5046 Test.
kamilwylegala Sep 5, 2024
67808d5
AP-5046 Comment.
kamilwylegala Sep 5, 2024
cae5c34
Configurable intervalInMs via constructor.
kamilwylegala Sep 5, 2024
9ac95fa
Fixed import.
kamilwylegala Sep 5, 2024
3dcc037
No locked until for now.
kamilwylegala Sep 5, 2024
4fe8053
Direct uuidv7 lib usage.
kamilwylegala Sep 5, 2024
eb6aa5e
Updated keywords.
kamilwylegala Sep 5, 2024
3333be5
AP-5046 PromisePool
kamilwylegala Sep 6, 2024
31522bb
Divided into files.
kamilwylegala Sep 6, 2024
be3f37b
Removed update method from storage.
kamilwylegala Sep 6, 2024
d460478
JS doc for flush method.
kamilwylegala Sep 6, 2024
1d0ef62
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
fffca57
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
d45a374
AP-5046 Readme + extra comments.
kamilwylegala Sep 9, 2024
037c8f1
Updated comment.
kamilwylegala Sep 9, 2024
29bc350
AP-5046 config + dependencies object.
kamilwylegala Sep 9, 2024
94bbe85
AP-5046 OutboxProcessorConfig.
kamilwylegala Sep 9, 2024
db1e1e3
Prefilter entries if any exist in accumulator.
kamilwylegala Sep 9, 2024
34dff00
AP-5046 createEntry.
kamilwylegala Sep 9, 2024
e7a80de
Extra exports.
kamilwylegala Sep 9, 2024
b79bc83
Extended readme.
kamilwylegala Sep 11, 2024
e35c334
Updated comment.
kamilwylegala Sep 11, 2024
23d380c
Push to array.
kamilwylegala Sep 11, 2024
6792331
Dedicated directory for tests.
kamilwylegala Sep 11, 2024
9849e68
AP-5046 Skip filtering if empty entries.
kamilwylegala Sep 11, 2024
239914e
AP-5046 Updated package json + ignore job definition cc.
kamilwylegala Sep 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions packages/outbox-core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# outbox-core

Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages.

## Installation

```bash
npm i -S @message-queue-toolkit/outbox-core
```

## Usage

To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class:

```typescript
import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core';

const job = new OutboxPeriodicJob(
//Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit
outboxStorage,
//Default available accumulator for gathering outbox entries as the process job is progressing.
new InMemoryOutboxAccumulator(),
//DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core
eventEmitter,
//See PeriodicJobDependencies from @lokalise/background-jobs-common
dependencies,
//Retry count, how many times outbox entries should be retried to be processed
3,
//emitBatchSize - how many outbox entries should be emitted at once
10,
//internalInMs - how often the job should be executed, e.g. below it runs every 1sec
1000
)
```

Job will take care of processing outbox entries emitted by:
kamilwylegala marked this conversation as resolved.
Show resolved Hide resolved
```typescript
const emitter = new OutboxEventEmitter(
//Same instance of outbox storage that is used by OutboxPeriodicJob
outboxStorage
)
```
4 changes: 4 additions & 0 deletions packages/outbox-core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './lib/outbox'
export * from './lib/objects'
export * from './lib/accumulators'
export * from './lib/storage'
73 changes: 73 additions & 0 deletions packages/outbox-core/lib/accumulators.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import type { OutboxEntry } from './objects.ts'

/**
* Accumulator is responsible for storing outbox entries in two cases:
* - successfully dispatched event
* - failed events
*
* Thanks to this, we can use aggregated result and persist in the storage in batches.
*/
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
/**
* Accumulates successfully dispatched event.
* @param outboxEntry
*/
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* Accumulates failed event.
* @param outboxEntry
*/
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>

/**
* It's meant to be used by OutboxStorage::flush() to get all entries that should be persisted as successful ones.
kamilwylegala marked this conversation as resolved.
Show resolved Hide resolved
*/
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* Also used by OutboxStorage::flush() to get all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented.
*/
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>

/**
* After running clear(), no entries should be returned by getEntries() and getFailedEntries().
*
* clear() is always called after flush() in OutboxStorage.
*/
clear(): Promise<void>
}

export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
implements OutboxAccumulator<SupportedEvents>
{
private entries: OutboxEntry<SupportedEvents[number]>[] = []
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []

public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.entries = [...this.entries, outboxEntry]
kamilwylegala marked this conversation as resolved.
Show resolved Hide resolved

return Promise.resolve()
}

public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
this.failedEntries = [...this.failedEntries, outboxEntry]

return Promise.resolve()
}

getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.entries)
}

getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return Promise.resolve(this.failedEntries)
}

public clear(): Promise<void> {
this.entries = []
this.failedEntries = []
return Promise.resolve()
}
}
25 changes: 25 additions & 0 deletions packages/outbox-core/lib/objects.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
ConsumerMessageMetadataType,
} from '@message-queue-toolkit/schemas'

/**
* Status of the outbox entry.
* - CREATED - entry was created and is waiting to be processed to publish actual event
* - ACKED - entry was picked up by outbox job and is being processed
* - SUCCESS - entry was successfully processed, event was published
* - FAILED - entry processing failed, it will be retried
*/
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'

export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
id: string
event: SupportedEvent
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
status: OutboxEntryStatus
created: Date
updated?: Date
retryCount: number
}
Loading
Loading