Skip to content

Commit

Permalink
chore(geoprocessing): protected-areas: geo extractor: emit api events
Browse files Browse the repository at this point in the history
  • Loading branch information
kgajowy committed May 27, 2021
1 parent 02c0978 commit 0aeab28
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 40 deletions.
18 changes: 18 additions & 0 deletions api/src/migrations/api/1621963684141-ProjectSpecificWdpaEvents.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class CostSurfaceEvents1621963684141 implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
INSERT INTO api_event_kinds (id) values
('project.protectedAreas.submitted/v1/alpha'),
('project.protectedAreas.finished/v1/alpha'),
('project.protectedAreas.failed/v1/alpha');
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`DELETE FROM api_event_kinds WHERE id like 'project.protectedAreas.%';`,
);
}
}
3 changes: 3 additions & 0 deletions api/src/modules/api-events/api-event.api.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ export enum API_EVENT_KINDS {
scenario__costSurface__shapeConversionFailed__v1_alpha1 = 'scenario.costSurface.shapeConversionFailed/v1alpha1',
scenario__costSurface__costUpdateFailed__v1_alpha1 = 'scenario.costSurface.costUpdateFailed/v1alpha1',
scenario__costSurface__finished__v1_alpha1 = 'scenario.costSurface.finished/v1alpha1',
project__protectedAreas__submitted__v1__alpha = 'project.protectedAreas.submitted/v1/alpha',
project__protectedAreas__finished__v1__alpha = 'project.protectedAreas.finished/v1/alpha',
project__protectedAreas__failed__v1__alpha = 'project.protectedAreas.failed/v1/alpha',
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { CreateApiEventDTO } from '../../../api-events/dto/create.api-event.dto';
import { AppInfoDTO } from '../../../../dto/info.dto';
import { ApiEvent } from '../../../api-events/api-event.api.entity';

export class ApiServiceFake {
mock = jest.fn();

create(createModel: CreateApiEventDTO, info?: AppInfoDTO): Promise<ApiEvent> {
return this.mock(createModel, info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import { Queue } from 'bullmq';

import { QueueService } from '../../queue/queue.service';
import { FakeLogger } from '../../../utils/__mocks__/fake-logger';
import { ApiServiceFake } from './__mocks__/api-service.fake';
import { ApiEventsService } from '../../api-events/api-events.service';

let sut: ProtectedAreasFacade;
let logger: FakeLogger;
let addJobMock: jest.SpyInstance;
let apiEvents: ApiServiceFake;

const projectId = 'project-id';
const file: Express.Multer.File = {
Expand All @@ -35,11 +38,16 @@ beforeEach(async () => {
provide: Logger,
useClass: FakeLogger,
},
{
provide: ApiEventsService,
useClass: ApiServiceFake,
},
],
}).compile();

sut = sandbox.get(ProtectedAreasFacade);
logger = sandbox.get(Logger);
apiEvents = sandbox.get(ApiEventsService);
});

describe(`when job submits successfully`, () => {
Expand Down Expand Up @@ -68,6 +76,20 @@ describe(`when job submits successfully`, () => {
]
`);
});

it(`should emit 'submitted' event`, () => {
expect(apiEvents.mock.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Object {
"kind": "project.protectedAreas.submitted/v1/alpha",
"topic": "project-id",
},
undefined,
],
]
`);
});
});

describe(`when job submission fails`, () => {
Expand All @@ -87,7 +109,32 @@ describe(`when job submission fails`, () => {
expect(logger.error.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"Failed submitting job to queue for project-id",
[Error: Oups],
"Error: Oups",
]
`);
});

it(`emits both submitted&failed events`, () => {
expect(apiEvents.mock.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Object {
"kind": "project.protectedAreas.submitted/v1/alpha",
"topic": "project-id",
},
undefined,
],
Array [
Object {
"data": Object {
"error": "Failed submission",
"message": "Error: Oups",
},
"kind": "project.protectedAreas.failed/v1/alpha",
"topic": "project-id",
},
undefined,
],
]
`);
});
Expand Down
39 changes: 32 additions & 7 deletions api/src/modules/projects/protected-areas/protected-areas.facade.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Injectable, Logger } from '@nestjs/common';
import { Express } from 'express';
import { QueueService } from '../../queue/queue.service';
import { ApiEventsService } from '../../api-events/api-events.service';
import { API_EVENT_KINDS } from '../../api-events/api-event.api.entity';

export interface ProtectedAreasJobInput {
projectId: string;
Expand All @@ -11,6 +13,7 @@ export interface ProtectedAreasJobInput {
export class ProtectedAreasFacade {
constructor(
private readonly queueService: QueueService<ProtectedAreasJobInput>,
private readonly apiEvents: ApiEventsService,
private readonly logger: Logger = new Logger(ProtectedAreasFacade.name),
) {}

Expand All @@ -21,14 +24,36 @@ export class ProtectedAreasFacade {
file,
})
.then(() => {
// ok
return this.apiEvents.create({
kind: API_EVENT_KINDS.project__protectedAreas__submitted__v1__alpha,
topic: projectId,
});
})
.catch((error) => {
this.logger.error(
`Failed submitting job to queue for ${projectId}`,
error,
);
throw error; // failed submission
.catch(async (error) => {
await this.markAsFailedSubmission(projectId, error);
throw error;
});
}

private markAsFailedSubmission = async (
projectId: string,
error: unknown,
) => {
this.logger.error(
`Failed submitting job to queue for ${projectId}`,
String(error),
);
await this.apiEvents.create({
kind: API_EVENT_KINDS.project__protectedAreas__submitted__v1__alpha,
topic: projectId,
});
await this.apiEvents.create({
kind: API_EVENT_KINDS.project__protectedAreas__failed__v1__alpha,
topic: projectId,
data: {
error: `Failed submission`,
message: String(error),
},
});
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { Logger, Module } from '@nestjs/common';
import { ProtectedAreasFacade } from './protected-areas.facade';
import { QueueModule } from '../../queue/queue.module';
import { queueName } from './queue-name';
import { ApiEventsModule } from '../../api-events/api-events.module';

@Module({
imports: [
ApiEventsModule,
QueueModule.register({
name: queueName,
}),
Expand Down
1 change: 1 addition & 0 deletions geoprocessing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"dependencies": {
"@nestjs/common": "^7.6.5",
"@nestjs/core": "^7.5.1",
"@nestjs/cqrs": "7.0.1",
"@nestjs/platform-express": "^7.5.1",
"@nestjs/swagger": "^4.8.0",
"@nestjs/typeorm": "^7.1.5",
Expand Down
63 changes: 63 additions & 0 deletions geoprocessing/src/modules/api-events/api-events-subscriber.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { ApiEventsSubscriber } from './api-events-subscriber';
import { CqrsModule, EventBus } from '@nestjs/cqrs';
import { Test } from '@nestjs/testing';
import { ApiEventsService } from './api-events.service';
import { API_EVENT_KINDS } from './events.enum';
import { ApiEvent } from './api.event';

let eventForwarder: FakeForwarder;
let eventBus: EventBus;

beforeEach(async () => {
const sandbox = await Test.createTestingModule({
imports: [CqrsModule],
providers: [
{
provide: ApiEventsService,
useClass: FakeForwarder,
},
ApiEventsSubscriber,
],
})
.compile()
.then((app) => app.init());

eventBus = sandbox.get(EventBus);
eventForwarder = sandbox.get(ApiEventsService);
});

describe(`when emitting event`, () => {
beforeEach(async () => {
await eventBus.publish(
new ApiEvent(
'resource',
API_EVENT_KINDS.project__protectedAreas__failed__v1__alpha,
{ 1: 'one' },
),
);
});

it(`should forward it`, () => {
expect(eventForwarder.mock.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"resource",
"project.protectedAreas.failed/v1/alpha",
Object {
"1": "one",
},
]
`);
});
});

class FakeForwarder {
mock = jest.fn();

async create<T>(
resourceId: string,
kind: API_EVENT_KINDS,
data?: T,
): Promise<void> {
return this.mock(resourceId, kind, data);
}
}
39 changes: 39 additions & 0 deletions geoprocessing/src/modules/api-events/api-events-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { EventBus } from '@nestjs/cqrs';
import { Subscription } from 'rxjs';
import { ApiEventsService } from './api-events.service';
import { ApiEvent } from './api.event';

@Injectable()
export class ApiEventsSubscriber implements OnModuleDestroy {
#sub: Subscription;
private readonly logger = new Logger(ApiEventsSubscriber.name);

constructor(
private readonly eventBus: EventBus,
private readonly eventForwarder: ApiEventsService, // "repository"
) {
this.#sub = eventBus.subscribe({
next: async (value) => {
if (value instanceof ApiEvent) {
try {
await this.eventForwarder.create(
value.resourceId,
value.kind,
value.data,
);
} catch (error) {
this.logger.error(
`Unable to emit ApiEvent ${value.resourceId}+${value.kind}`,
error,
);
}
}
},
});
}

onModuleDestroy() {
this.#sub.unsubscribe();
}
}
6 changes: 4 additions & 2 deletions geoprocessing/src/modules/api-events/api-events.module.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { HttpModule, Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
import { ApiEventsSubscriber } from './api-events-subscriber';
import { ApiEventsService } from './api-events.service';

@Module({
imports: [HttpModule],
providers: [ApiEventsService],
imports: [HttpModule, CqrsModule],
providers: [ApiEventsService, ApiEventsSubscriber],
exports: [ApiEventsService],
})
export class ApiEventsModule {}
9 changes: 5 additions & 4 deletions geoprocessing/src/modules/api-events/api-events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,24 @@ export class ApiEventsService {
this.#apiUrl = AppConfig.get<string>('api.url')!;
}

async create<T>(
async create<T extends Record<string, unknown>>(
resourceId: string,
kind: API_EVENT_KINDS,
data?: T,
): Promise<void> {
// TODO what if it failed? (currently validateStatus "swallows" the error)
await this.http
.post(
this.#apiUrl + `/v1/api-events`,
this.#apiUrl + `/api/v1/api-events`,
{
kind,
topic: resourceId,
data,
data: JSON.stringify(data),
},
{
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
'x-api-key': this.#secret,
},
validateStatus: () => true,
},
Expand Down
10 changes: 10 additions & 0 deletions geoprocessing/src/modules/api-events/api.event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { IEvent } from '@nestjs/cqrs';
import { API_EVENT_KINDS } from './events.enum';

export class ApiEvent<T extends Record<string, unknown>> implements IEvent {
constructor(
public readonly resourceId: string,
public readonly kind: API_EVENT_KINDS,
public readonly data?: T,
) {}
}
8 changes: 8 additions & 0 deletions geoprocessing/src/modules/api-events/events.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@ export enum API_EVENT_KINDS {
user__passwordResetTokenGenerated__v1alpha1 = 'user.passwordResetTokenGenerated/v1alpha1',
user__passwordResetSucceeded__v1alpha1 = 'user.passwordResetSucceeded/v1alpha1',
user__passwordResetFailed__v1alpha1 = 'user.passwordResetFailed/v1alpha1',
scenario__costSurface__submitted__v1_alpha1 = 'scenario.costSurface.submitted/v1alpha1',
scenario__costSurface__shapeConverted__v1_alpha1 = 'scenario.costSurface.shapeConverted/v1alpha1',
scenario__costSurface__shapeConversionFailed__v1_alpha1 = 'scenario.costSurface.shapeConversionFailed/v1alpha1',
scenario__costSurface__costUpdateFailed__v1_alpha1 = 'scenario.costSurface.costUpdateFailed/v1alpha1',
scenario__costSurface__finished__v1_alpha1 = 'scenario.costSurface.finished/v1alpha1',
project__protectedAreas__submitted__v1__alpha = 'project.protectedAreas.submitted/v1/alpha',
project__protectedAreas__finished__v1__alpha = 'project.protectedAreas.finished/v1/alpha',
project__protectedAreas__failed__v1__alpha = 'project.protectedAreas.failed/v1/alpha',
}
2 changes: 2 additions & 0 deletions geoprocessing/src/modules/api-events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { ApiEventsModule } from './api-events.module';
export { ApiEvent } from './api.event';
Loading

0 comments on commit 0aeab28

Please sign in to comment.