Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update job init task hanling #20

Merged
merged 11 commits into from
Oct 28, 2024
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
"lodash.has": "^4.5.2",
"prom-client": "^15.1.2",
"reflect-metadata": "^0.1.13",
"tsyringe": "^4.8.0"
"tsyringe": "^4.8.0",
"zod": "^3.23.8"
},
"devDependencies": {
"@commitlint/cli": "^17.6.6",
Expand Down
65 changes: 58 additions & 7 deletions src/job/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,74 @@
import { ZodError } from 'zod';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler } from '../../common/interfaces';
import { OperationStatus, TaskHandler as QueueClient, IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { IngestionUpdateJobParams, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { Grid, IConfig, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { UpdateAdditionalParams, updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';

@injectable()
export class UpdateJobHandler implements IJobHandler {
public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger) {}
private readonly isNewTarget: boolean;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.CONFIG) private readonly config: IConfig,
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
) {
this.isNewTarget = config.get<boolean>('jobManagement.ingestion.tasks.tilesMerging.useNewTargetFlagInUpdate');
}

public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId });
logger.info({ msg: `handling ${job.type} job with "init" task` });
await Promise.reject('not implemented');
try {
logger.info({ msg: `handling ${job.type} job with "init" task` });
const { inputFiles, partData, additionalParams } = job.parameters;

const validAdditionalParams = this.validateAdditionalParams(additionalParams);

const taskBuildParams: MergeTilesTaskParams = {
inputFiles,
taskMetadata: {
layerRelativePath: `${job.internalId}/${validAdditionalParams.displayPath}`,
tileOutputFormat: validAdditionalParams.tileOutputFormat,
isNewTarget: this.isNewTarget,
grid: Grid.TWO_ON_ONE,
},
partsData: partData,
};

logger.info({ msg: 'building tasks' });
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
await this.taskBuilder.pushTasks(job.id, mergeTasks);

logger.info({ msg: 'Acking task' });
await this.queueClient.ack(job.id, taskId);
} catch (err) {
if (err instanceof ZodError) {
const errorMsg = `Failed to validate additionalParams: ${err.message}`;
logger.error({ msg: errorMsg, err });
await this.queueClient.reject(job.id, taskId, false, err.message);
return await this.queueClient.jobManagerClient.updateJob(job.id, { status: OperationStatus.FAILED, reason: errorMsg });
}
if (err instanceof Error) {
razbroc marked this conversation as resolved.
Show resolved Hide resolved
logger.error({ msg: 'Failed to handle job init', error: err });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.queueClient.reject(job.id, taskId, true, err.message);
}
}
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, task: ITaskResponse<unknown>): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
}

private validateAdditionalParams(additionalParams: Record<string, unknown>): UpdateAdditionalParams {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const validatedParams = updateAdditionalParamsSchema.parse(additionalParams);
return validatedParams;
}
}
9 changes: 9 additions & 0 deletions src/utils/zod/schemas/jobParametersSchema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { TileOutputFormat } from '@map-colonies/mc-model-types';
import { z } from 'zod';

export const updateAdditionalParamsSchema = z.object({
displayPath: z.string().uuid(),
tileOutputFormat: z.nativeEnum(TileOutputFormat),
});

export type UpdateAdditionalParams = z.infer<typeof updateAdditionalParamsSchema>;
92 changes: 92 additions & 0 deletions tests/unit/job/updateJobHandler/updateJobHandler.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { ZodError } from 'zod';
import { OperationStatus } from '@map-colonies/mc-priority-queue';
import { TileOutputFormat } from '@map-colonies/mc-model-types';
import { Grid, IMergeTaskParameters } from '../../../../src/common/interfaces';
import { finalizeTaskForIngestionUpdate } from '../../mocks/tasksMockData';
import { updateAdditionalParamsSchema } from '../../../../src/utils/zod/schemas/jobParametersSchema';
import { registerDefaultConfig } from '../../mocks/configMock';
import { ingestionUpdateJob } from '../../mocks/jobsMockData';
import { setupUpdateJobHandlerTest } from './updateJobHandlerSetup';

describe('updateJobHandler', () => {
beforeEach(() => {
jest.resetAllMocks();
registerDefaultConfig();
});

describe('handleJobInit', () => {
it('should handle job init successfully', async () => {
const { updateJobHandler, queueClientMock, taskBuilderMock } = setupUpdateJobHandlerTest();
const job = ingestionUpdateJob;
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';

const additionalParams = updateAdditionalParamsSchema.parse(job.parameters.additionalParams);

const taskBuildParams = {
inputFiles: job.parameters.inputFiles,
taskMetadata: {
layerRelativePath: `${job.internalId}/${additionalParams.displayPath}`,
tileOutputFormat: additionalParams.tileOutputFormat,
isNewTarget: true,
grid: Grid.TWO_ON_ONE,
},
partsData: job.parameters.partData,
};

const mergeTasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

taskBuilderMock.buildTasks.mockReturnValue(mergeTasks);
taskBuilderMock.pushTasks.mockResolvedValue(undefined);
queueClientMock.ack.mockResolvedValue(undefined);

await updateJobHandler.handleJobInit(job, taskId);

expect(taskBuilderMock.buildTasks).toHaveBeenCalledWith(taskBuildParams);
expect(taskBuilderMock.pushTasks).toHaveBeenCalledWith(job.id, mergeTasks);
expect(queueClientMock.ack).toHaveBeenCalledWith(job.id, taskId);
});

it('should handle job init failure and reject the task', async () => {
const { updateJobHandler, taskBuilderMock, queueClientMock } = setupUpdateJobHandlerTest();

const job = ingestionUpdateJob;
job.parameters.additionalParams = { displayPath: 'df2fce09-a97c-462f-a537-9a8edca643e9', tileOutputFormat: TileOutputFormat.PNG };

const taskId = '7e630dea-ea29-4b30-a88e-5407bf67d1bc';
const tasks: AsyncGenerator<IMergeTaskParameters, void, void> = (async function* () {})();

const error = new Error('Test error');

taskBuilderMock.buildTasks.mockReturnValue(tasks);
taskBuilderMock.pushTasks.mockRejectedValue(error);
queueClientMock.reject.mockResolvedValue(undefined);

await updateJobHandler.handleJobInit(job, taskId);

expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, true, error.message);
});

it('should handle job init failure with ZodError and Failed the job', async () => {
const { updateJobHandler, jobManagerClientMock, queueClientMock } = setupUpdateJobHandlerTest();
const job = ingestionUpdateJob;
job.parameters.additionalParams = { wrongField: 'wrongValue' };
const taskId = '291bf779-efe0-42bd-8357-aaede47e4d37';
const validAdditionalParamsSpy = jest.spyOn(updateAdditionalParamsSchema, 'parse');

await updateJobHandler.handleJobInit(job, taskId);

expect(validAdditionalParamsSpy).toThrow(ZodError);
expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, taskId, false, expect.any(String));
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
expect(jobManagerClientMock.updateJob).toHaveBeenCalledWith(job.id, { status: OperationStatus.FAILED, reason: expect.any(String) });
});
});
describe('handleJobFinalize', () => {
it('should throw not implemented Error', async () => {
const { updateJobHandler } = setupUpdateJobHandlerTest();

await expect(updateJobHandler.handleJobFinalize(ingestionUpdateJob, finalizeTaskForIngestionUpdate)).rejects.toBe('not implemented');
});
});
});
52 changes: 52 additions & 0 deletions tests/unit/job/updateJobHandler/updateJobHandlerSetup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import jsLogger from '@map-colonies/js-logger';
import { JobManagerClient, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { configMock } from '../../mocks/configMock';
import { TileMergeTaskManager } from '../../../../src/task/models/tileMergeTaskManager';
import { MapproxyApiClient } from '../../../../src/httpClients/mapproxyClient';
import { GeoserverClient } from '../../../../src/httpClients/geoserverClient';
import { CatalogClient } from '../../../../src/httpClients/catalogClient';
import { UpdateJobHandler } from '../../../../src/job/models/updateJobHandler';

export interface UpdateJobHandlerTestContext {
updateJobHandler: UpdateJobHandler;
taskBuilderMock: jest.Mocked<TileMergeTaskManager>;
queueClientMock: jest.Mocked<QueueClient>;
jobManagerClientMock: jest.Mocked<JobManagerClient>;
mapproxyClientMock: jest.Mocked<MapproxyApiClient>;
geoserverClientMock: jest.Mocked<GeoserverClient>;
catalogClientMock: jest.Mocked<CatalogClient>;
}

export const setupUpdateJobHandlerTest = (): UpdateJobHandlerTestContext => {
const taskBuilderMock = {
buildTasks: jest.fn(),
pushTasks: jest.fn(),
} as unknown as jest.Mocked<TileMergeTaskManager>;

const jobManagerClientMock = {
updateJob: jest.fn(),
updateTask: jest.fn(),
} as unknown as jest.Mocked<JobManagerClient>;

const queueClientMock = {
jobManagerClient: jobManagerClientMock,
ack: jest.fn(),
reject: jest.fn(),
} as unknown as jest.Mocked<QueueClient>;

const mapproxyClientMock = { publish: jest.fn() } as unknown as jest.Mocked<MapproxyApiClient>;
const geoserverClientMock = { publish: jest.fn() } as unknown as jest.Mocked<GeoserverClient>;
const catalogClientMock = { publish: jest.fn() } as unknown as jest.Mocked<CatalogClient>;

const updateJobHandler = new UpdateJobHandler(jsLogger({ enabled: false }), configMock, taskBuilderMock, queueClientMock);

return {
updateJobHandler,
taskBuilderMock,
queueClientMock,
jobManagerClientMock,
mapproxyClientMock,
geoserverClientMock,
catalogClientMock,
};
};
15 changes: 13 additions & 2 deletions tests/unit/mocks/jobsMockData.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import { NewRasterLayer, ProductType, TileOutputFormat, Transparency, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import {
IngestionUpdateJobParams,
NewRasterLayer,
ProductType,
TileOutputFormat,
Transparency,
UpdateRasterLayer,
} from '@map-colonies/mc-model-types';
import { IJobResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { ExtendedNewRasterLayer, Grid } from '../../../src/common/interfaces';
import { partsData } from './partsMockData';
Expand Down Expand Up @@ -71,7 +78,7 @@ export const ingestionNewJobExtended: IJobResponse<ExtendedNewRasterLayer, unkno
},
};

export const ingestionUpdateJob: IJobResponse<UpdateRasterLayer, unknown> = {
export const ingestionUpdateJob: IJobResponse<IngestionUpdateJobParams, unknown> = {
id: 'd027b3aa-272b-4dc9-91d7-ba8343af5ed1',
resourceId: 'another-product-id',
version: '1.0',
Expand All @@ -86,6 +93,10 @@ export const ingestionUpdateJob: IJobResponse<UpdateRasterLayer, unknown> = {
fileNames: ['blueMarble.gpkg'],
originDirectory: 'tests',
},
additionalParams: {
displayPath: 'd1e9fe74-2a8f-425f-ac46-d65bb5c5756d',
tileOutputFormat: TileOutputFormat.PNG,
},
},
status: OperationStatus.PENDING,
percentage: 0,
Expand Down
Loading