Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementing new job init- merge tiles tasks creation(MAPCO-4348) #5

Merged
merged 20 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,29 +46,20 @@
"dequeueIntervalMs": 3000
},
"ingestion": {
"taskMaxTaskAttempts": 3,
"maxTaskAttempts": 3,
"pollingTasks": {
"init": "init",
"finalize": "finalize"
},
"jobs": {
"new": {
"type": "Ingestion_New",
"tasks": {
"mergeTiles": "tilesMerging"
}
"type": "Ingestion_New"
},
"update": {
"type": "Ingestion_Update",
"tasks": {
"mergeTiles": "tilesMerging"
}
"type": "Ingestion_Update"
},
"swapUpdate": {
"type": "Ingestion_Swap_Update",
"tasks": {
"mergeTiles": "tilesMerging"
}
"type": "Ingestion_Swap_Update"
}
},
"tasks": {
Expand Down
13 changes: 8 additions & 5 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export interface IPollingTasks {
export interface IngestionConfig {
pollingTasks: IPollingTasks;
jobs: IngestionJobsConfig;
taskMaxTaskAttempts: number;
maxTaskAttempts: number;
}
//#endregion config
export interface LogContext {
Expand All @@ -55,17 +55,19 @@ export interface LogContext {
}
export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobInit: (job: IJobResponse<any, any>, task: string) => Promise<void>;
handleJobInit: (job: IJobResponse<any, any>, taskId: string) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobFinalize: (job: IJobResponse<any, any>, task: string) => Promise<void>;
handleJobFinalize: (job: IJobResponse<any, any>, taskId: string) => Promise<void>;
}

export interface JobAndPhaseTask {
export interface JobAndTaskResponse {
job: IJobResponse<unknown, unknown>;
task: ITaskResponse<unknown>;
}

export interface OverseerNewRasterLayerMetadata extends NewRasterLayerMetadata {
export type TaskResponse<T> = { task: ITaskResponse<T>; shouldSkipTask: false } | { task: null; shouldSkipTask: true };
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved

export interface ExtendedRasterLayerMetadata extends NewRasterLayerMetadata {
catalogId: string;
displayPath: string;
layerRelativePath: string;
Expand Down Expand Up @@ -96,6 +98,7 @@ export interface ILayerMergeData {
tilesPath: string;
footprint?: GeoJSON;
extent: BBox;
maxZoom: number;
}

export interface IMergeParameters {
Expand Down
7 changes: 4 additions & 3 deletions src/job/models/jobHandlerFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ export const jobHandlerFactory = (container: DependencyContainer) => {
const jobHandler = container.resolve<IJobHandler | null>(jobType);
if (!jobHandler) {
const errorMsg = `Job handler for job type ${jobType} not found`;
logger.error(errorMsg);
throw new JobHandlerNotFoundError(errorMsg);
}
return jobHandler;
} catch (err) {
if (err instanceof JobHandlerNotFoundError) {
logger.error({ msg: err.message });
throw err;
} else {
const message = (err as Error).message;
throw new Error(`Error in Job handler for job type ${jobType}: err:${message}`);
const errorMsg = `Error in Job handler for job type ${jobType}: err:${(err as Error).message}`;
logger.error({ msg: errorMsg });
throw new Error(errorMsg);
}
}
};
Expand Down
80 changes: 55 additions & 25 deletions src/job/models/jobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { setTimeout as setTimeoutPromise } from 'timers/promises';
import { Logger } from '@map-colonies/js-logger';
import { inject, injectable } from 'tsyringe';
import { OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { IJobResponse, OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { getAvailableJobTypes } from '../../utils/configUtil';
import { SERVICES } from '../../common/constants';
import { IConfig, IngestionConfig, JobAndPhaseTask, LogContext } from '../../common/interfaces';
import { IConfig, IngestionConfig, JobAndTaskResponse, LogContext, TaskResponse } from '../../common/interfaces';
import { JOB_HANDLER_FACTORY_SYMBOL, JobHandlerFactory } from './jobHandlerFactory';

@injectable()
Expand Down Expand Up @@ -48,19 +48,19 @@ export class JobProcessor {

private async consumeAndProcess(): Promise<void> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.consumeAndProcess.name } });
let jobAndTaskType: JobAndPhaseTask | undefined = undefined;
let jobAndTask: JobAndTaskResponse | undefined = undefined;
try {
jobAndTaskType = await this.getJobWithPhaseTask();
if (!jobAndTaskType) {
jobAndTask = await this.getJobAndTaskResponse();
if (!jobAndTask) {
logger.debug({ msg: 'waiting for next dequeue', metadata: { dequeueIntervalMs: this.dequeueIntervalMs } });
await setTimeoutPromise(this.dequeueIntervalMs);
return;
}

await this.processJob(jobAndTaskType);
await this.processJob(jobAndTask);
} catch (error) {
if (error instanceof Error && jobAndTaskType) {
const { job, task } = jobAndTaskType;
if (error instanceof Error && jobAndTask) {
const { job, task } = jobAndTask;
logger.error({ msg: 'rejecting task', error, metadata: { job, task } });
await this.queueClient.reject(job.id, task.id, true, error.message);
}
Expand All @@ -69,7 +69,7 @@ export class JobProcessor {
}
}

private async processJob(jobAndTaskType: JobAndPhaseTask): Promise<void> {
private async processJob(jobAndTaskType: JobAndTaskResponse): Promise<void> {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.processJob.name } });
const { job, task } = jobAndTaskType;
try {
Expand All @@ -86,40 +86,70 @@ export class JobProcessor {
}
} catch (error) {
if (error instanceof Error) {
logger.error({ msg: `Failed processing the job: ${error.message}`, error });
logger.error({ msg: `failed processing the job: ${error.message}`, error });
}
throw error;
}
}

private async getJobWithPhaseTask(): Promise<JobAndPhaseTask | undefined> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobWithPhaseTask.name } });
private async getJobAndTaskResponse(): Promise<JobAndTaskResponse | undefined> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobAndTaskResponse.name } });
try {
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);
if (!task) {
logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` });
continue;
}
if (task.attempts === this.ingestionConfig.taskMaxTaskAttempts) {
logger.warn({ msg: `task ${task.id} reached max attempts, skipping`, metadata: task });
const { task, shouldSkipTask } = await this.getTask(jobType, taskType);

if (shouldSkipTask) {
logger.debug({ msg: `skipping task of type "${taskType}" and job of type "${jobType}"` });
continue;
}
logger.info({ msg: `dequeued task ${task.id}`, metadata: task });

await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
logger.info({ msg: `got job ${job.id}`, metadata: job });
const job = await this.getJob(task.jobId);
logger.info({ msg: `got job and task response`, jobId: job.id, jobType: job.type, taskId: task.id, taskType: task.type });

return { job, task };
}
}
} catch (error) {
if (error instanceof Error) {
logger.error({ msg: `Failed to get job with phase task: ${error.message}`, error });
logger.error({ msg: `Failed to get job and task response: ${error.message}`, error });
}
throw error;
}
}

private async getJob(jobId: string): Promise<IJobResponse<unknown, unknown>> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJob.name }, jobId });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved

logger.info({ msg: `updating job status to ${OperationStatus.IN_PROGRESS}` });
await this.queueClient.jobManagerClient.updateJob(jobId, { status: OperationStatus.IN_PROGRESS });

const job = await this.queueClient.jobManagerClient.getJob(jobId);
logger.info({ msg: `got job ${job.id}`, jobType: job.type });

return job;
}

private async getTask(jobType: string, taskType: string): Promise<TaskResponse<unknown>> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getTask.name }, jobType, taskType });
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);

if (!task) {
logger.debug({ msg: `no task of type "${taskType}" and job of type "${jobType}" found` });
return { task: null, shouldSkipTask: true };
}
if (task.attempts >= this.ingestionConfig.maxTaskAttempts) {
const message = `${taskType} task ${task.id} reached max attempts, rejects as unrecoverable`;

logger.warn({ msg: message, taskId: task.id, attempts: task.attempts });
await this.queueClient.reject(task.jobId, task.id, false);

logger.error({ msg: `updating job status to ${OperationStatus.FAILED}`, jobId: task.jobId });
await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.FAILED, reason: message });
return { task: null, shouldSkipTask: true };
}
logger.info({ msg: `dequeued task ${task.id} successfully` });
return { task, shouldSkipTask: false };
}
}
43 changes: 26 additions & 17 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { TilesMimeFormat, lookup as mimeLookup } from '@map-colonies/types';
import { NewRasterLayer, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Grid, IJobHandler, LogContext, MergeTilesTaskParams, OverseerNewRasterLayerMetadata } from '../../common/interfaces';
import { Grid, IJobHandler, LogContext, MergeTilesTaskParams, ExtendedRasterLayerMetadata } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
import { MergeTilesTaskBuilder } from '../../task/models/mergeTilesTaskBuilder';
Expand All @@ -25,33 +25,42 @@ export class NewJobHandler implements IJobHandler {
}

public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobInit.name } });
const logger = this.logger.child({
jobId: job.id,
jobType: job.type,
taskId,
logContext: { ...this.logContext, function: this.handleJobInit.name },
});
try {
logger.info({ msg: `Handling ${job.type} job with "init" task`, metadata: { job } });
logger.info({ msg: `handling ${job.type} job with "init" task` });

const { inputFiles, metadata, partData } = job.parameters;
const overseerLayerMetadata = this.mapToOverseerNewLayerMetadata(metadata);
const extendedLayerMetadata = this.mapToExtendedNewLayerMetadata(metadata);

this.logger.info({ msg: 'Updating job with new metadata', metadata: { job, overseerLayerMetadata } });
await this.queueClient.jobManagerClient.updateJob(job.id, { parameters: { metadata: overseerLayerMetadata, partData, inputFiles } });

const buildTasksParams: MergeTilesTaskParams = {
const taskBuildParams: MergeTilesTaskParams = {
inputFiles,
taskMetadata: {
layerRelativePath: overseerLayerMetadata.layerRelativePath,
tileOutputFormat: overseerLayerMetadata.tileOutputFormat,
layerRelativePath: extendedLayerMetadata.layerRelativePath,
tileOutputFormat: extendedLayerMetadata.tileOutputFormat,
isNewTarget: true,
grid: overseerLayerMetadata.grid,
grid: extendedLayerMetadata.grid,
},
partData,
};

logger.info({ msg: 'Building tasks', metadata: { buildTasksParams } });
const mergeTasks = this.taskBuilder.buildTasks(buildTasksParams);
logger.info({ msg: 'building tasks' });
logger.debug({ msg: 'building tasks', metadata: { taskBuildParams } });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const mergeTasks = this.taskBuilder.buildTasks(taskBuildParams);

logger.info({ msg: 'pushing tasks' });
logger.debug({ msg: 'pushing tasks', metadata: { mergeTasks } });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.taskBuilder.pushTasks(job.id, mergeTasks);

logger.info({ msg: 'Pushing tasks', metadata: { mergeTasks } });
await this.taskBuilder.pushTasks(job.id, taskId, mergeTasks);
logger.info({ msg: 'Updating job with new metadata' });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
this.logger.debug({ msg: 'Updating job with new metadata', metadata: { job, extendedLayerMetadata } });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.queueClient.jobManagerClient.updateJob(job.id, { parameters: { metadata: extendedLayerMetadata, partData, inputFiles } });

logger.info({ msg: 'Acking task' });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.queueClient.ack(job.id, taskId);

logger.info({ msg: 'Job init completed successfully' });
Expand All @@ -65,11 +74,11 @@ export class NewJobHandler implements IJobHandler {

public async handleJobFinalize(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobFinalize.name } });
logger.info({ msg: `handling ${job.type} job with "finalize"`, metadata: { job } });
logger.info({ msg: `handling ${job.type} job with "finalize"` });
await Promise.reject('not implemented');
}

private readonly mapToOverseerNewLayerMetadata = (metadata: NewRasterLayerMetadata): OverseerNewRasterLayerMetadata => {
private readonly mapToExtendedNewLayerMetadata = (metadata: NewRasterLayerMetadata): ExtendedRasterLayerMetadata => {
const catalogId = randomUUID();
const displayPath = randomUUID();
const layerRelativePath = `${catalogId}/${displayPath}`;
Expand Down
12 changes: 6 additions & 6 deletions src/job/models/swapJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ export class SwapJobHandler implements IJobHandler {
this.logContext = { fileName: __filename, class: SwapJobHandler.name };
}

public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobInit.name };
this.logger.info({ msg: `handling ${job.type} job with "init" task`, metadata: { job }, logContext: logCtx });
public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobInit.name } });
logger.info({ msg: `handling ${job.type} job with "init" task` });
await Promise.reject('not implemented');
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: `handling ${job.type} job with "finalize" task`, metadata: { job }, logContext: logCtx });
public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobFinalize.name } });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
}
}
12 changes: 6 additions & 6 deletions src/job/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ export class UpdateJobHandler implements IJobHandler {
this.logContext = { fileName: __filename, class: UpdateJobHandler.name };
}

public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobInit.name };
this.logger.info({ msg: `handling ${job.type} job with "init" task`, metadata: { job }, logContext: logCtx });
public async handleJobInit(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobInit.name } });
logger.info({ msg: `handling ${job.type} job with "init" task` });
await Promise.reject('not implemented');
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: `handling ${job.type} job with "finalize" task`, metadata: { job }, logContext: logCtx });
public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobFinalize.name } });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
}
}
Loading
Loading