Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Ingestion update finalize(MAPCO-4442) #24

Merged
merged 16 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@
"tilesMerging": {
"type": "TILES_MERGING_TASK_TYPE",
"tileBatchSize": "TILES_MERGING_TILE_BATCH_SIZE",
"taskBatchSize": "TILES_MERGING_TASK_BATCH_SIZE",
"useNewTargetFlagInUpdate": {
"__name": "TILES_MERGING_USE_NEW_TARGET_FLAG",
"__format": "boolean"
}
"taskBatchSize": "TILES_MERGING_TASK_BATCH_SIZE"
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@
"tilesMerging": {
"type": "tilesMerging",
"tileBatchSize": 10000,
"taskBatchSize": 5,
"useNewTargetFlagInUpdate": true
"taskBatchSize": 5
}
}
}
Expand Down
1 change: 0 additions & 1 deletion helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ data:
TILES_MERGING_TASK_TYPE: {{ $jobDefinitions.tasks.merge.type | quote }}
TILES_MERGING_TILE_BATCH_SIZE: {{ $jobDefinitions.tasks.merge.tileBatchSize | quote }}
TILES_MERGING_TASK_BATCH_SIZE: {{ $jobDefinitions.tasks.merge.taskBatchSize | quote }}
TILES_MERGING_USE_NEW_TARGET_FLAG: {{ $jobDefinitions.tasks.merge.useNewTargetFlagInUpdate | quote }}
MAPPROXY_API_URL: {{ $serviceUrls.mapproxyApi | quote }}
GEOSERVER_API_URL: {{ $serviceUrls.geoserverApi | quote }}
CATALOG_MANAGER_URL: {{ $serviceUrls.catalogManager | quote }}
Expand Down
1 change: 0 additions & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ jobDefinitions:
type: ""
tileBatchSize: 10000
taskBatchSize: 5
useNewTargetFlagInUpdate: true
finalize:
type: ""
config:
Expand Down
3 changes: 0 additions & 3 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { IngestionNewFinalizeTaskParams } from '@map-colonies/mc-model-types';
import { readPackageJsonSync } from '@map-colonies/read-pkg';

export const SERVICE_NAME = readPackageJsonSync().name ?? 'unknown_service';
Expand Down Expand Up @@ -37,6 +36,4 @@ export const storageProviderToCacheTypeMap = new Map([
[TilesStorageProvider.S3, PublishedLayerCacheType.S3],
]);

export type FinalizeSteps = keyof IngestionNewFinalizeTaskParams;

/* eslint-enable @typescript-eslint/naming-convention */
8 changes: 8 additions & 0 deletions src/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,11 @@ export class PublishLayerError extends Error {
this.stack = err.stack;
}
}

export class UpdateLayerError extends Error {
public constructor(updatingClient: string, err: Error) {
super(`Failed to update layer in ${updatingClient} client: ${err.message}`);
this.name = UpdateLayerError.name;
this.stack = err.stack;
}
}
19 changes: 18 additions & 1 deletion src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { GeoJSON } from 'geojson';
import { InputFiles, NewRasterLayerMetadata, PolygonPart, TileOutputFormat, LayerData } from '@map-colonies/mc-model-types';
import {
InputFiles,
NewRasterLayerMetadata,
PolygonPart,
TileOutputFormat,
LayerData,
LayerMetadata,
IngestionNewFinalizeTaskParams,
IngestionUpdateFinalizeTaskParams,
IngestionSwapUpdateFinalizeTaskParams,
} from '@map-colonies/mc-model-types';
import { TilesMimeFormat } from '@map-colonies/types';
import { BBox, Polygon } from 'geojson';
import { Footprint, ITileRange } from '@map-colonies/mc-utils';
Expand Down Expand Up @@ -77,6 +87,8 @@ export interface ExtendedRasterLayerMetadata extends NewRasterLayerMetadata {

export type ExtendedNewRasterLayer = { metadata: ExtendedRasterLayerMetadata } & LayerData;

export type FinalizeTaskParams = IngestionNewFinalizeTaskParams | IngestionUpdateFinalizeTaskParams | IngestionSwapUpdateFinalizeTaskParams;

//#endregion job/task

//#region merge task
Expand Down Expand Up @@ -177,4 +189,9 @@ export interface PartAggregatedData {
productBoundingBox: string;
}

export interface ICatalogUpdateRequestBody {
metadata: CatalogUpdateMetadata;
}

export type CatalogUpdateMetadata = Partial<LayerMetadata>;
//#endregion catalogClient
38 changes: 33 additions & 5 deletions src/httpClients/catalogClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { IConfig } from 'config';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils';
import { IRasterCatalogUpsertRequestBody, LayerMetadata, Link, RecordType } from '@map-colonies/mc-model-types';
import { IngestionUpdateJobParams, IRasterCatalogUpsertRequestBody, LayerMetadata, Link, RecordType } from '@map-colonies/mc-model-types';
import { inject, injectable } from 'tsyringe';
import { SERVICES } from '../common/constants';
import { ExtendedNewRasterLayer } from '../common/interfaces';
import { PublishLayerError } from '../common/errors';
import { ExtendedNewRasterLayer, ICatalogUpdateRequestBody } from '../common/interfaces';
import { internalIdSchema } from '../utils/zod/schemas/jobParametersSchema';
import { PublishLayerError, UpdateLayerError } from '../common/errors';
import { ILinkBuilderData, LinkBuilder } from '../utils/linkBuilder';
import { PolygonPartMangerClient } from './polygonPartMangerClient';

Expand Down Expand Up @@ -39,8 +40,22 @@ export class CatalogClient extends HttpClient {
}
}

public async update(job: IJobResponse<IngestionUpdateJobParams, unknown>): Promise<void> {
try {
const internalId = internalIdSchema.parse(job).internalId;
const url = `/records/${internalId}`;
const updateReq: ICatalogUpdateRequestBody = this.createUpdateReqBody(job);
await this.put(url, updateReq);
} catch (err) {
if (err instanceof Error) {
throw new UpdateLayerError(this.targetService, err);
}
}
}

private createPublishReqBody(job: IJobResponse<ExtendedNewRasterLayer, unknown>, layerName: string): IRasterCatalogUpsertRequestBody {
const metadata = this.mapToCatalogRecordMetadata(job);
const metadata = this.mapToPublishCatalogRecordMetadata(job);

const links = this.buildLinks(layerName);

return {
Expand All @@ -49,7 +64,7 @@ export class CatalogClient extends HttpClient {
};
}

private mapToCatalogRecordMetadata(job: IJobResponse<ExtendedNewRasterLayer, unknown>): LayerMetadata {
private mapToPublishCatalogRecordMetadata(job: IJobResponse<ExtendedNewRasterLayer, unknown>): LayerMetadata {
const { parameters, version } = job;
const { partsData, metadata } = parameters;

Expand Down Expand Up @@ -90,4 +105,17 @@ export class CatalogClient extends HttpClient {

return this.linkBuilder.createLinks(linkBuildData);
}

private createUpdateReqBody(job: IJobResponse<IngestionUpdateJobParams, unknown>): ICatalogUpdateRequestBody {
const { parameters, version } = job;
const { partsData, metadata } = parameters;
const aggregatedPartData = this.polygonPartMangerClient.getAggregatedPartData(partsData);
return {
metadata: {
productVersion: version,
classification: metadata.classification,
...aggregatedPartData,
},
};
}
}
33 changes: 33 additions & 0 deletions src/job/models/jobHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* eslint-disable @typescript-eslint/member-ordering */
import { IJobResponse, TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Logger } from '@map-colonies/js-logger';
import { layerNameSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { FinalizeTaskParams } from '../../common/interfaces';

export class JobHandler {
public constructor(protected readonly logger: Logger, protected readonly queueClient: QueueClient) {}

protected validateAndGenerateLayerName(job: IJobResponse<unknown, unknown>): string {
const layerName = layerNameSchema.parse(job);
const { resourceId, productType } = layerName;
this.logger.debug({ msg: 'layer name validation passed', resourceId, productType });
return `${resourceId}_${productType}`;
}

protected async markFinalizeStepAsCompleted<T extends FinalizeTaskParams>(
jobId: string,
taskId: string,
finalizeTaskParams: T,
step: keyof T
): Promise<T> {
const updatedParams: T = { ...finalizeTaskParams, [step]: true };
await this.queueClient.jobManagerClient.updateTask(jobId, taskId, { parameters: updatedParams });
this.logger.debug({ msg: `finalization step completed`, step });
return updatedParams;
}

protected isAllStepsCompleted<T>(steps: Record<keyof T, boolean>): boolean {
this.logger.debug({ msg: 'checking if all steps are completed', steps });
return Object.values(steps).every((step) => step);
}
}
46 changes: 15 additions & 31 deletions src/job/models/newJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ import { TilesMimeFormat, lookup as mimeLookup } from '@map-colonies/types';
import { IngestionNewFinalizeTaskParams, NewRasterLayer, NewRasterLayerMetadata } from '@map-colonies/mc-model-types';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { Grid, IJobHandler, MergeTilesTaskParams, ExtendedRasterLayerMetadata, ExtendedNewRasterLayer } from '../../common/interfaces';
import { FinalizeSteps, SERVICES } from '../../common/constants';
import { SERVICES } from '../../common/constants';
import { getTileOutputFormat } from '../../utils/imageFormatUtil';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
import { MapproxyApiClient } from '../../httpClients/mapproxyClient';
import { GeoserverClient } from '../../httpClients/geoserverClient';
import { CatalogClient } from '../../httpClients/catalogClient';
import { JobHandler } from './jobHandler';

@injectable()
export class NewJobHandler implements IJobHandler {
export class NewJobHandler extends JobHandler implements IJobHandler {
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.LOGGER) logger: Logger,
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient,
@inject(SERVICES.QUEUE_CLIENT) queueClient: QueueClient,
@inject(CatalogClient) private readonly catalogClient: CatalogClient,
@inject(MapproxyApiClient) private readonly mapproxyClient: MapproxyApiClient,
@inject(GeoserverClient) private readonly geoserverClient: GeoserverClient,
@inject(CatalogClient) private readonly catalogClient: CatalogClient
) {}
@inject(GeoserverClient) private readonly geoserverClient: GeoserverClient
) {
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<NewRasterLayer, unknown>, taskId: string): Promise<void> {
const logger = this.logger.child({ jobId: job.id, jobType: job.type, taskId });
Expand Down Expand Up @@ -78,26 +81,26 @@ export class NewJobHandler implements IJobHandler {

let finalizeTaskParams: IngestionNewFinalizeTaskParams = task.parameters;
const { insertedToMapproxy, insertedToGeoServer, insertedToCatalog } = finalizeTaskParams;
const { productName, productType, layerRelativePath, tileOutputFormat } = job.parameters.metadata;
const layerName = this.generateLayerName(productName, productType);
const { layerRelativePath, tileOutputFormat } = job.parameters.metadata;
const layerName = this.validateAndGenerateLayerName(job);

if (!insertedToMapproxy) {
logger.info({ msg: 'publishing to mapproxy', layerName, layerRelativePath, tileOutputFormat });
await this.mapproxyClient.publish(layerName, layerRelativePath, tileOutputFormat);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, 'insertedToMapproxy', finalizeTaskParams);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, finalizeTaskParams, 'insertedToMapproxy');
}

if (!insertedToGeoServer) {
const geoserverLayerName = layerName.toLowerCase();
logger.info({ msg: 'publishing to geoserver', geoserverLayerName });
await this.geoserverClient.publish(geoserverLayerName);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, 'insertedToGeoServer', finalizeTaskParams);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, finalizeTaskParams, 'insertedToGeoServer');
}

if (!insertedToCatalog) {
logger.info({ msg: 'publishing to catalog', layerName });
await this.catalogClient.publish(job, layerName);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, 'insertedToCatalog', finalizeTaskParams);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, finalizeTaskParams, 'insertedToCatalog');
}

if (this.isAllStepsCompleted(finalizeTaskParams)) {
Expand Down Expand Up @@ -132,23 +135,4 @@ export class NewJobHandler implements IJobHandler {
grid,
};
};

private generateLayerName(productId: string, productType: string): string {
return `${productId}_${productType}`;
}

private isAllStepsCompleted(finalizeTaskParams: IngestionNewFinalizeTaskParams): boolean {
return Object.values(finalizeTaskParams).every((value) => value);
}

private async markFinalizeStepAsCompleted(
jobId: string,
taskId: string,
step: FinalizeSteps,
finalizeTaskParams: IngestionNewFinalizeTaskParams
): Promise<IngestionNewFinalizeTaskParams> {
const updatedParams: IngestionNewFinalizeTaskParams = { ...finalizeTaskParams, [step]: true };
await this.queueClient.jobManagerClient.updateTask(jobId, taskId, { parameters: updatedParams });
return updatedParams;
}
}
48 changes: 37 additions & 11 deletions src/job/models/updateJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ import { ZodError } from 'zod';
import { inject, injectable } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { OperationStatus, TaskHandler as QueueClient, IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { IngestionUpdateJobParams, UpdateRasterLayer } from '@map-colonies/mc-model-types';
import { IngestionUpdateFinalizeTaskParams, IngestionUpdateJobParams } from '@map-colonies/mc-model-types';
import { CatalogClient } from '../../httpClients/catalogClient';
import { Grid, IConfig, IJobHandler, MergeTilesTaskParams } from '../../common/interfaces';
import { SERVICES } from '../../common/constants';
import { UpdateAdditionalParams, updateAdditionalParamsSchema } from '../../utils/zod/schemas/jobParametersSchema';
import { TileMergeTaskManager } from '../../task/models/tileMergeTaskManager';
import { JobHandler } from './jobHandler';

@injectable()
export class UpdateJobHandler implements IJobHandler {
private readonly isNewTarget: boolean;
export class UpdateJobHandler extends JobHandler implements IJobHandler {
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.LOGGER) logger: Logger,
@inject(SERVICES.CONFIG) private readonly config: IConfig,
@inject(TileMergeTaskManager) private readonly taskBuilder: TileMergeTaskManager,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient
@inject(SERVICES.QUEUE_CLIENT) protected queueClient: QueueClient,
@inject(CatalogClient) private readonly catalogClient: CatalogClient
) {
this.isNewTarget = config.get<boolean>('jobManagement.ingestion.tasks.tilesMerging.useNewTargetFlagInUpdate');
super(logger, queueClient);
}

public async handleJobInit(job: IJobResponse<IngestionUpdateJobParams, unknown>, taskId: string): Promise<void> {
Expand All @@ -33,7 +35,7 @@ export class UpdateJobHandler implements IJobHandler {
taskMetadata: {
layerRelativePath: `${job.internalId}/${validAdditionalParams.displayPath}`,
tileOutputFormat: validAdditionalParams.tileOutputFormat,
isNewTarget: this.isNewTarget,
isNewTarget: false,
grid: Grid.TWO_ON_ONE,
},
partsData,
Expand Down Expand Up @@ -61,10 +63,34 @@ export class UpdateJobHandler implements IJobHandler {
}
}

public async handleJobFinalize(job: IJobResponse<UpdateRasterLayer, unknown>, task: ITaskResponse<unknown>): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id });
logger.info({ msg: `handling ${job.type} job with "finalize" task` });
await Promise.reject('not implemented');
public async handleJobFinalize(
job: IJobResponse<IngestionUpdateJobParams, unknown>,
task: ITaskResponse<IngestionUpdateFinalizeTaskParams>
): Promise<void> {
const logger = this.logger.child({ jobId: job.id, taskId: task.id, jobType: job.type, taskType: task.type });
try {
logger.info({ msg: `handling ${job.type} job with ${task.type} task` });
let finalizeTaskParams: IngestionUpdateFinalizeTaskParams = task.parameters;
const { updatedInCatalog } = finalizeTaskParams;

if (!updatedInCatalog) {
logger.info({ msg: 'Updating layer in catalog', catalogId: job.internalId });
await this.catalogClient.update(job);
finalizeTaskParams = await this.markFinalizeStepAsCompleted(job.id, task.id, finalizeTaskParams, 'updatedInCatalog');
}

if (this.isAllStepsCompleted(finalizeTaskParams)) {
logger.info({ msg: 'All finalize steps completed successfully', ...finalizeTaskParams });
await this.queueClient.ack(job.id, task.id);
await this.queueClient.jobManagerClient.updateJob(job.id, { status: OperationStatus.COMPLETED, reason: 'Job completed successfully' });
}
} catch (err) {
if (err instanceof Error) {
const errorMsg = `Failed to handle job finalize: ${err.message}`;
logger.error({ msg: errorMsg, error: err });
await this.queueClient.reject(job.id, task.id, true, err.message);
}
}
}

private validateAdditionalParams(additionalParams: Record<string, unknown>): UpdateAdditionalParams {
Expand Down
11 changes: 11 additions & 0 deletions src/utils/zod/schemas/jobParametersSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ export const updateAdditionalParamsSchema = z.object({
});

export type UpdateAdditionalParams = z.infer<typeof updateAdditionalParamsSchema>;

export const layerNameSchema = z.object({
resourceId: z.string(),
productType: z.string(),
});

export type LayerName = z.infer<typeof layerNameSchema>;

export const internalIdSchema = z.object({
internalId: z.string().uuid(),
});
Loading
Loading