Skip to content

Commit

Permalink
feat: implementing new job init (merge tiles tasks creation)
Browse files Browse the repository at this point in the history
  • Loading branch information
almog8k committed Aug 22, 2024
1 parent 271a87d commit eee98de
Show file tree
Hide file tree
Showing 30 changed files with 7,519 additions and 1,889 deletions.
1 change: 1 addition & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"disableHttpClientLogs": true
}
},
"mapServerCacheType": "FS",
"jobManagement": {
"config": {
"jobManagerBaseUrl": "http://localhost:8081",
Expand Down
8,333 changes: 6,555 additions & 1,778 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@
"@map-colonies/express-access-log-middleware": "^2.0.1",
"@map-colonies/js-logger": "^1.0.1",
"@map-colonies/mc-model-types": "^17.0.4",
"@map-colonies/mc-priority-queue": "^8.1.0",
"@map-colonies/mc-priority-queue": "^8.1.1",
"@map-colonies/mc-utils": "^3.1.0",
"@map-colonies/read-pkg": "0.0.1",
"@map-colonies/telemetry": "^6.0.0",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/api-metrics": "0.23.0",
"@turf/turf": "^7.0.0",
"compression": "^1.7.4",
"config": "^3.3.9",
"express": "^4.19.2",
Expand Down
1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const SERVICES = {
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
QUEUE_CLIENT: Symbol('QueueClient'),
TILE_RANGER: Symbol('TileRanger'),
} satisfies Record<string, symbol>;

/* eslint-enable @typescript-eslint/naming-convention */
7 changes: 7 additions & 0 deletions src/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ export class JobHandlerNotFoundError extends Error {
this.name = JobHandlerNotFoundError.name;
}
}

export class UnsupportedTransparencyError extends Error {
public constructor(transparency: string) {
super(`unsupported transparency value: ${transparency}`);
this.name = UnsupportedTransparencyError.name;
}
}
83 changes: 78 additions & 5 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { InputFiles, NewRasterLayerMetadata, PolygonPart, TileOutputFormat } from '@map-colonies/mc-model-types';
import { TilesMimeFormat } from '@map-colonies/types';
import { BBox, GeoJSON } from 'geojson';
import { Footprint, ITileRange } from '@map-colonies/mc-utils';

//#region config interfaces
export interface IConfig {
Expand Down Expand Up @@ -50,12 +54,81 @@ export interface LogContext {
}
export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobInit: (job: IJobResponse<any, any>) => Promise<void>;
handleJobInit: (job: IJobResponse<any, any>, task: string) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobFinalize: (job: IJobResponse<any, any>) => Promise<void>;
handleJobFinalize: (job: IJobResponse<any, any>, task: string) => Promise<void>;
}

export interface JobAndTaskType {
export interface JobAndPhaseTask {
job: IJobResponse<unknown, unknown>;
taskType: string;
task: ITaskResponse<unknown>;
}

export interface OverseerNewRasterLayerMetadata extends NewRasterLayerMetadata {
id: string;
displayPath: string;
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
tileMimeType: TilesMimeFormat | undefined;
grid: Grid;
}

export interface MergeTilesTaskParams {
inputFiles: InputFiles;
taskMetadata: MergeTilesMetadata;
partData: PolygonPart[];
}

export interface MergeTilesMetadata {
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
isNewTarget: boolean;
grid: Grid;
}

export enum Grid {
TWO_ON_ONE = '2x1',
}
//#region task
export interface ILayerMergeData {
fileName: string;
tilesPath: string;
footprint?: GeoJSON;
extent: BBox;
}

export interface IMergeParameters {
layers: ILayerMergeData[];
destPath: string;
maxZoom: number;
grid: Grid;
targetFormat: TileOutputFormat;
isNewTarget: boolean;
}

export interface IMergeSources {
type: string;
path: string;
grid?: Grid;
extent?: IBBox;
}

export interface IMergeTaskParameters {
targetFormat: TileOutputFormat;
isNewTarget: boolean;
sources: IMergeSources[];
batches: ITileRange[];
}

export interface IMergeOverlaps {
layers: ILayerMergeData[];
intersection: Footprint;
}
//#endregion task

export interface IBBox {
minX: number;
minY: number;
maxX: number;
maxY: number;
}
11 changes: 6 additions & 5 deletions src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import config from 'config';
import { getOtelMixin } from '@map-colonies/telemetry';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { IHttpRetryConfig } from '@map-colonies/mc-utils';
import { IHttpRetryConfig, TileRanger } from '@map-colonies/mc-utils';
import { trace, metrics as OtelMetrics } from '@opentelemetry/api';
import { instancePerContainerCachingFactory } from 'tsyringe';
import { DependencyContainer } from 'tsyringe/dist/typings/types';
Expand All @@ -11,11 +11,11 @@ import { Metrics } from '@map-colonies/telemetry';
import { SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { NewJobHandler } from './models/newJobHandler';
import { UpdateJobHandler } from './models/updateJobHandler';
import { JOB_HANDLER_FACTORY_SYMBOL, jobHandlerFactory } from './models/jobHandlerFactory';
import { NewJobHandler } from './job/models/newJobHandler';
import { UpdateJobHandler } from './job/models/updateJobHandler';
import { JOB_HANDLER_FACTORY_SYMBOL, jobHandlerFactory } from './job/models/jobHandlerFactory';
import { validateAndGetHandlersTokens } from './utils/configUtil';
import { SwapJobHandler } from './models/swapJobHandler';
import { SwapJobHandler } from './job/models/swapJobHandler';
import { IConfig, IJobManagerConfig, IngestionJobsConfig } from './common/interfaces';

export const queueClientFactory = (container: DependencyContainer): QueueClient => {
Expand Down Expand Up @@ -60,6 +60,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: handlersTokens.Ingestion_New, provider: { useClass: NewJobHandler } },
{ token: handlersTokens.Ingestion_Update, provider: { useClass: UpdateJobHandler } },
{ token: handlersTokens.Ingestion_Swap_Update, provider: { useClass: SwapJobHandler } },
{ token: SERVICES.TILE_RANGER, provider: { useClass: TileRanger } },
{
token: 'onSignal',
provider: {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from '@map-colonies/js-logger';
import config from 'config';
import { DEFAULT_SERVER_PORT, SERVICES } from './common/constants';
import { getApp } from './app';
import { JobProcessor } from './models/jobProcessor';
import { JobProcessor } from './job/models/jobProcessor';

const port: number = config.get<number>('server.port') || DEFAULT_SERVER_PORT;

Expand Down
31 changes: 31 additions & 0 deletions src/job/models/jobHandlerFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Logger } from '@map-colonies/js-logger';
import { DependencyContainer } from 'tsyringe';
import { IJobHandler } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { JobHandlerNotFoundError } from '../../common/errors';

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export const jobHandlerFactory = (container: DependencyContainer) => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
return (jobType: string): IJobHandler => {
try {
const jobHandler = container.resolve<IJobHandler | null>(jobType);
if (!jobHandler) {
const errorMsg = `Job handler for job type ${jobType} not found`;
logger.error(errorMsg);
throw new JobHandlerNotFoundError(errorMsg);
}
return jobHandler;
} catch (err) {
if (err instanceof JobHandlerNotFoundError) {
throw err;
} else {
const message = (err as Error).message;
throw new Error(`Error in Job handler for job type ${jobType}: err:${message}`);
}
}
};
};

export type JobHandlerFactory = ReturnType<typeof jobHandlerFactory>;
export const JOB_HANDLER_FACTORY_SYMBOL = Symbol(jobHandlerFactory.name);
34 changes: 17 additions & 17 deletions src/models/jobProcessor.ts → src/job/models/jobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { setTimeout as setTimeoutPromise } from 'timers/promises';
import { Logger } from '@map-colonies/js-logger';
import { inject, injectable } from 'tsyringe';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { getAvailableJobTypes } from '../utils/configUtil';
import { SERVICES } from '../common/constants';
import { IConfig, IngestionConfig, JobAndTaskType, LogContext } from '../common/interfaces';
import { OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { getAvailableJobTypes } from '../../utils/configUtil';
import { SERVICES } from '../../common/constants';
import { IConfig, IngestionConfig, JobAndPhaseTask, LogContext } from '../../common/interfaces';
import { JOB_HANDLER_FACTORY_SYMBOL, JobHandlerFactory } from './jobHandlerFactory';

@injectable()
Expand Down Expand Up @@ -49,7 +49,7 @@ export class JobProcessor {
private async consumeAndProcess(): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.consumeAndProcess.name };
try {
const jobAndTaskType = await this.getJobWithTaskType();
const jobAndTaskType = await this.getJobWithPhaseTask();

if (!jobAndTaskType) {
await setTimeoutPromise(this.dequeueIntervalMs);
Expand All @@ -63,35 +63,35 @@ export class JobProcessor {
}
}

private async processJob(jobAndTaskType: JobAndTaskType): Promise<void> {
const { job, taskType } = jobAndTaskType;
private async processJob(jobAndTaskType: JobAndPhaseTask): Promise<void> {
const { job, task } = jobAndTaskType;
const taskTypes = this.ingestionConfig.pollingTasks;
const jobHandler = this.jobHandlerFactory(job.type);

switch (taskType) {
switch (task.type) {
case taskTypes.init:
await jobHandler.handleJobInit(job);
await jobHandler.handleJobInit(job, task.id);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job);
await jobHandler.handleJobFinalize(job, task.id);
break;
}
}

private async getJobWithTaskType(): Promise<JobAndTaskType | undefined> {
const logCtx: LogContext = { ...this.logContext, function: this.getJobWithTaskType.name };
private async getJobWithPhaseTask(): Promise<JobAndPhaseTask | undefined> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobWithPhaseTask.name } });
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
this.logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"`, logContext: logCtx });
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);

if (!task) {
continue;
}
this.logger.info({ msg: `dequeued task ${task.id}`, metadata: task, logContext: this.logContext });
await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS });
logger.info({ msg: `dequeued task ${task.id}`, metadata: task });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
this.logger.info({ msg: `got job ${job.id}`, metadata: job, logContext: this.logContext });
return { job, taskType: task.type };
logger.info({ msg: `got job ${job.id}`, metadata: job });
return { job, task };
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { randomUUID } from 'crypto';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { TilesMimeFormat, lookup as mimeLookup } from '@map-colonies/types';
import { NewRasterLayer, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Grid, IJobHandler, LogContext, MergeTilesTaskParams, OverseerNewRasterLayerMetadata } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
import { MergeTilesTaskBuilder } from '../../task/models/mergeTilesTaskBuilder';

@injectable()
export class NewJobHandler implements IJobHandler {
private readonly logContext: LogContext;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(MergeTilesTaskBuilder) private readonly taskBuilder: MergeTilesTaskBuilder,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
) {
this.logContext = {
fileName: __filename,
class: NewJobHandler.name,
};
}

public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
try {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobInit.name } });
logger.info({ msg: `Handling ${job.type} job with "init" task`, metadata: { job } });

const { inputFiles, metadata, partData } = job.parameters;
const overseerLayerMetadata = this.mapToOverseerNewLayerMetadata(metadata);

this.logger.info({ msg: 'Updating job with new metadata', metadata: { job, overseerLayerMetadata } });
await this.queueClient.jobManagerClient.updateJob(job.id, { parameters: { metadata: overseerLayerMetadata, partData, inputFiles } });

const buildTasksParams: MergeTilesTaskParams = {
inputFiles,
taskMetadata: {
layerRelativePath: overseerLayerMetadata.layerRelativePath,
tileOutputFormat: overseerLayerMetadata.tileOutputFormat,
isNewTarget: true,
grid: overseerLayerMetadata.grid,
},
partData,
};

logger.info({ msg: 'Building tasks', metadata: { buildTasksParams } });
const mergeTasks = this.taskBuilder.buildTasks(buildTasksParams);

logger.info({ msg: 'Pushing tasks', metadata: { mergeTasks } });
await this.taskBuilder.pushTasks(job.id, taskId, mergeTasks);

await this.queueClient.ack(job.id, taskId);

logger.info({ msg: 'Job init completed successfully' });
} catch (err) {
if (err instanceof Error) {
this.logger.error({ msg: 'Failed to handle job init', error: err, logContext: { ...this.logContext, function: this.handleJobInit.name } });
await this.queueClient.reject(job.id, taskId, true, err.message);
}
}
}

public async handleJobFinalize(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.handleJobFinalize.name };
this.logger.info({ msg: `handling ${job.type} job with "finalize"`, metadata: { job }, logContext: logCtx });
await Promise.reject('not implemented');
}

private readonly mapToOverseerNewLayerMetadata = (metadata: NewRasterLayerMetadata): OverseerNewRasterLayerMetadata => {
const id = randomUUID();
const displayPath = randomUUID();
const layerRelativePath = `${id}/${displayPath}`;
const tileOutputFormat = getTileOutputFormat(metadata.transparency);
const tileMimeType = mimeLookup(tileOutputFormat) as TilesMimeFormat;
const grid = Grid.TWO_ON_ONE;

return {
...metadata,
id,
displayPath,
layerRelativePath,
tileOutputFormat,
tileMimeType,
grid,
};
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler, LogContext } from '../common/interfaces';
import { SERVICES } from '../common/constants';
import { IJobHandler, LogContext } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';

@injectable()
export class SwapJobHandler implements IJobHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler, LogContext } from '../common/interfaces';
import { SERVICES } from '../common/constants';
import { IJobHandler, LogContext } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';

@injectable()
export class UpdateJobHandler implements IJobHandler {
Expand Down
Loading

0 comments on commit eee98de

Please sign in to comment.