Skip to content

Commit

Permalink
AP-5046 Skip filtering if empty entries.
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilwylegala committed Sep 11, 2024
1 parent 6792331 commit 9849e68
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions packages/outbox-core/lib/outbox.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common'
import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common'
import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common'
import type {
CommonEventDefinition,
CommonEventDefinitionPublisherSchemaType,
Expand All @@ -9,6 +9,7 @@ import type {
import { PromisePool } from '@supercharge/promise-pool'
import { uuidv7 } from 'uuidv7'
import type { OutboxAccumulator } from './accumulators'
import type { OutboxEntry } from './objects'
import type { OutboxStorage } from './storage'

export type OutboxDependencies<SupportedEvents extends CommonEventDefinition[]> = {
Expand Down Expand Up @@ -42,10 +43,8 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {

const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount)

const currentEntriesInAccumulator = new Set(
(await outboxAccumulator.getEntries()).map((entry) => entry.id),
)
const filteredEntries = entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id))
const filteredEntries =
entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator)

await PromisePool.for(filteredEntries)
.withConcurrency(this.outboxProcessorConfiguration.emitBatchSize)
Expand All @@ -63,6 +62,16 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
await outboxStorage.flush(outboxAccumulator)
await outboxAccumulator.clear()
}

private async getFilteredEntries(
entries: OutboxEntry<SupportedEvents[number]>[],
outboxAccumulator: OutboxAccumulator<SupportedEvents>,
) {
const currentEntriesInAccumulator = new Set(
(await outboxAccumulator.getEntries()).map((entry) => entry.id),
)
return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id))
}
}

/**
Expand Down

0 comments on commit 9849e68

Please sign in to comment.