Skip to content

Commit

Permalink
feat: Ingestion new finalize (MAPCO-4785) (#15)
Browse files Browse the repository at this point in the history
* feat: add FinalizeTaskParams interface

* feat: ingestion new finalzie process

* feat: ack only if all publish stepscompleted and adding tests to new job finalize

* fix: pr comments

* fix: geoserverLAyerName convert to lower case outside of geoserverClient
  • Loading branch information
almog8k authored Oct 14, 2024
1 parent 8cadff8 commit 1296b9b
Show file tree
Hide file tree
Showing 32 changed files with 1,033 additions and 48 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"typescript.tsdk": "node_modules/typescript/lib",
"files.eol": "\n"
"files.eol": "\n",
"cSpell.words": ["geoserver", "Mapproxy"]
}
11 changes: 11 additions & 0 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@
},
"dequeueIntervalMs": "DEQUEUE_INTERVAL_MS"
},
"servicesUrl": {
"mapproxyApi": "MAPPROXY_API_URL",
"geoserverApi": "GEOSERVER_API_URL",
"catalogManager": "CATALOG_MANAGER_URL",
"mapproxyDns": "MAPPROXY_DNS",
"polygonPartManager": "POLYGON_PART_MANAGER_URL"
},
"geoserver": {
"workspace": "GEOSERVER_WORKSPACE",
"datastore": "GEOSERVER_DATASTORE"
},
"ingestion": {
"pollingTasks": {
"init": "INGESTION_POLLING_INIT_TASK",
Expand Down
14 changes: 13 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,23 @@
},
"disableHttpClientLogs": true,
"tilesStorageProvider": "FS",
"linkTemplatesPath": "config/linkTemplates.template",
"servicesUrl": {
"mapproxyApi": "http://localhost:8083",
"geoserverApi": "http://localhost:8084",
"catalogManager": "http://localhost:8085",
"mapproxyDns": "http://localhost:8086",
"polygonPartManager": "http://localhost:8087"
},
"geoserver": {
"workspace": "polygonParts",
"dataStore": "polygonParts"
},
"jobManagement": {
"config": {
"jobManagerBaseUrl": "http://localhost:8081",
"heartbeat": {
"baseUrl": "http://localhost:8083",
"baseUrl": "http://localhost:8082",
"intervalMs": 3000
},
"dequeueIntervalMs": 3000
Expand Down
38 changes: 38 additions & 0 deletions config/linkTemplates.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[
{
"name": "{{layerName}}",
"description": "",
"protocol": "WMS",
"url": "{{serverUrl}}/service?REQUEST=GetCapabilities"
},
{
"name": "{{layerName}}",
"description": "",
"protocol": "WMS_BASE",
"url": "{{serverUrl}}/wms"
},
{
"name": "{{layerName}}",
"description": "",
"protocol": "WMTS",
"url": "{{serverUrl}}/wmts/1.0.0/WMTSCapabilities.xml"
},
{
"name": "{{layerName}}",
"description": "",
"protocol": "WMTS_KVP",
"url": "{{serverUrl}}/service?REQUEST=GetCapabilities&SERVICE=WMTS"
},
{
"name": "{{layerName}}",
"description": "",
"protocol": "WMTS_BASE",
"url": "{{serverUrl}}/wmts"
},
{
"name": "{{layerName}}",
"description": "",
"protocol": "WFS",
"url": "{{serverUrl}}/wfs?request=GetCapabilities"
}
]
7 changes: 7 additions & 0 deletions helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ data:
TILES_MERGING_TILE_BATCH_SIZE: {{ .Values.env.jobManagement.ingestion.tasks.tilesMerging.tileBatchSize | quote }}
TILES_MERGING_TASK_BATCH_SIZE: {{ .Values.env.jobManagement.ingestion.tasks.tilesMerging.taskBatchSize | quote }}
TILES_MERGING_USE_NEW_TARGET_FLAG: {{ .Values.env.jobManagement.ingestion.tasks.tilesMerging.useNewTargetFlagInUpdate | quote }}
MAPPROXY_API_URL: {{ $serviceUrls.mapproxyApi | quote }}
GEOSERVER_API_URL: {{ $serviceUrls.geoserverApi | quote }}
CATALOG_MANAGER_URL: {{ $serviceUrls.catalogManager | quote }}
MAPPROXY_DNS: {{ $serviceUrls.mapServerPublicDNS | quote }}
POLYGON_PART_MANAGER_URL: {{ $serviceUrls.polygonPartManager | quote }}
GEOSERVER_WORKSPACE: {{ .Values.global.geoserver.workspace | quote }}
GEOSERVER_DATASTORE: {{ .Values.global.geoserver.dataStore | quote }}
{{- end }}
15 changes: 15 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ global:
serviceUrls:
jobManager: ""
heartbeatManager: ""
catalogManager: ""
mapServerPublicDNS: ""
mapproxyApi: ""
geoserverApi: ""
polygonPartManager: ""
geoserver:
workspace: ''
dataStore: ''
ca:
secretName: ''
path: '/usr/local/share/ca-certificates'
Expand Down Expand Up @@ -69,6 +77,11 @@ image:
serviceUrls:
jobManager: ""
heartbeatManager: ""
catalogManager: ""
mapServerPublicDNS: ""
mapproxyApi: ""
geoserverApi: ""
polygonPartManager: ""

tracing:
enabled: false
Expand All @@ -92,6 +105,8 @@ metrics:
- 50
- 250
- 500
prometheus:
scrape: false

env:
port: 8080
Expand Down
23 changes: 23 additions & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { IngestionNewFinalizeTaskParams } from '@map-colonies/mc-model-types';
import { readPackageJsonSync } from '@map-colonies/read-pkg';

export const SERVICE_NAME = readPackageJsonSync().name ?? 'unknown_service';
Expand All @@ -16,4 +17,26 @@ export const SERVICES = {
TILE_RANGER: Symbol('TileRanger'),
} satisfies Record<string, symbol>;

export const TilesStorageProvider = {
FS: 'FS',
S3: 'S3',
} as const;

export type TilesStorageProvider = (typeof TilesStorageProvider)[keyof typeof TilesStorageProvider];

export const PublishedLayerCacheType = {
FS: 'file',
S3: 's3',
REDIS: 'redis',
} as const;

export type PublishedLayerCacheType = (typeof PublishedLayerCacheType)[keyof typeof PublishedLayerCacheType];

export const storageProviderToCacheTypeMap = new Map([
[TilesStorageProvider.FS, PublishedLayerCacheType.FS],
[TilesStorageProvider.S3, PublishedLayerCacheType.S3],
]);

export type FinalizeSteps = keyof IngestionNewFinalizeTaskParams;

/* eslint-enable @typescript-eslint/naming-convention */
15 changes: 15 additions & 0 deletions src/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,18 @@ export class UnsupportedTransparencyError extends Error {
this.name = UnsupportedTransparencyError.name;
}
}

export class UnsupportedStorageProviderError extends Error {
public constructor(storageProvider: string) {
super(`Unsupported storage provider: ${storageProvider}`);
this.name = UnsupportedStorageProviderError.name;
}
}

export class PublishLayerError extends Error {
public constructor(publishingClient: string, layerName: string, err: Error) {
super(`Failed to publish ${layerName} layer to ${publishingClient} client: ${err.message}`);
this.name = PublishLayerError.name;
this.stack = err.stack;
}
}
79 changes: 60 additions & 19 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { IJobResponse, ITaskResponse } from '@map-colonies/mc-priority-queue';
import { InputFiles, NewRasterLayerMetadata, PolygonPart, TileOutputFormat } from '@map-colonies/mc-model-types';
import { GeoJSON } from 'geojson';
import { InputFiles, NewRasterLayerMetadata, PolygonPart, TileOutputFormat, LayerData } from '@map-colonies/mc-model-types';
import { TilesMimeFormat } from '@map-colonies/types';
import { BBox, Polygon } from 'geojson';
import { Footprint, ITileRange } from '@map-colonies/mc-utils';
import { PublishedLayerCacheType } from './constants';

//#region config interfaces
export interface IConfig {
Expand Down Expand Up @@ -49,11 +51,12 @@ export interface IngestionConfig {
}
//#endregion config

//#region job/task interfaces
export interface IJobHandler {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobInit: (job: IJobResponse<any, any>, taskId: string) => Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
handleJobFinalize: (job: IJobResponse<any, any>, taskId: string) => Promise<void>;
handleJobFinalize: (job: IJobResponse<any, any>, task: ITaskResponse<any>) => Promise<void>;
}

export interface JobAndTaskResponse {
Expand All @@ -72,23 +75,22 @@ export interface ExtendedRasterLayerMetadata extends NewRasterLayerMetadata {
grid: Grid;
}

export interface MergeTilesTaskParams {
inputFiles: InputFiles;
taskMetadata: MergeTilesMetadata;
partsData: PolygonPart[];
}
export type ExtendedNewRasterLayer = { metadata: ExtendedRasterLayerMetadata } & LayerData;

export interface MergeTilesMetadata {
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
isNewTarget: boolean;
grid: Grid;
}
//#endregion job/task

//#region merge task

export enum Grid {
TWO_ON_ONE = '2x1',
}
//#region task

export interface IBBox {
minX: number;
minY: number;
maxX: number;
maxY: number;
}
export interface IPartSourceContext {
fileName: string;
tilesPath: string;
Expand Down Expand Up @@ -129,11 +131,50 @@ export interface IntersectionState {
accumulatedIntersection: Footprint | null;
currentIntersection: Footprint | null;
}

export interface MergeTilesTaskParams {
inputFiles: InputFiles;
taskMetadata: MergeTilesMetadata;
partsData: PolygonPart[];
}

export interface MergeTilesMetadata {
layerRelativePath: string;
tileOutputFormat: TileOutputFormat;
isNewTarget: boolean;
grid: Grid;
}
//#endregion task

export interface IBBox {
minX: number;
minY: number;
maxX: number;
maxY: number;
//#region mapproxyApi
export interface IPublishMapLayerRequest {
name: string;
tilesPath: string;
cacheType: PublishedLayerCacheType;
format: TileOutputFormat;
}
//#endregion mapproxyApi

//#region geoserverApi
export interface IInsertGeoserverRequest {
nativeName: string;
}
//#endregion geoserverApi

//#region catalogClient

export interface PartAggregatedData {
imagingTimeBeginUTC: Date;
imagingTimeEndUTC: Date;
minHorizontalAccuracyCE90: number;
maxHorizontalAccuracyCE90: number;
sensors: string[];
maxResolutionDeg: number;
minResolutionDeg: number;
maxResolutionMeter: number;
minResolutionMeter: number;
footprint: GeoJSON;
productBoundingBox: string;
}

//#endregion catalogClient
93 changes: 93 additions & 0 deletions src/httpClients/catalogClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { IConfig } from 'config';
import { Logger } from '@map-colonies/js-logger';
import { IJobResponse } from '@map-colonies/mc-priority-queue';
import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils';
import { IRasterCatalogUpsertRequestBody, LayerMetadata, Link, RecordType } from '@map-colonies/mc-model-types';
import { inject, injectable } from 'tsyringe';
import { SERVICES } from '../common/constants';
import { ExtendedNewRasterLayer } from '../common/interfaces';
import { PublishLayerError } from '../common/errors';
import { ILinkBuilderData, LinkBuilder } from '../utils/linkBuilder';
import { PolygonPartMangerClient } from './polygonPartMangerClient';

@injectable()
export class CatalogClient extends HttpClient {
private readonly mapproxyDns: string;
public constructor(
@inject(SERVICES.CONFIG) private readonly config: IConfig,
@inject(SERVICES.LOGGER) protected readonly logger: Logger,
private readonly linkBuilder: LinkBuilder,
private readonly polygonPartMangerClient: PolygonPartMangerClient
) {
const serviceName = 'RasterCatalogManager';
const baseUrl = config.get<string>('servicesUrl.catalogManager');
const httpRetryConfig = config.get<IHttpRetryConfig>('httpRetry');
const disableHttpClientLogs = config.get<boolean>('disableHttpClientLogs');
super(logger, baseUrl, serviceName, httpRetryConfig, disableHttpClientLogs);
this.mapproxyDns = config.get<string>('servicesUrl.mapproxyDns');
}

public async publish(job: IJobResponse<ExtendedNewRasterLayer, unknown>, layerName: string): Promise<void> {
try {
const url = '/records';
const publishReq: IRasterCatalogUpsertRequestBody = this.createPublishReqBody(job, layerName);
await this.post(url, publishReq);
} catch (err) {
if (err instanceof Error) {
throw new PublishLayerError(this.targetService, layerName, err);
}
}
}

private createPublishReqBody(job: IJobResponse<ExtendedNewRasterLayer, unknown>, layerName: string): IRasterCatalogUpsertRequestBody {
const metadata = this.mapToCatalogRecordMetadata(job);
const links = this.buildLinks(layerName);

return {
metadata,
links,
};
}

private mapToCatalogRecordMetadata(job: IJobResponse<ExtendedNewRasterLayer, unknown>): LayerMetadata {
const { parameters, version } = job;
const { partData, metadata } = parameters;

const aggregatedPartData = this.polygonPartMangerClient.getAggregatedPartData(partData);

return {
id: metadata.catalogId,
type: RecordType.RECORD_RASTER,
classification: metadata.classification,
productName: metadata.productName,
description: metadata.description,
srs: metadata.srs,
srsName: metadata.srsName,
producerName: metadata.producerName,
region: metadata.region,
productId: metadata.productId,
productType: metadata.productType,
productSubType: metadata.productSubType,
displayPath: metadata.displayPath,
transparency: metadata.transparency,
scale: metadata.scale,
tileMimeFormat: metadata.tileMimeType,
tileOutputFormat: metadata.tileOutputFormat,
productVersion: version,
updateDateUTC: undefined,
creationDateUTC: undefined,
rms: undefined,
ingestionDate: new Date(),
...aggregatedPartData,
};
}

private buildLinks(layerName: string): Link[] {
const linkBuildData: ILinkBuilderData = {
layerName,
serverUrl: this.mapproxyDns,
};

return this.linkBuilder.createLinks(linkBuildData);
}
}
Loading

0 comments on commit 1296b9b

Please sign in to comment.