Skip to content

Commit

Permalink
feat: Ingestion swap update init(MAPCO-5073) (#25)
Browse files Browse the repository at this point in the history
* feat: update job init task hanling

* fix: remove liveServer form settings.json

* fix: custom-environment-variables.json typo mistake task->tasks

* fix: adjust to new partsData property from mc-model-types

* fix: adjust to new partsData property from mc-model-types

* fix: adjust to new partsData property from mc-model-types

* fix: pr fixes

* fix: pr fixes

* fix: pr fixes

* feat: handle ingestion_update finalize task

* fix: pr fixes

* fix: remove space

* fix: markFinalizeStepAsCompletedchanged parameters order

* Revert "fix: markFinalizeStepAsCompletedchanged parameters order"

This reverts commit 85a70e3.

* feat: handle ingestion_swap_update init task

* fix: chnage additionalParmas jobTrackerServiceUrl to jobTrackerServiceURL
  • Loading branch information
almog8k authored Oct 31, 2024
1 parent c7efd4e commit 71b0558
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/httpClients/polygonPartMangerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class PolygonPartMangerClient extends HttpClient {
maxResolutionMeter: partsData[0].resolutionMeter,
minResolutionMeter: partsData[0].resolutionMeter,
footprint: partsData[0].footprint,
productBoundingBox: '',
productBoundingBox: '-180,90,180,90',
};
}
}
10 changes: 10 additions & 0 deletions src/job/models/jobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/member-ordering */
import z from 'zod';
import { IJobResponse, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Logger } from '@map-colonies/js-logger';
import { layerNameSchema } from '../../utils/zod/schemas/jobParametersSchema';
Expand Down Expand Up @@ -30,4 +31,13 @@ export class JobHandler {
this.logger.debug({ msg: 'checking if all steps are completed', steps });
return Object.values(steps).every((step) => step);
}

protected validateAdditionalParams<T extends z.ZodSchema>(additionalParams: unknown, schema: T): z.infer<T> {
const result = schema.safeParse(additionalParams);
if (!result.success) {
throw result.error;
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return result.data as z.infer<T>;
}
}
10 changes: 6 additions & 4 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse, ITaskResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { TilesMimeFormat, lookup as mimeLookup } from '@map-colonies/types';
import { IngestionNewFinalizeTaskParams, NewRasterLayer, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { IngestionNewFinalizeTaskParams, IngestionNewJobParams, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { newAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { Grid, IJobHandler, MergeTilesTaskParams, ExtendedRasterLayerMetadata, ExtendedNewRasterLayer } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
Expand All @@ -27,12 +28,13 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
public async handleJobInit(job: IJobResponse<IngestionNewJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, jobType: job.type, taskId });
try {
logger.info({ msg: `handling ${job.type} job with "init" task` });

const { inputFiles, metadata, partsData } = job.parameters;
const { inputFiles, metadata, partsData, additionalParams } = job.parameters;
const validAdditionalParams = this.validateAdditionalParams(additionalParams, newAdditionalParamsSchema);
const extendedLayerMetadata = this.mapToExtendedNewLayerMetadata(metadata);

const taskBuildParams: MergeTilesTaskParams = {
Expand All @@ -55,7 +57,7 @@ export class NewJobHandler extends JobHandler implements IJobHandler {
logger.info({ msg: 'Updating job with new metadata', ...metadata, extendedLayerMetadata });
await this.queueClient.jobManagerClient.updateJob(job.id, {
internalId: extendedLayerMetadata.catalogId,
parameters: { metadata: extendedLayerMetadata, partsData, inputFiles },
parameters: { metadata: extendedLayerMetadata, partsData, inputFiles, additionalParams: validAdditionalParams },
});

logger.info({ msg: 'Acking task' });
Expand Down
85 changes: 75 additions & 10 deletions src/job/models/swapJobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,88 @@
import { randomUUID } from 'crypto';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler } from '../../common/interfaces';
import { IJobResponse, ITaskResponse, OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { ZodError } from 'zod';
import { IngestionSwapUpdateFinalizeTaskParams, IngestionUpdateJobParams, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { Grid, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
import { swapUpdateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { SERVICES } from '../../common/constants';
import { JobHandler } from './jobHandler';

@injectable()
export class SwapJobHandler implements IJobHandler {
public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger) {}
export class SwapJobHandler extends JobHandler implements IJobHandler {
public constructor(
@inject(SERVICES.LOGGER) logger: Logger,
@inject(SERVICES.QUEUE_CLIENT) protected queueClient: QueueClient,
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager
) {
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId });
logger.info({ msg: `handling ${job.type} job with "init" task` });
await Promise.reject('not implemented');
public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, jobType: job.type });

try {
logger.info({ msg: `handling ${job.type} job with "init" task` });
const { inputFiles, partsData, additionalParams } = job.parameters;

const validAdditionalParams = this.validateAdditionalParams(additionalParams, swapUpdateAdditionalParamsSchema);
const displayPath = randomUUID();

const taskBuildParams: MergeTilesTaskParams = {
inputFiles,
partsData,
taskMetadata: {
tileOutputFormat: validAdditionalParams.tileOutputFormat,
isNewTarget: true,
layerRelativePath: `${job.internalId}/${displayPath}`,
grid: Grid.TWO_ON_ONE,
},
};

logger.info({ msg: 'building tasks' });
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
await this.taskBuilder.pushTasks(job.id, mergeTasks);

logger.info({ msg: 'Acking task' });
await this.queueClient.ack(job.id, taskId);

await this.updateJobAdditionalParams(job, validAdditionalParams, displayPath);
} catch (err) {
if (err instanceof ZodError) {
const errorMsg = `Failed to validate additionalParams: ${err.message}`;
logger.error({ msg: errorMsg, err });
await this.queueClient.reject(job.id, taskId, false, err.message);
return await this.queueClient.jobManagerClient.updateJob(job.id, { status: OperationStatus.FAILED, reason: errorMsg });
}
if (err instanceof Error) {
logger.error({ msg: 'Failed to handle job init', error: err });
await this.queueClient.reject(job.id, taskId, true, err.message);
}
}
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, task: ITaskResponse<unknown>): Promise<void> {
public async handleJobFinalize(
job: IJobResponse<UpdateRasterLayer, unknown>,
task: ITaskResponse<IngestionSwapUpdateFinalizeTaskParams>
): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
}

private async updateJobAdditionalParams(
job: IJobResponse<IngestionUpdateJobParams, unknown>,
additionalParams: Record<string, unknown>,
displayPath: string
): Promise<void> {
const newAdditionalParams = { ...additionalParams, displayPath };
this.logger.info({ msg: 'Updating job additional params with new displayPath', jobId: job.id, newAdditionalParams });
return this.queueClient.jobManagerClient.updateJob(job.id, {
parameters: { ...job.parameters, additionalParams: newAdditionalParams },
});
}
}
9 changes: 2 additions & 7 deletions src/job/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { IngestionUpdateFinalizeTaskParams, IngestionUpdateJobParams } from '@ma
import { CatalogClient } from '../../httpClients/catalogClient';
import { Grid, IConfig, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { UpdateAdditionalParams, updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
import { JobHandler } from './jobHandler';

Expand All @@ -28,7 +28,7 @@ export class UpdateJobHandler extends JobHandler implements IJobHandler {
logger.info({ msg: `handling ${job.type} job with "init" task` });
const { inputFiles, partsData, additionalParams } = job.parameters;

const validAdditionalParams = this.validateAdditionalParams(additionalParams);
const validAdditionalParams = this.validateAdditionalParams(additionalParams, updateAdditionalParamsSchema);

const taskBuildParams: MergeTilesTaskParams = {
inputFiles,
Expand Down Expand Up @@ -92,9 +92,4 @@ export class UpdateJobHandler extends JobHandler implements IJobHandler {
}
}
}

private validateAdditionalParams(additionalParams: Record<string, unknown>): UpdateAdditionalParams {
const validatedParams = updateAdditionalParamsSchema.parse(additionalParams);
return validatedParams;
}
}
14 changes: 11 additions & 3 deletions src/utils/zod/schemas/jobParametersSchema.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import { TileOutputFormat } from '@map-colonies/mc-model-types';
import { z } from 'zod';

export const updateAdditionalParamsSchema = z.object({
jobTrackerServiceUrl: z.string().url(),
displayPath: z.string().uuid(),
export const newAdditionalParamsSchema = z.object({
jobTrackerServiceURL: z.string().url(),
});

export const swapUpdateAdditionalParamsSchema = newAdditionalParamsSchema.extend({
tileOutputFormat: z.nativeEnum(TileOutputFormat),
});

export type SwapUpdateAdditionalParams = z.infer<typeof swapUpdateAdditionalParamsSchema>;

export const updateAdditionalParamsSchema = swapUpdateAdditionalParamsSchema.extend({
displayPath: z.string().uuid(),
});

export type UpdateAdditionalParams = z.infer<typeof updateAdditionalParamsSchema>;

export const layerNameSchema = z.object({
Expand Down
90 changes: 90 additions & 0 deletions tests/unit/job/swapJobHandler/swapJobHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* eslint-disable @typescript-eslint/unbound-method */
import crypto from 'crypto';
import { ZodError } from 'zod';
import { OperationStatus } from '@map-colonies/mc-priority-queue';
import { swapUpdateAdditionalParamsSchema } from '../../../../src/utils/zod/schemas/jobParametersSchema';
import { registerDefaultConfig } from '../../mocks/configMock';
import { Grid, IMergeTaskParameters } from '../../../../src/common/interfaces';
import { ingestionSwapUpdateJob } from '../../mocks/jobsMockData';
import { setupSwapJobHandlerTest } from './swapJobHandlerSetup';

describe('swapJobHandler', () => {
beforeEach(() => {
jest.resetAllMocks();
registerDefaultConfig();
});

describe('handleJobInit', () => {
it('should handle job init successfully', async () => {
const { swapJobHandler, queueClientMock, taskBuilderMock, jobManagerClientMock } = setupSwapJobHandlerTest();
const job = { ...ingestionSwapUpdateJob };
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';

const additionalParams = swapUpdateAdditionalParamsSchema.parse(job.parameters.additionalParams);

const newDisplayPath = crypto.randomUUID();

jest.spyOn(crypto, 'randomUUID').mockReturnValue(newDisplayPath);

const taskBuildParams = {
inputFiles: job.parameters.inputFiles,
taskMetadata: {
layerRelativePath: `${job.internalId}/${newDisplayPath}`,
tileOutputFormat: additionalParams.tileOutputFormat,
isNewTarget: true,
grid: Grid.TWO_ON_ONE,
},
partsData: job.parameters.partsData,
};

const mergeTasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

taskBuilderMock.buildTasks.mockReturnValue(mergeTasks);
taskBuilderMock.pushTasks.mockResolvedValue(undefined);
queueClientMock.ack.mockResolvedValue(undefined);

await swapJobHandler.handleJobInit(job, taskId);

expect(taskBuilderMock.buildTasks).toHaveBeenCalledWith(taskBuildParams);
expect(taskBuilderMock.pushTasks).toHaveBeenCalledWith(job.id, mergeTasks);
expect(queueClientMock.ack).toHaveBeenCalledWith(job.id, taskId);
expect(jobManagerClientMock.updateJob).toHaveBeenCalledWith(job.id, {
parameters: { ...job.parameters, additionalParams: { ...additionalParams, displayPath: newDisplayPath } },
});
});

it('should handle job init failure and reject the task', async () => {
const { swapJobHandler, taskBuilderMock, queueClientMock } = setupSwapJobHandlerTest();

const job = { ...ingestionSwapUpdateJob };

const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';
const tasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

const error = new Error('Test error');

taskBuilderMock.buildTasks.mockReturnValue(tasks);
taskBuilderMock.pushTasks.mockRejectedValue(error);
queueClientMock.reject.mockResolvedValue(undefined);

await swapJobHandler.handleJobInit(job, taskId);

expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, true, error.message);
});

it('should handle job init failure with ZodError and Failed the job', async () => {
const { swapJobHandler, jobManagerClientMock, queueClientMock } = setupSwapJobHandlerTest();
const job = { ...ingestionSwapUpdateJob };
job.parameters.additionalParams = { wrongField: 'wrongValue' };
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';
const validAdditionalParamsSpy = jest.spyOn(swapUpdateAdditionalParamsSchema, 'parse');

await swapJobHandler.handleJobInit(job, taskId);

expect(validAdditionalParamsSpy).toThrow(ZodError);
expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, false, expect.any(String));
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
expect(jobManagerClientMock.updateJob).toHaveBeenCalledWith(job.id, { status: OperationStatus.FAILED, reason: expect.any(String) });
});
});
});
47 changes: 47 additions & 0 deletions tests/unit/job/swapJobHandler/swapJobHandlerSetup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import jsLogger from '@map-colonies/js-logger';
import { JobManagerClient, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { TileMergeTaskManager } from '../../../../src/task/models/tileMergeTaskManager';
import { MapproxyApiClient } from '../../../../src/httpClients/mapproxyClient';
import { CatalogClient } from '../../../../src/httpClients/catalogClient';
import { SwapJobHandler } from '../../../../src/job/models/swapJobHandler';

export interface SwapJobHandlerTestContext {
swapJobHandler: SwapJobHandler;
taskBuilderMock: jest.Mocked<TileMergeTaskManager>;
queueClientMock: jest.Mocked<QueueClient>;
jobManagerClientMock: jest.Mocked<JobManagerClient>;
mapproxyClientMock: jest.Mocked<MapproxyApiClient>;
catalogClientMock: jest.Mocked<CatalogClient>;
}

export const setupSwapJobHandlerTest = (): SwapJobHandlerTestContext => {
const taskBuilderMock = {
buildTasks: jest.fn(),
pushTasks: jest.fn(),
} as unknown as jest.Mocked<TileMergeTaskManager>;

const jobManagerClientMock = {
updateJob: jest.fn(),
updateTask: jest.fn(),
} as unknown as jest.Mocked<JobManagerClient>;

const queueClientMock = {
jobManagerClient: jobManagerClientMock,
ack: jest.fn(),
reject: jest.fn(),
} as unknown as jest.Mocked<QueueClient>;

const mapproxyClientMock = { publish: jest.fn() } as unknown as jest.Mocked<MapproxyApiClient>;
const catalogClientMock = { publish: jest.fn() } as unknown as jest.Mocked<CatalogClient>;

const swapJobHandler = new SwapJobHandler(jsLogger({ enabled: false }), queueClientMock, taskBuilderMock);

return {
swapJobHandler,
taskBuilderMock,
queueClientMock,
jobManagerClientMock,
mapproxyClientMock,
catalogClientMock,
};
};
Loading

0 comments on commit 71b0558

Please sign in to comment.