Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementing new job init- merge tiles tasks creation(MAPCO-4348) #5

Merged
merged 20 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"disableHttpClientLogs": true
}
},
"mapServerCacheType": "FS",
"jobManagement": {
"config": {
"jobManagerBaseUrl": "http://localhost:8081",
Expand Down
40,354 changes: 13,794 additions & 26,560 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@
"@map-colonies/express-access-log-middleware": "^2.0.1",
"@map-colonies/js-logger": "^1.0.1",
"@map-colonies/mc-model-types": "^17.0.4",
"@map-colonies/mc-priority-queue": "^8.1.0",
"@map-colonies/mc-priority-queue": "^8.1.1",
"@map-colonies/mc-utils": "^3.1.0",
"@map-colonies/read-pkg": "0.0.1",
"@map-colonies/telemetry": "^6.0.0",
"@opentelemetry/api": "^1.7.0",
"@opentelemetry/api-metrics": "0.23.0",
"@turf/turf": "^7.0.0",
"compression": "^1.7.4",
"config": "^3.3.9",
"express": "^4.19.2",
Expand Down
1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const SERVICES = {
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
QUEUE_CLIENT: Symbol('QueueClient'),
TILE_RANGER: Symbol('TileRanger'),
} satisfies Record<string, symbol>;

/* eslint-enable @typescript-eslint/naming-convention */
7 changes: 7 additions & 0 deletions src/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ export class JobHandlerNotFoundError extends Error {
this.name = JobHandlerNotFoundError.name;
}
}

export class UnsupportedTransparencyError extends Error {
public constructor(transparency: string) {
super(`unsupported transparency value: ${transparency}`);
this.name = UnsupportedTransparencyError.name;
}
}
83 changes: 78 additions & 5 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { InputFiles, NewRasterLayerMetadata, PolygonPart, TileOutputFormat } from '@map-colonies/mc-model-types';
import { TilesMimeFormat } from '@map-colonies/types';
import { BBox, GeoJSON } from 'geojson';
import { Footprint, ITileRange } from '@map-colonies/mc-utils';

//#region config interfaces
export interface IConfig {
Expand Down Expand Up @@ -50,12 +54,81 @@ export interface LogContext {
}
export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobInit: (job: IJobResponse<any, any>) => Promise<void>;
handleJobInit: (job: IJobResponse<any, any>, task: string) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
handleJobFinalize: (job: IJobResponse<any, any>) => Promise<void>;
handleJobFinalize: (job: IJobResponse<any, any>, task: string) => Promise<void>;
}
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved

export interface JobAndTaskType {
export interface JobAndPhaseTask {
job: IJobResponse<unknown, unknown>;
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
taskType: string;
task: ITaskResponse<unknown>;
}

export interface OverseerNewRasterLayerMetadata extends NewRasterLayerMetadata {
catalogId: string;
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
displayPath: string;
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
tileMimeType: TilesMimeFormat | undefined;
grid: Grid;
}

export interface MergeTilesTaskParams {
inputFiles: InputFiles;
taskMetadata: MergeTilesMetadata;
partData: PolygonPart[];
}

export interface MergeTilesMetadata {
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
isNewTarget: boolean;
grid: Grid;
}

export enum Grid {
TWO_ON_ONE = '2x1',
}
//#region task
export interface ILayerMergeData {
fileName: string;
tilesPath: string;
footprint?: GeoJSON;
extent: BBox;
}

export interface IMergeParameters {
layers: ILayerMergeData[];
destPath: string;
maxZoom: number;
grid: Grid;
targetFormat: TileOutputFormat;
isNewTarget: boolean;
}

export interface IMergeSources {
type: string;
path: string;
grid?: Grid;
extent?: IBBox;
}

export interface IMergeTaskParameters {
targetFormat: TileOutputFormat;
isNewTarget: boolean;
sources: IMergeSources[];
batches: ITileRange[];
}

export interface IMergeOverlaps {
layers: ILayerMergeData[];
intersection: Footprint;
}
//#endregion task

export interface IBBox {
minX: number;
minY: number;
maxX: number;
maxY: number;
}
11 changes: 6 additions & 5 deletions src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import config from 'config';
import { getOtelMixin } from '@map-colonies/telemetry';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { IHttpRetryConfig } from '@map-colonies/mc-utils';
import { IHttpRetryConfig, TileRanger } from '@map-colonies/mc-utils';
import { trace, metrics as OtelMetrics } from '@opentelemetry/api';
import { instancePerContainerCachingFactory } from 'tsyringe';
import { DependencyContainer } from 'tsyringe/dist/typings/types';
Expand All @@ -11,11 +11,11 @@ import { Metrics } from '@map-colonies/telemetry';
import { SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { NewJobHandler } from './models/newJobHandler';
import { UpdateJobHandler } from './models/updateJobHandler';
import { JOB_HANDLER_FACTORY_SYMBOL, jobHandlerFactory } from './models/jobHandlerFactory';
import { NewJobHandler } from './job/models/newJobHandler';
import { UpdateJobHandler } from './job/models/updateJobHandler';
import { JOB_HANDLER_FACTORY_SYMBOL, jobHandlerFactory } from './job/models/jobHandlerFactory';
import { validateAndGetHandlersTokens } from './utils/configUtil';
import { SwapJobHandler } from './models/swapJobHandler';
import { SwapJobHandler } from './job/models/swapJobHandler';
import { IConfig, IJobManagerConfig, IngestionJobsConfig } from './common/interfaces';

export const queueClientFactory = (container: DependencyContainer): QueueClient => {
Expand Down Expand Up @@ -60,6 +60,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: handlersTokens.Ingestion_New, provider: { useClass: NewJobHandler } },
{ token: handlersTokens.Ingestion_Update, provider: { useClass: UpdateJobHandler } },
{ token: handlersTokens.Ingestion_Swap_Update, provider: { useClass: SwapJobHandler } },
{ token: SERVICES.TILE_RANGER, provider: { useClass: TileRanger } },
{
token: 'onSignal',
provider: {
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from '@map-colonies/js-logger';
import config from 'config';
import { DEFAULT_SERVER_PORT, SERVICES } from './common/constants';
import { getApp } from './app';
import { JobProcessor } from './models/jobProcessor';
import { JobProcessor } from './job/models/jobProcessor';

const port: number = config.get<number>('server.port') || DEFAULT_SERVER_PORT;

Expand Down
31 changes: 31 additions & 0 deletions src/job/models/jobHandlerFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Logger } from '@map-colonies/js-logger';
import { DependencyContainer } from 'tsyringe';
import { IJobHandler } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { JobHandlerNotFoundError } from '../../common/errors';

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export const jobHandlerFactory = (container: DependencyContainer) => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
return (jobType: string): IJobHandler => {
try {
const jobHandler = container.resolve<IJobHandler | null>(jobType);
if (!jobHandler) {
const errorMsg = `Job handler for job type ${jobType} not found`;
logger.error(errorMsg);
throw new JobHandlerNotFoundError(errorMsg);
}
return jobHandler;
} catch (err) {
if (err instanceof JobHandlerNotFoundError) {
throw err;
} else {
const message = (err as Error).message;
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
throw new Error(`Error in Job handler for job type ${jobType}: err:${message}`);
}
}
};
};

export type JobHandlerFactory = ReturnType<typeof jobHandlerFactory>;
export const JOB_HANDLER_FACTORY_SYMBOL = Symbol(jobHandlerFactory.name);
34 changes: 17 additions & 17 deletions src/models/jobProcessor.ts → src/job/models/jobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { setTimeout as setTimeoutPromise } from 'timers/promises';
import { Logger } from '@map-colonies/js-logger';
import { inject, injectable } from 'tsyringe';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { getAvailableJobTypes } from '../utils/configUtil';
import { SERVICES } from '../common/constants';
import { IConfig, IngestionConfig, JobAndTaskType, LogContext } from '../common/interfaces';
import { OperationStatus, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { getAvailableJobTypes } from '../../utils/configUtil';
import { SERVICES } from '../../common/constants';
import { IConfig, IngestionConfig, JobAndPhaseTask, LogContext } from '../../common/interfaces';
import { JOB_HANDLER_FACTORY_SYMBOL, JobHandlerFactory } from './jobHandlerFactory';

@injectable()
Expand Down Expand Up @@ -49,7 +49,7 @@ export class JobProcessor {
private async consumeAndProcess(): Promise<void> {
const logCtx: LogContext = { ...this.logContext, function: this.consumeAndProcess.name };
try {
const jobAndTaskType = await this.getJobWithTaskType();
const jobAndTaskType = await this.getJobWithPhaseTask();

if (!jobAndTaskType) {
await setTimeoutPromise(this.dequeueIntervalMs);
Expand All @@ -63,35 +63,35 @@ export class JobProcessor {
}
}

private async processJob(jobAndTaskType: JobAndTaskType): Promise<void> {
const { job, taskType } = jobAndTaskType;
private async processJob(jobAndTaskType: JobAndPhaseTask): Promise<void> {
const { job, task } = jobAndTaskType;
const taskTypes = this.ingestionConfig.pollingTasks;
const jobHandler = this.jobHandlerFactory(job.type);

switch (taskType) {
switch (task.type) {
case taskTypes.init:
await jobHandler.handleJobInit(job);
await jobHandler.handleJobInit(job, task.id);
break;
case taskTypes.finalize:
await jobHandler.handleJobFinalize(job);
await jobHandler.handleJobFinalize(job, task.id);
break;
}
}

private async getJobWithTaskType(): Promise<JobAndTaskType | undefined> {
const logCtx: LogContext = { ...this.logContext, function: this.getJobWithTaskType.name };
private async getJobWithPhaseTask(): Promise<JobAndPhaseTask | undefined> {
const logger = this.logger.child({ logContext: { ...this.logContext, function: this.getJobWithPhaseTask.name } });
for (const taskType of this.pollingTaskTypes) {
for (const jobType of this.jobTypes) {
this.logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"`, logContext: logCtx });
logger.debug({ msg: `trying to dequeue task of type "${taskType}" and job of type "${jobType}"` });
const task = await this.queueClient.dequeue(jobType, taskType);

if (!task) {
continue;
}
this.logger.info({ msg: `dequeued task ${task.id}`, metadata: task, logContext: this.logContext });
await this.queueClient.jobManagerClient.updateJob(task.jobId, { status: OperationStatus.IN_PROGRESS });
logger.info({ msg: `dequeued task ${task.id}`, metadata: task });
const job = await this.queueClient.jobManagerClient.getJob(task.jobId);
this.logger.info({ msg: `got job ${job.id}`, metadata: job, logContext: this.logContext });
return { job, taskType: task.type };
logger.info({ msg: `got job ${job.id}`, metadata: job });
return { job, task };
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { randomUUID } from 'crypto';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { TilesMimeFormat, lookup as mimeLookup } from '@map-colonies/types';
import { NewRasterLayer, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Grid, IJobHandler, LogContext, MergeTilesTaskParams, OverseerNewRasterLayerMetadata } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
import { MergeTilesTaskBuilder } from '../../task/models/mergeTilesTaskBuilder';

@injectable()
export class NewJobHandler implements IJobHandler {
private readonly logContext: LogContext;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(MergeTilesTaskBuilder) private readonly taskBuilder: MergeTilesTaskBuilder,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
) {
this.logContext = {
fileName: __filename,
class: NewJobHandler.name,
};
}

public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobInit.name } });
try {
logger.info({ msg: `Handling ${job.type} job with "init" task`, metadata: { job } });

const { inputFiles, metadata, partData } = job.parameters;
const overseerLayerMetadata = this.mapToOverseerNewLayerMetadata(metadata);

this.logger.info({ msg: 'Updating job with new metadata', metadata: { job, overseerLayerMetadata } });
await this.queueClient.jobManagerClient.updateJob(job.id, { parameters: { metadata: overseerLayerMetadata, partData, inputFiles } });
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved

const buildTasksParams: MergeTilesTaskParams = {
inputFiles,
taskMetadata: {
layerRelativePath: overseerLayerMetadata.layerRelativePath,
tileOutputFormat: overseerLayerMetadata.tileOutputFormat,
isNewTarget: true,
grid: overseerLayerMetadata.grid,
},
partData,
};

logger.info({ msg: 'Building tasks', metadata: { buildTasksParams } });
const mergeTasks = this.taskBuilder.buildTasks(buildTasksParams);

logger.info({ msg: 'Pushing tasks', metadata: { mergeTasks } });
await this.taskBuilder.pushTasks(job.id, taskId, mergeTasks);

await this.queueClient.ack(job.id, taskId);

logger.info({ msg: 'Job init completed successfully' });
} catch (err) {
if (err instanceof Error) {
logger.error({ msg: 'Failed to handle job init', error: err, logContext: { ...this.logContext, function: this.handleJobInit.name } });
await this.queueClient.reject(job.id, taskId, true, err.message);
}
}
}

public async handleJobFinalize(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId, logContext: { ...this.logContext, function: this.handleJobFinalize.name } });
logger.info({ msg: `handling ${job.type} job with "finalize"`, metadata: { job } });
await Promise.reject('not implemented');
}

private readonly mapToOverseerNewLayerMetadata = (metadata: NewRasterLayerMetadata): OverseerNewRasterLayerMetadata => {
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
const catalogId = randomUUID();
const displayPath = randomUUID();
const layerRelativePath = `${catalogId}/${displayPath}`;
const tileOutputFormat = getTileOutputFormat(metadata.transparency);
const tileMimeType = mimeLookup(tileOutputFormat) as TilesMimeFormat;
const grid = Grid.TWO_ON_ONE;

return {
...metadata,
catalogId,
displayPath,
layerRelativePath,
tileOutputFormat,
tileMimeType,
grid,
};
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler, LogContext } from '../common/interfaces';
import { SERVICES } from '../common/constants';
import { IJobHandler, LogContext } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';

@injectable()
export class SwapJobHandler implements IJobHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IJobHandler, LogContext } from '../common/interfaces';
import { SERVICES } from '../common/constants';
import { IJobHandler, LogContext } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';

@injectable()
export class UpdateJobHandler implements IJobHandler {
Expand Down
Loading
Loading