Skip to content

Commit

Permalink
implement getLastUnpublishedConventionBroadcastRequestedEventByConven…
Browse files Browse the repository at this point in the history
…tionId in outbox repo
  • Loading branch information
clement-duport committed Nov 5, 2024
1 parent 3ac9146 commit 9e80f3d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { subHours } from "date-fns";
import { CompiledQuery } from "kysely";
import { Pool } from "pg";
import {
Expand Down Expand Up @@ -237,6 +238,49 @@ describe("PgOutboxRepository", () => {
});
});

describe("getLastConventionBroadcastRequestedEventByConventionId", () => {
const now = timeGateway.now();
it("get the last unpublished convention broadcastrequested event by conventionId", async () => {
const convention = new ConventionDtoBuilder().build();
uuidGenerator.setNextUuids([
"aaaaac99-9c0a-aaaa-aa6d-6aa9ad38aaaa",
"bbbbbc99-9c0b-bbbb-bb6d-6bb9bd38bbbb",
"cccccc99-9c0c-cccc-cc6d-6cc9cd38cccc",
]);
const eventAlreadyPublished = createNewEvent({
topic: "ConventionBroadcastRequested",
occurredAt: subHours(now, 1).toISOString(),
status: "published",
payload: { convention, triggeredBy: null },
});
const latestEventInProcess = createNewEvent({
topic: "ConventionBroadcastRequested",
occurredAt: subHours(now, 1).toISOString(),
status: "in-process",
payload: { convention, triggeredBy: null },
});
const olderEventInProcess = createNewEvent({
topic: "ConventionBroadcastRequested",
occurredAt: subHours(now, 2).toISOString(),
status: "in-process",
payload: { convention, triggeredBy: null },
});

await storeInOutbox([
eventAlreadyPublished,
latestEventInProcess,
olderEventInProcess,
]);

expectToEqual(
await outboxRepository.getLastUnpublishedConventionBroadcastRequestedEventByConventionId(
convention.id,
),
latestEventInProcess,
);
});
});

describe("markEventsAsInProcess", () => {
it("works even if there is no events to mark", async () => {
await outboxRepository.markEventsAsInProcess([]);
Expand Down
21 changes: 20 additions & 1 deletion back/src/domains/core/events/adapters/PgOutboxRepository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { sql } from "kysely";
import { differenceWith, propEq } from "ramda";
import { DateString, replaceArrayElement } from "shared";
import { ConventionId, DateString, replaceArrayElement } from "shared";
import { KyselyDb } from "../../../../config/pg/kysely/kyselyUtils";
import { counterEventsSavedBeforePublish } from "../../../../utils/counters";
import { createLogger } from "../../../../utils/logger";
Expand Down Expand Up @@ -41,6 +42,24 @@ export class PgOutboxRepository implements OutboxRepository {
return parseInt(result.total);
}

public async getLastUnpublishedConventionBroadcastRequestedEventByConventionId(
conventionId: ConventionId,
): Promise<DomainEvent | undefined> {
const results = await this.transaction
.selectFrom("outbox")
.selectAll()
.where("topic", "=", "ConventionBroadcastRequested")
.where("status", "!=", "published")
.where(sql`outbox.payload -> 'convention' ->> 'id'`, "=", conventionId)
.orderBy("occurred_at", "desc")
.limit(1)
.execute();

return !results.length
? undefined
: storedEventRowsToDomainEvent(results as StoredEventRow[]);
}

public async markEventsAsInProcess(events: DomainEvent[]): Promise<void> {
if (events.length)
await this.transaction
Expand Down

0 comments on commit 9e80f3d

Please sign in to comment.