Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update job init task hanling #20

Merged
merged 11 commits into from
Oct 28, 2024
2 changes: 1 addition & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"type": "INGESTION_SWAP_UPDATE_JOB_TYPE"
}
},
"task": {
"tasks": {
"tilesMerging": {
"type": "TILES_MERGING_TASK_TYPE",
"tileBatchSize": "TILES_MERGING_TILE_BATCH_SIZE",
Expand Down
11 changes: 6 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"@map-colonies/error-express-handler": "^2.1.0",
"@map-colonies/express-access-log-middleware": "^2.0.1",
"@map-colonies/js-logger": "^1.1.0",
"@map-colonies/mc-model-types": "^17.6.0",
"@map-colonies/mc-model-types": "^17.8.0",
"@map-colonies/mc-priority-queue": "^8.1.1",
"@map-colonies/mc-utils": "^3.1.0",
"@map-colonies/read-pkg": "0.0.1",
Expand All @@ -61,7 +61,8 @@
"lodash.has": "^4.5.2",
"prom-client": "^15.1.2",
"reflect-metadata": "^0.1.13",
"tsyringe": "^4.8.0"
"tsyringe": "^4.8.0",
"zod": "^3.23.8"
},
"devDependencies": {
"@commitlint/cli": "^17.6.6",
Expand Down
4 changes: 2 additions & 2 deletions src/httpClients/catalogClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ export class CatalogClient extends HttpClient {

private mapToCatalogRecordMetadata(job: IJobResponse<ExtendedNewRasterLayer, unknown>): LayerMetadata {
const { parameters, version } = job;
const { partData, metadata } = parameters;
const { partsData, metadata } = parameters;

const aggregatedPartData = this.polygonPartMangerClient.getAggregatedPartData(partData);
const aggregatedPartData = this.polygonPartMangerClient.getAggregatedPartData(partsData);

return {
id: metadata.catalogId,
Expand Down
6 changes: 3 additions & 3 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class NewJobHandler implements IJobHandler {
try {
logger.info({ msg: `handling ${job.type} job with "init" task` });

const { inputFiles, metadata, partData } = job.parameters;
const { inputFiles, metadata, partsData } = job.parameters;
const extendedLayerMetadata = this.mapToExtendedNewLayerMetadata(metadata);

const taskBuildParams: MergeTilesTaskParams = {
Expand All @@ -40,7 +40,7 @@ export class NewJobHandler implements IJobHandler {
isNewTarget: true,
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
grid: extendedLayerMetadata.grid,
},
partsData: partData,
partsData,
};

logger.info({ msg: 'building tasks' });
Expand All @@ -52,7 +52,7 @@ export class NewJobHandler implements IJobHandler {
logger.info({ msg: 'Updating job with new metadata', ...metadata, extendedLayerMetadata });
await this.queueClient.jobManagerClient.updateJob(job.id, {
internalId: extendedLayerMetadata.catalogId,
parameters: { metadata: extendedLayerMetadata, partData, inputFiles },
parameters: { metadata: extendedLayerMetadata, partsData, inputFiles },
});

logger.info({ msg: 'Acking task' });
Expand Down
67 changes: 59 additions & 8 deletions src/job/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,74 @@
import { ZodError } from 'zod';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler } from '../../common/interfaces';
import { OperationStatus, TaskHandler as QueueClient, IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { IngestionUpdateJobParams, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { Grid, IConfig, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { UpdateAdditionalParams, updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';

@injectable()
export class UpdateJobHandler implements IJobHandler {
public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger) {}
private readonly isNewTarget: boolean;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.CONFIG) private readonly config: IConfig,
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
) {
this.isNewTarget = config.get<boolean>('jobManagement.ingestion.tasks.tilesMerging.useNewTargetFlagInUpdate');
}

public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId });
logger.info({ msg: `handling ${job.type} job with "init" task` });
await Promise.reject('not implemented');
public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, jobType: job.type });
try {
logger.info({ msg: `handling ${job.type} job with "init" task` });
const { inputFiles, partsData, additionalParams } = job.parameters;

const validAdditionalParams = this.validateAdditionalParams(additionalParams);

const taskBuildParams: MergeTilesTaskParams = {
inputFiles,
taskMetadata: {
layerRelativePath: `${job.internalId}/${validAdditionalParams.displayPath}`,
tileOutputFormat: validAdditionalParams.tileOutputFormat,
isNewTarget: this.isNewTarget,
grid: Grid.TWO_ON_ONE,
},
partsData,
};

logger.info({ msg: 'building tasks' });
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
await this.taskBuilder.pushTasks(job.id, mergeTasks);

logger.info({ msg: 'Acking task' });
await this.queueClient.ack(job.id, taskId);
} catch (err) {
if (err instanceof ZodError) {
const errorMsg = `Failed to validate additionalParams: ${err.message}`;
logger.error({ msg: errorMsg, err });
await this.queueClient.reject(job.id, taskId, false, err.message);
return await this.queueClient.jobManagerClient.updateJob(job.id, { status: OperationStatus.FAILED, reason: errorMsg });
}
if (err instanceof Error) {
razbroc marked this conversation as resolved.
Show resolved Hide resolved
logger.error({ msg: 'Failed to handle job init', error: err });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.queueClient.reject(job.id, taskId, true, err.message);
}
}
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, task: ITaskResponse<unknown>): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
}

private validateAdditionalParams(additionalParams: Record<string, unknown>): UpdateAdditionalParams {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const validatedParams = updateAdditionalParamsSchema.parse(additionalParams);
return validatedParams;
}
}
10 changes: 10 additions & 0 deletions src/utils/zod/schemas/jobParametersSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { TileOutputFormat } from '@map-colonies/mc-model-types';
import { z } from 'zod';

export const updateAdditionalParamsSchema = z.object({
jobTrackerServiceUrl: z.string().url(),
displayPath: z.string().uuid(),
tileOutputFormat: z.nativeEnum(TileOutputFormat),
});

export type UpdateAdditionalParams = z.infer<typeof updateAdditionalParamsSchema>;
90 changes: 90 additions & 0 deletions tests/unit/job/updateJobHandler/updateJobHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { ZodError } from 'zod';
import { OperationStatus } from '@map-colonies/mc-priority-queue';
import { Grid, IMergeTaskParameters } from '../../../../src/common/interfaces';
import { finalizeTaskForIngestionUpdate } from '../../mocks/tasksMockData';
import { updateAdditionalParamsSchema } from '../../../../src/utils/zod/schemas/jobParametersSchema';
import { registerDefaultConfig } from '../../mocks/configMock';
import { ingestionUpdateJob } from '../../mocks/jobsMockData';
import { setupUpdateJobHandlerTest } from './updateJobHandlerSetup';

describe('updateJobHandler', () => {
beforeEach(() => {
jest.resetAllMocks();
registerDefaultConfig();
});

describe('handleJobInit', () => {
it('should handle job init successfully', async () => {
const { updateJobHandler, queueClientMock, taskBuilderMock } = setupUpdateJobHandlerTest();
const job = ingestionUpdateJob;
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';

const additionalParams = updateAdditionalParamsSchema.parse(job.parameters.additionalParams);

const taskBuildParams = {
inputFiles: job.parameters.inputFiles,
taskMetadata: {
layerRelativePath: `${job.internalId}/${additionalParams.displayPath}`,
tileOutputFormat: additionalParams.tileOutputFormat,
isNewTarget: true,
grid: Grid.TWO_ON_ONE,
},
partsData: job.parameters.partsData,
};

const mergeTasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

taskBuilderMock.buildTasks.mockReturnValue(mergeTasks);
taskBuilderMock.pushTasks.mockResolvedValue(undefined);
queueClientMock.ack.mockResolvedValue(undefined);

await updateJobHandler.handleJobInit(job, taskId);

expect(taskBuilderMock.buildTasks).toHaveBeenCalledWith(taskBuildParams);
expect(taskBuilderMock.pushTasks).toHaveBeenCalledWith(job.id, mergeTasks);
expect(queueClientMock.ack).toHaveBeenCalledWith(job.id, taskId);
});

it('should handle job init failure and reject the task', async () => {
const { updateJobHandler, taskBuilderMock, queueClientMock } = setupUpdateJobHandlerTest();

const job = { ...ingestionUpdateJob };

const taskId = '7e630dea-ea29-4b30-a88e-5407bf67d1bc';
const tasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

const error = new Error('Test error');

taskBuilderMock.buildTasks.mockReturnValue(tasks);
taskBuilderMock.pushTasks.mockRejectedValue(error);
queueClientMock.reject.mockResolvedValue(undefined);

await updateJobHandler.handleJobInit(job, taskId);

expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, true, error.message);
});

it('should handle job init failure with ZodError and Failed the job', async () => {
const { updateJobHandler, jobManagerClientMock, queueClientMock } = setupUpdateJobHandlerTest();
const job = ingestionUpdateJob;
job.parameters.additionalParams = { wrongField: 'wrongValue' };
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';
const validAdditionalParamsSpy = jest.spyOn(updateAdditionalParamsSchema, 'parse');

await updateJobHandler.handleJobInit(job, taskId);

expect(validAdditionalParamsSpy).toThrow(ZodError);
expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, false, expect.any(String));
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
expect(jobManagerClientMock.updateJob).toHaveBeenCalledWith(job.id, { status: OperationStatus.FAILED, reason: expect.any(String) });
});
});
describe('handleJobFinalize', () => {
it('should throw not implemented Error', async () => {
const { updateJobHandler } = setupUpdateJobHandlerTest();

await expect(updateJobHandler.handleJobFinalize(ingestionUpdateJob, finalizeTaskForIngestionUpdate)).rejects.toBe('not implemented');
});
});
});
52 changes: 52 additions & 0 deletions tests/unit/job/updateJobHandler/updateJobHandlerSetup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import jsLogger from '@map-colonies/js-logger';
import { JobManagerClient, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { configMock } from '../../mocks/configMock';
import { TileMergeTaskManager } from '../../../../src/task/models/tileMergeTaskManager';
import { MapproxyApiClient } from '../../../../src/httpClients/mapproxyClient';
import { GeoserverClient } from '../../../../src/httpClients/geoserverClient';
import { CatalogClient } from '../../../../src/httpClients/catalogClient';
import { UpdateJobHandler } from '../../../../src/job/models/updateJobHandler';

export interface UpdateJobHandlerTestContext {
updateJobHandler: UpdateJobHandler;
taskBuilderMock: jest.Mocked<TileMergeTaskManager>;
queueClientMock: jest.Mocked<QueueClient>;
jobManagerClientMock: jest.Mocked<JobManagerClient>;
mapproxyClientMock: jest.Mocked<MapproxyApiClient>;
geoserverClientMock: jest.Mocked<GeoserverClient>;
catalogClientMock: jest.Mocked<CatalogClient>;
}

export const setupUpdateJobHandlerTest = (): UpdateJobHandlerTestContext => {
const taskBuilderMock = {
buildTasks: jest.fn(),
pushTasks: jest.fn(),
} as unknown as jest.Mocked<TileMergeTaskManager>;

const jobManagerClientMock = {
updateJob: jest.fn(),
updateTask: jest.fn(),
} as unknown as jest.Mocked<JobManagerClient>;

const queueClientMock = {
jobManagerClient: jobManagerClientMock,
ack: jest.fn(),
reject: jest.fn(),
} as unknown as jest.Mocked<QueueClient>;

const mapproxyClientMock = { publish: jest.fn() } as unknown as jest.Mocked<MapproxyApiClient>;
const geoserverClientMock = { publish: jest.fn() } as unknown as jest.Mocked<GeoserverClient>;
const catalogClientMock = { publish: jest.fn() } as unknown as jest.Mocked<CatalogClient>;

const updateJobHandler = new UpdateJobHandler(jsLogger({ enabled: false }), configMock, taskBuilderMock, queueClientMock);

return {
updateJobHandler,
taskBuilderMock,
queueClientMock,
jobManagerClientMock,
mapproxyClientMock,
geoserverClientMock,
catalogClientMock,
};
};
22 changes: 17 additions & 5 deletions tests/unit/mocks/jobsMockData.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { NewRasterLayer, ProductType, TileOutputFormat, Transparency, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import {
IngestionUpdateJobParams,
NewRasterLayer,
ProductType,
TileOutputFormat,
Transparency,
UpdateRasterLayer,
} from '@map-colonies/mc-model-types';
import { IJobResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { ExtendedNewRasterLayer, Grid } from '../../../src/common/interfaces';
import { partsData } from './partsMockData';
Expand Down Expand Up @@ -26,7 +33,7 @@ export const ingestionNewJob: IJobResponse<NewRasterLayer, unknown> = {
classification: '6',
productSubType: 'string',
},
partData: partsData,
partsData,
inputFiles: {
fileNames: ['blueMarble.gpkg'],
originDirectory: 'tests',
Expand Down Expand Up @@ -71,7 +78,7 @@ export const ingestionNewJobExtended: IJobResponse<ExtendedNewRasterLayer, unkno
},
};

export const ingestionUpdateJob: IJobResponse<UpdateRasterLayer, unknown> = {
export const ingestionUpdateJob: IJobResponse<IngestionUpdateJobParams, unknown> = {
id: 'd027b3aa-272b-4dc9-91d7-ba8343af5ed1',
resourceId: 'another-product-id',
version: '1.0',
Expand All @@ -81,11 +88,16 @@ export const ingestionUpdateJob: IJobResponse<UpdateRasterLayer, unknown> = {
metadata: {
classification: '6',
},
partData: partsData,
partsData,
inputFiles: {
fileNames: ['blueMarble.gpkg'],
originDirectory: 'tests',
},
additionalParams: {
jobTrackerServiceUrl: 'http://job-tracker-service',
displayPath: 'd1e9fe74-2a8f-425f-ac46-d65bb5c5756d',
tileOutputFormat: TileOutputFormat.PNG,
},
},
status: OperationStatus.PENDING,
percentage: 0,
Expand Down Expand Up @@ -120,7 +132,7 @@ export const ingestionSwapUpdateJob: IJobResponse<UpdateRasterLayer, unknown> =
metadata: {
classification: '6',
},
partData: partsData,
partsData,
inputFiles: {
fileNames: ['blueMarble.gpkg'],
originDirectory: 'tests',
Expand Down
Loading