diff --git a/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts b/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts new file mode 100644 index 0000000000..e8bff2ddca --- /dev/null +++ b/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddExternalUniqueIdToApiEvents1626458359147 + implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE api_events ADD external_id character varying`, + ); + + await queryRunner.query( + `ALTER TABLE api_events ADD CONSTRAINT api_events_external_id_unique UNIQUE ("external_id")`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE api_events DROP CONSTRAINT api_events_external_id_unique`, + ); + await queryRunner.query(`ALTER TABLE api_events DROP COLUMN external_id`); + } +} diff --git a/api/apps/api/src/modules/api-events/api-event.api.entity.ts b/api/apps/api/src/modules/api-events/api-event.api.entity.ts index 00d715cd79..705a62e5f4 100644 --- a/api/apps/api/src/modules/api-events/api-event.api.entity.ts +++ b/api/apps/api/src/modules/api-events/api-event.api.entity.ts @@ -33,6 +33,18 @@ export class ApiEvent { @PrimaryGeneratedColumn('uuid') id!: string; + /** + * Unique identifier of an event, when the event origins from other sources than the application itself and has its own id. + * It is unique to prevent duplicates in case of multiple instances listening on the same source. + */ + @Column({ + name: 'external_id', + nullable: true, + type: 'varchar', + unique: true, + }) + externalId?: string | null; + /** * Timestamp of the event. */ diff --git a/api/apps/api/src/modules/api-events/api-events.service.ts b/api/apps/api/src/modules/api-events/api-events.service.ts index db90af3d59..32df7b4235 100644 --- a/api/apps/api/src/modules/api-events/api-events.service.ts +++ b/api/apps/api/src/modules/api-events/api-events.service.ts @@ -2,6 +2,7 @@ import { Injectable, NotFoundException } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { DeleteResult, Repository } from 'typeorm'; +import { Either, left, right } from 'fp-ts/lib/Either'; import { ApiEvent, @@ -23,6 +24,8 @@ import { UpdateApiEventDTO } from './dto/update.api-event.dto'; import { AppInfoDTO } from '../../dto/info.dto'; import { AppConfig } from '../../utils/config.utils'; +export const duplicate = Symbol(); + @Injectable() /** * API Events @@ -69,6 +72,27 @@ export class ApiEventsService extends AppBaseService< return result; } + /** + * recognizes duplicates on {@link CreateApiEventDTO.externalId} + */ + async createIfNotExists( + data: CreateApiEventDTO, + ): Promise> { + try { + return right(await this.create(data)); + } catch (error) { + const postgresDuplicateKeyErrorCode = '23505'; + const externalIdConstraint = 'api_events_external_id_unique'; + if ( + error.code === postgresDuplicateKeyErrorCode && + error.constraint === externalIdConstraint + ) { + return left(duplicate); + } + throw error; + } + } + /** * Purge all events. Optionally this can be limited to events of a given * `QualifiedEventTopic` (i.e. a topic qualified by `kind` and `apiVersion`). diff --git a/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts b/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts index 2f2ddd464a..2113969fac 100644 --- a/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts +++ b/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts @@ -1,5 +1,5 @@ import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; -import { IsEnum, IsJSON, IsOptional, IsUUID } from 'class-validator'; +import { IsEnum, IsJSON, IsOptional, IsString, IsUUID } from 'class-validator'; import * as ApiEventsUserData from '@marxan-api/modules/api-events/dto/apiEvents.user.data.dto'; import { API_EVENT_KINDS } from '@marxan/api-events'; @@ -19,6 +19,11 @@ export class CreateApiEventDTO { @IsUUID(4) topic!: string; + @ApiPropertyOptional() + @IsString() + @IsOptional() + externalId?: string; + /** * Data payload of the event. Its semantics depend on kind. */ diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts index f4ef06b9c7..96f378f30c 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts @@ -24,7 +24,7 @@ beforeEach(async () => { }); test(`scheduling job`, async () => { - fixtures.setupMocksForSchedulingJobs(); + fixtures.setupMocksForSchedulingJobs(() => `1234`); // given fixtures.GivenAssetsAvailable(); @@ -33,12 +33,12 @@ test(`scheduling job`, async () => { // then fixtures.ThenShouldUpdateScenario(); - fixtures.ThenShouldEmitSubmittedEvent(); + fixtures.ThenShouldEmitSubmittedEvent(`1234`); fixtures.ThenShouldAddJob(); }); test(`scheduling job for scenario without assets`, async () => { - fixtures.setupMocksForSchedulingJobs(); + fixtures.setupMocksForSchedulingJobs(() => `12345`); // given fixtures.GivenAssetsNotAvailable(); @@ -94,14 +94,18 @@ describe(`with a single job in the queue`, () => { ${`failed`} | ${API_EVENT_KINDS.scenario__run__failed__v1__alpha1} ${`completed`} | ${API_EVENT_KINDS.scenario__run__finished__v1__alpha1} `(`when $GotEvent, saves $SavedKind`, async ({ GotEvent, SavedKind }) => { - fixtures.fakeEvents.emit(GotEvent, { - jobId: `123`, - data: { - scenarioId: `scenario-x`, + fixtures.fakeEvents.emit( + GotEvent, + { + jobId: `123`, + data: { + scenarioId: `scenario-x`, + }, }, - }); + `eventId1`, + ); - await fixtures.ThenEventCreated(SavedKind); + await fixtures.ThenEventCreated(SavedKind, `eventId1`); }); }); @@ -114,6 +118,7 @@ async function getFixtures() { }; const fakeEvents = new EventEmitter(); const fakeApiEvents = { + createIfNotExists: throwingMock(), create: throwingMock(), }; const fakeScenarioRepo = { @@ -190,17 +195,21 @@ async function getFixtures() { getRunService() { return testingModule.get(RunService); }, - setupMocksForSchedulingJobs() { - this.setupMockForCreatingEvents(); - fakeQueue.add.mockImplementation(() => { + setupMocksForSchedulingJobs(createId: () => string) { + fakeApiEvents.create.mockImplementation(() => { // }); + fakeQueue.add.mockImplementation(async () => { + return { + id: createId(), + }; + }); fakeScenarioRepo.update.mockImplementation(() => { // }); }, setupMockForCreatingEvents() { - fakeApiEvents.create.mockImplementation(() => { + fakeApiEvents.createIfNotExists.mockImplementation(() => { // }); }, @@ -238,11 +247,12 @@ async function getFixtures() { ranAtLeastOnce: true, }); }, - ThenShouldEmitSubmittedEvent() { + ThenShouldEmitSubmittedEvent(id: string) { expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); expect(fixtures.fakeApiEvents.create).toBeCalledWith({ topic: `scenario-1`, kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + externalId: `${id}${API_EVENT_KINDS.scenario__run__submitted__v1__alpha1}`, }); }, ThenShouldAddJob() { @@ -252,12 +262,13 @@ async function getFixtures() { assets: this.scenarioAssets, }); }, - async ThenEventCreated(kind: API_EVENT_KINDS) { + async ThenEventCreated(kind: API_EVENT_KINDS, eventId: string) { await waitForExpect(() => { - expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); - expect(fixtures.fakeApiEvents.create).toBeCalledWith({ + expect(fixtures.fakeApiEvents.createIfNotExists).toBeCalledTimes(1); + expect(fixtures.fakeApiEvents.createIfNotExists).toBeCalledWith({ kind, topic: `scenario-1`, + externalId: eventId, }); }); }, diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts index f4a6b1cc2f..4a4e48aec2 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts @@ -43,23 +43,29 @@ export class RunService { private readonly scenarios: Repository, private readonly assets: AssetsService, ) { - queueEvents.on(`completed`, ({ jobId }) => this.handleFinished(jobId)); - queueEvents.on(`failed`, ({ jobId }) => this.handleFailed(jobId)); + queueEvents.on(`completed`, ({ jobId }, eventId) => + this.handleFinished(jobId, eventId), + ); + queueEvents.on(`failed`, ({ jobId }, eventId) => + this.handleFailed(jobId, eventId), + ); } async run(scenarioId: string): Promise { const assets = await this.assets.forScenario(scenarioId); assertDefined(assets); - await this.queue.add(`run-scenario`, { + const job = await this.queue.add(`run-scenario`, { scenarioId, assets, }); await this.scenarios.update(scenarioId, { ranAtLeastOnce: true, }); + const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1; await this.apiEvents.create({ topic: scenarioId, - kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + kind, + externalId: job.id + kind, }); } @@ -83,19 +89,23 @@ export class RunService { return right(void 0); } - private async handleFinished(jobId: string) { + private async handleFinished(jobId: string, eventId: string) { const job = await this.getJob(jobId); - await this.apiEvents.create({ + const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1; + await this.apiEvents.createIfNotExists({ topic: job.data.scenarioId, - kind: API_EVENT_KINDS.scenario__run__finished__v1__alpha1, + kind, + externalId: eventId, }); } - private async handleFailed(jobId: string) { + private async handleFailed(jobId: string, eventId: string) { const job = await this.getJob(jobId); - await this.apiEvents.create({ + const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1; + await this.apiEvents.createIfNotExists({ topic: job.data.scenarioId, - kind: API_EVENT_KINDS.scenario__run__failed__v1__alpha1, + kind, + externalId: eventId, }); }