Skip to content

Commit

Permalink
feat: added polling jobs status system
Browse files Browse the repository at this point in the history
  • Loading branch information
Shlomi Koncha committed Aug 23, 2022
1 parent 786bccc commit 4fcdad4
Show file tree
Hide file tree
Showing 20 changed files with 372 additions and 57 deletions.
8 changes: 7 additions & 1 deletion config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,11 @@
"taskType": "WORKER_TYPES_TILES_TASK_TYPE"
}
},
"tilesProvider": "TILES_PROVIDER"
"tilesProvider": "TILES_PROVIDER",
"gpkgsLocation": "GPKGS_LOCATION",
"downloadServerUrl": "DOWNLOAD_SERVER_URL",
"pollingTimeoutMS": {
"__name": "POLLING_TIMEOUT_MS",
"__format": "number"
}
}
5 changes: 4 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,8 @@
"taskType": "rasterTilesExporter"
}
},
"tilesProvider": "S3"
"tilesProvider": "S3",
"gpkgsLocation": "/app/tiles_outputs/gpkgs",
"downloadServerUrl": "http://download-service",
"pollingTimeoutMS": 15000
}
4 changes: 3 additions & 1 deletion helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ data:
HTTP_RETRY_ATTEMPTS: {{ .Values.env.httpRetry.attempts | quote }}
HTTP_RETRY_DELAY: {{ .Values.env.httpRetry.delay | quote }}
HTTP_RETRY_SHOULD_RESET_TIMEOUT: {{ .Values.env.httpRetry.shouldResetTimeout | quote }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider | quote }}
DOWNLOAD_SERVER_URL: {{ .Values.rasterCommon.serviceUrls.downloadServer | quote }}
POLLING_TIMEOUT_MS: {{ .Values.env.pollingTimeoutMS | quote }}
{{- end }}
13 changes: 12 additions & 1 deletion helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
{{- $cloudProviderImagePullSecretName := include "exporter-trigger.cloudProviderImagePullSecretName" . -}}
{{- $imageTag := include "exporter-trigger.tag" . -}}
{{- if .Values.enabled -}}

{{ $gpkgPath := (printf "%s%s" "/app/tiles_outputs/" .Values.rasterCommon.storage.fs.internalPvc.gpkgSubPath) }}

apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -48,18 +51,23 @@ spec:
env:
- name: SERVER_PORT
value: {{ .Values.env.targetPort | quote }}
- name: GPKGS_LOCATION
value: {{ $gpkgPath }}
envFrom:
- configMapRef:
name: {{ $releaseName }}-{{ $chartName }}-configmap
ports:
- name: http
containerPort: {{ .Values.env.targetPort }}
protocol: {{ .Values.env.protocol }}
protocol: TCP
livenessProbe:
initialDelaySeconds: {{ .Values.initialDelaySeconds }}
httpGet:
path: /liveness
port: http
volumeMounts:
- name: internal-storage
mountPath: /app/tiles_outputs
{{- if .Values.resources.enabled }}
resources:
{{- toYaml .Values.resources.value | nindent 12 }}
Expand Down Expand Up @@ -126,4 +134,7 @@ spec:
configMap:
name: {{ .Release.Name }}-{{ $chartName }}-envoy
{{- end }}
- name: internal-storage
persistentVolumeClaim:
claimName: {{ .Values.rasterCommon.storage.fs.internalPvc.name }}
{{- end -}}
10 changes: 8 additions & 2 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rasterCommon:
serviceUrls:
catalogManager: "http://catalog-raster-dev-raster-catalog-manager"
jobManager: "http://job-manager-raster-dev-discrete-ingestion-db"
downloadServer: "http://files-server-raster-dev-files-server"
jobManagement:
exporter:
jobType: tilesExport
Expand All @@ -18,7 +19,11 @@ rasterCommon:
customHeaderName: ""
storage:
tilesStorageProvider: 'FS'

fs:
internalPvc:
enabled: true
name: internal-pvc
gpkgSubPath: gpkgs
enabled: true
environment: development
replicaCount: 1
Expand Down Expand Up @@ -73,6 +78,7 @@ env:
logPrettyPrintEnabled: false
responseCompressionEnabled: true
requestPayloadLimit: 1mb
pollingTimeoutMS: 15000
tracing:
enabled: false
url: http://localhost:55681/v1/trace
Expand All @@ -97,7 +103,7 @@ resources:
memory: 128Mi

route:
enabled: true
enabled: false
path: /
timeout:
enabled: false # defaults to 30s by openshift
Expand Down
18 changes: 18 additions & 0 deletions src/clients/callbackClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { inject, singleton } from 'tsyringe';
import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils';
import { Logger } from '@map-colonies/js-logger';
import { BBox } from '@turf/helpers';
import { SERVICES } from '../common/constants';
import { ICallbackData, IConfig } from '../common/interfaces';

@singleton()
export class CallbackClient extends HttpClient {
public constructor(@inject(SERVICES.LOGGER) logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {
super(logger, '', 'requestCallback', config.get<IHttpRetryConfig>('httpRetry'));
}

public async send(callbackUrl: string, params: ICallbackData): Promise<void> {
this.logger.info(`send Callback request to URL: ${callbackUrl} with data ${JSON.stringify(params)}`);
await this.post(callbackUrl, params);
}
}
23 changes: 22 additions & 1 deletion src/clients/jobManagerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
JobResponse,
TaskResponse,
} from '../common/interfaces';

//this is the job manager api for find job DO NOT MODIFY
interface IFindJob {
resourceId?: string;
Expand Down Expand Up @@ -62,6 +61,7 @@ export class JobManagerWrapper extends JobManagerClient {
zoomLevel: data.zoomLevel,
callbacks: data.callbacks,
crs: data.crs,
fileName: data.fileName
},
internalId: data.dbId,
productType: data.productType,
Expand Down Expand Up @@ -149,6 +149,27 @@ export class JobManagerWrapper extends JobManagerClient {
return tasks;
}

public async getJobsStatus(): Promise<JobResponse[] | undefined> {
const queryParams: IFindJob = {
isCleaned: 'false',
type: this.tilesJobType,
shouldReturnTasks: 'false',
status: OperationStatus.IN_PROGRESS,
};

const jobs = await this.getJobs(queryParams);
return jobs;
}

public async updateJobStatus(jobId: string, status: OperationStatus, reason?: string, catalogId?: string): Promise<void> {
const updateJobUrl = `/jobs/${jobId}`;
await this.put(updateJobUrl, {
status: status,
reason: reason,
internalId: catalogId,
});
}

private async getJobs(queryParams: IFindJob): Promise<JobResponse[] | undefined> {
this.logger.info(`Getting jobs that match these parameters: ${JSON.stringify(queryParams)}`);
const jobs = await this.get<JobResponse[] | undefined>('/jobs', queryParams as unknown as Record<string, unknown>);
Expand Down
9 changes: 9 additions & 0 deletions src/common/enums.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//TODO: replace with model
export enum OperationStatus {
PENDING = 'Pending',
IN_PROGRESS = 'In-Progress',
COMPLETED = 'Completed',
FAILED = 'Failed',
EXPIRED = 'Expired',
ABORTED = 'Aborted',
}
30 changes: 24 additions & 6 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BBox2d } from '@turf/helpers/dist/js/lib/geojson';
import { BBox, BBox2d, MultiPolygon, Polygon } from '@turf/helpers/dist/js/lib/geojson';
import { ICreateJobBody, IJobResponse, ITaskResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { ITileRange } from '@map-colonies/mc-utils';

Expand Down Expand Up @@ -31,6 +31,7 @@ export interface ICallbackTarget {
export interface IWorkerInput {
dbId: string;
targetResolution: number;
fileName: string;
priority?: number;
callbacks: ICallbackTarget[];
crs: string;
Expand All @@ -53,20 +54,21 @@ export interface ICreateJobResponse {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

export interface ICallbackParams {
export interface ICallbackDataBase {
fileUri: string;
expirationTime: Date;
fileSize: number;
dbId: string;
packageName: string;
bbox: BBox2d | true;
targetResolution: number;
requestId: string;
success: boolean;
errorReason?: string;
}

export interface ICallbackResposne extends ICallbackParams {
export interface ICallbackData extends ICallbackDataBase {
bbox: BBox;
}
export interface ICallbackResposne extends ICallbackData {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

Expand All @@ -85,7 +87,8 @@ export interface IJobParameters {
callbacks: ICallbackTarget[];
sanitizedBbox: BBox2d;
zoomLevel: number;
callbackParams?: ICallbackParams;
callbackParams?: ICallbackDataBase;
fileName?: string;
}

export declare type MergerSourceType = 'S3' | 'GPKG' | 'FS';
Expand All @@ -105,6 +108,21 @@ export interface ITaskParameters {
sources: IMapSource[];
}

export interface IInput {
jobId: string;
footprint?: Polygon | MultiPolygon;
bbox: BBox | true;
zoomLevel: number;
packageName: string;
callbackURLs: string[];
dbId: string;
}

export interface IJobStatusResponse {
completed: JobResponse[] | undefined;
failed: JobResponse[] | undefined;
}

export type JobResponse = IJobResponse<IJobParameters, ITaskParameters>;
export type TaskResponse = ITaskResponse<ITaskParameters>;
export type CreateJobBody = ICreateJobBody<IJobParameters, ITaskParameters>;
20 changes: 20 additions & 0 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { promises as fsPromise } from 'fs';
import { join } from 'path';
import { BBox, BBox2d } from '@turf/helpers/dist/js/lib/geojson';

export const getFileSize = async (filePath: string): Promise<number> => {
const fileSizeInBytes = (await fsPromise.stat(filePath)).size;
return Math.trunc(fileSizeInBytes); // Make sure we return an Integer
};

export const generatePackageName = (dbId: string, zoomLevel: number, bbox: BBox): string => {
const numberOfDecimals = 5;
const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join('');
return `gm_${dbId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`;
}

export const getGpkgFilePath = (gpkgsLocation: string, packageName: string): string => {
const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.'));
const packageFullPath = join(gpkgsLocation, packageDirectoryName as string, packageName);
return packageFullPath;
}
4 changes: 3 additions & 1 deletion src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { createPackageRouterFactory, CREATE_PACKAGE_ROUTER_SYMBOL } from './createPackage/routes/createPackageRouter';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './createPackage/routes/tasksRouter';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './tasks/routes/tasksRouter';
import { PollingManager, POLLING_MANGER_SYMBOL } from './pollingManager';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand All @@ -33,6 +34,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: SERVICES.METER, provider: { useValue: meter } },
{ token: CREATE_PACKAGE_ROUTER_SYMBOL, provider: { useFactory: createPackageRouterFactory } },
{ token: TASKS_ROUTER_SYMBOL, provider: { useFactory: tasksRouterFactory } },
{ token: POLLING_MANGER_SYMBOL, provider: { useClass: PollingManager } },
{
token: 'onSignal',
provider: {
Expand Down
21 changes: 15 additions & 6 deletions src/createPackage/models/createPackageManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { sep } from 'path';
import { promises as fsPromise } from 'fs';
import { generatePackageName, getGpkgFilePath } from '../../common/utils';
import { sep, join, dirname } from 'path';
import config from 'config';
import { Logger } from '@map-colonies/js-logger';
import { Polygon, MultiPolygon, BBox, bbox as PolygonBbox, intersect, bboxPolygon } from '@turf/turf';
Expand Down Expand Up @@ -27,11 +29,13 @@ import { JobManagerWrapper } from '../../clients/jobManagerWrapper';
@injectable()
export class CreatePackageManager {
private readonly tilesProvider: MergerSourceType;
private readonly gpkgsLocation: string;
public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(JobManagerWrapper) private readonly jobManagerClient: JobManagerWrapper,
@inject(RasterCatalogManagerClient) private readonly rasterCatalogManager: RasterCatalogManagerClient
) {
this.gpkgsLocation = config.get<string>('gpkgsLocation');
this.tilesProvider = config.get('tilesProvider');
this.tilesProvider = this.tilesProvider.toUpperCase() as MergerSourceType;
}
Expand Down Expand Up @@ -71,9 +75,11 @@ export class CreatePackageManager {
batches.push(bboxToTileRange(sanitizedBbox, i));
}
const separator = this.getSeparator();
const packageName = generatePackageName(dbId, zoomLevel, sanitizedBbox);
const packageFullPath = getGpkgFilePath(this.gpkgsLocation, packageName);
const sources: IMapSource[] = [
{
path: this.generatePackageName(dbId, zoomLevel, sanitizedBbox), //gpkg path
path: packageFullPath,
type: 'GPKG',
extent: {
minX: bbox[0],
Expand All @@ -90,6 +96,7 @@ export class CreatePackageManager {
const workerInput: IWorkerInput = {
sanitizedBbox,
targetResolution,
fileName: packageName,
zoomLevel,
dbId,
version: version as string,
Expand Down Expand Up @@ -122,10 +129,12 @@ export class CreatePackageManager {
return bbox;
}

private generatePackageName(cswId: string, zoomLevel: number, bbox: BBox): string {
const numberOfDecimals = 5;
const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join('');
return `gm_${cswId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`;
public async createJsonMetadata(filePath: string, dbId: string): Promise<void> {
const fileName = 'metadata.json';
const metadataFilePath = join(dirname(filePath), fileName);
const record = await this.rasterCatalogManager.findLayer(dbId);
const recordMetadata = JSON.stringify(record.metadata);
await fsPromise.writeFile(metadataFilePath, recordMetadata);
}

private async checkForDuplicate(
Expand Down
34 changes: 0 additions & 34 deletions src/createPackage/models/tasksManager.ts

This file was deleted.

Loading

0 comments on commit 4fcdad4

Please sign in to comment.