Skip to content

Commit

Permalink
feat: added polling jobs status system (#31)
Browse files Browse the repository at this point in the history
* feat: added polling jobs status system

* fix: lint errors

* fix: main poll loop

* fix: pr issues

* fix: pr issues

* test: added task manager unit test

* test: added and fixed unit-tests

* fix: mc-priority-queue version & tests

* fix: tests

* fix: spelling + lint error
  • Loading branch information
CL-SHLOMIKONCHA authored Sep 5, 2022
1 parent 786bccc commit 11960f1
Show file tree
Hide file tree
Showing 31 changed files with 865 additions and 158 deletions.
12 changes: 9 additions & 3 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
},
"jobManager": {
"url": "JOB_MANAGER_URL",
"expirationTime": {
"__name": "JOB_MANAGER_EXPIRATION_TIME",
"expirationDays": {
"__name": "JOB_MANAGER_EXPIRATION_DAYS",
"__format": "number"
}
},
Expand All @@ -66,5 +66,11 @@
"taskType": "WORKER_TYPES_TILES_TASK_TYPE"
}
},
"tilesProvider": "TILES_PROVIDER"
"tilesProvider": "TILES_PROVIDER",
"gpkgsLocation": "GPKGS_LOCATION",
"downloadServerUrl": "DOWNLOAD_SERVER_URL",
"pollingTimeoutMS": {
"__name": "POLLING_TIMEOUT_MS",
"__format": "number"
}
}
7 changes: 5 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
},
"jobManager": {
"url": "http://job-manager-job-manager",
"expirationTime": 30
"expirationDays": 30
},
"rasterCatalogManager": {
"url": "http://raster-catalog-manager"
Expand All @@ -43,5 +43,8 @@
"taskType": "rasterTilesExporter"
}
},
"tilesProvider": "S3"
"tilesProvider": "S3",
"gpkgsLocation": "/app/tiles_outputs/gpkgs",
"downloadServerUrl": "http://download-service",
"pollingTimeoutMS": 2000
}
6 changes: 4 additions & 2 deletions helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ data:
TELEMETRY_METRICS_URL: {{ $metricsUrl }}
{{ end }}
JOB_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.jobManager | quote }}
JOB_MANAGER_EXPIRATION_TIME: {{ .Values.env.jobManager.expirationTime | quote }}
JOB_MANAGER_EXPIRATION_DAYS: {{ .Values.env.jobManager.expirationDays | quote }}
RASTER_CATALOG_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.catalogManager | quote }}
WORKER_TYPES_TILES_JOB_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.jobType | quote }}
WORKER_TYPES_TILES_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.taskType | quote }}
HTTP_RETRY_ATTEMPTS: {{ .Values.env.httpRetry.attempts | quote }}
HTTP_RETRY_DELAY: {{ .Values.env.httpRetry.delay | quote }}
HTTP_RETRY_SHOULD_RESET_TIMEOUT: {{ .Values.env.httpRetry.shouldResetTimeout | quote }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider | quote }}
DOWNLOAD_SERVER_URL: {{ .Values.rasterCommon.serviceUrls.downloadServer | quote }}
POLLING_TIMEOUT_MS: {{ .Values.env.pollingTimeoutMS | quote }}
{{- end }}
13 changes: 12 additions & 1 deletion helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
{{- $cloudProviderImagePullSecretName := include "exporter-trigger.cloudProviderImagePullSecretName" . -}}
{{- $imageTag := include "exporter-trigger.tag" . -}}
{{- if .Values.enabled -}}

{{ $gpkgPath := (printf "%s%s" "/app/tiles_outputs/" .Values.rasterCommon.storage.fs.internalPvc.gpkgSubPath) }}

apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -48,18 +51,23 @@ spec:
env:
- name: SERVER_PORT
value: {{ .Values.env.targetPort | quote }}
- name: GPKGS_LOCATION
value: {{ $gpkgPath }}
envFrom:
- configMapRef:
name: {{ $releaseName }}-{{ $chartName }}-configmap
ports:
- name: http
containerPort: {{ .Values.env.targetPort }}
protocol: {{ .Values.env.protocol }}
protocol: TCP
livenessProbe:
initialDelaySeconds: {{ .Values.initialDelaySeconds }}
httpGet:
path: /liveness
port: http
volumeMounts:
- name: internal-storage
mountPath: /app/tiles_outputs
{{- if .Values.resources.enabled }}
resources:
{{- toYaml .Values.resources.value | nindent 12 }}
Expand Down Expand Up @@ -126,4 +134,7 @@ spec:
configMap:
name: {{ .Release.Name }}-{{ $chartName }}-envoy
{{- end }}
- name: internal-storage
persistentVolumeClaim:
claimName: {{ .Values.rasterCommon.storage.fs.internalPvc.name }}
{{- end -}}
10 changes: 8 additions & 2 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rasterCommon:
serviceUrls:
catalogManager: "http://catalog-raster-dev-raster-catalog-manager"
jobManager: "http://job-manager-raster-dev-discrete-ingestion-db"
downloadServer: "http://files-server-raster-dev-files-server"
jobManagement:
exporter:
jobType: tilesExport
Expand All @@ -18,7 +19,11 @@ rasterCommon:
customHeaderName: ""
storage:
tilesStorageProvider: 'FS'

fs:
internalPvc:
enabled: true
name: internal-pvc
gpkgSubPath: gpkgs
enabled: true
environment: development
replicaCount: 1
Expand Down Expand Up @@ -73,14 +78,15 @@ env:
logPrettyPrintEnabled: false
responseCompressionEnabled: true
requestPayloadLimit: 1mb
pollingTimeoutMS: 2000
tracing:
enabled: false
url: http://localhost:55681/v1/trace
metrics:
enabled: false
url: http://localhost:55681/v1/metrics
jobManager:
expirationTime: 30
expirationDays: 30
httpRetry:
attempts: 5
delay: exponential
Expand Down
12 changes: 9 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"@map-colonies/express-access-log-middleware": "^0.0.3",
"@map-colonies/js-logger": "^0.0.3",
"@map-colonies/mc-model-types": "^11.0.0",
"@map-colonies/mc-priority-queue": "^3.3.1",
"@map-colonies/mc-priority-queue": "^4.0.1",
"@map-colonies/mc-utils": "^1.4.6",
"@map-colonies/openapi-express-viewer": "^2.0.1",
"@map-colonies/read-pkg": "0.0.1",
Expand Down Expand Up @@ -80,6 +80,7 @@
"@types/express": "^4.17.13",
"@types/jest": "^27.0.2",
"@types/js-yaml": "^4.0.3",
"@types/lodash": "^4.14.184",
"@types/multer": "^1.4.7",
"@types/supertest": "^2.0.11",
"@types/swagger-ui-express": "^4.1.3",
Expand Down
17 changes: 17 additions & 0 deletions src/clients/callbackClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { inject, singleton } from 'tsyringe';
import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils';
import { Logger } from '@map-colonies/js-logger';
import { SERVICES } from '../common/constants';
import { ICallbackData, IConfig } from '../common/interfaces';

@singleton()
export class CallbackClient extends HttpClient {
public constructor(@inject(SERVICES.LOGGER) logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {
super(logger, '', 'requestCallback', config.get<IHttpRetryConfig>('httpRetry'));
}

public async send(callbackUrl: string, params: ICallbackData): Promise<void> {
this.logger.info(`send Callback request to URL: ${callbackUrl} with data ${JSON.stringify(params)}`);
await this.post(callbackUrl, params);
}
}
29 changes: 25 additions & 4 deletions src/clients/jobManagerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
JobResponse,
TaskResponse,
} from '../common/interfaces';

//this is the job manager api for find job DO NOT MODIFY
interface IFindJob {
resourceId?: string;
Expand All @@ -33,7 +32,7 @@ interface IFindJob {
export class JobManagerWrapper extends JobManagerClient {
private readonly tilesJobType: string;
private readonly tilesTaskType: string;
private readonly expirationTime: number;
private readonly expirationDays: number;

public constructor(@inject(SERVICES.LOGGER) protected readonly logger: Logger) {
super(
Expand All @@ -42,14 +41,14 @@ export class JobManagerWrapper extends JobManagerClient {
config.get<string>('workerTypes.tiles.taskType'),
config.get<string>('jobManager.url')
);
this.expirationTime = config.get<number>('jobManager.expirationTime');
this.expirationDays = config.get<number>('jobManager.expirationDays');
this.tilesJobType = config.get<string>('workerTypes.tiles.jobType');
this.tilesTaskType = config.get<string>('workerTypes.tiles.taskType');
}

public async create(data: IWorkerInput): Promise<ICreateJobResponse> {
const expirationDate = new Date();
expirationDate.setDate(expirationDate.getDate() + this.expirationTime);
expirationDate.setDate(expirationDate.getDate() + this.expirationDays);

const createJobRequest: CreateJobBody = {
resourceId: data.cswProductId,
Expand All @@ -62,6 +61,7 @@ export class JobManagerWrapper extends JobManagerClient {
zoomLevel: data.zoomLevel,
callbacks: data.callbacks,
crs: data.crs,
fileName: data.fileName,
},
internalId: data.dbId,
productType: data.productType,
Expand Down Expand Up @@ -149,6 +149,27 @@ export class JobManagerWrapper extends JobManagerClient {
return tasks;
}

public async getJobsStatus(): Promise<JobResponse[] | undefined> {
const queryParams: IFindJob = {
isCleaned: 'false',
type: this.tilesJobType,
shouldReturnTasks: 'false',
status: OperationStatus.IN_PROGRESS,
};

const jobs = await this.getJobs(queryParams);
return jobs;
}

public async updateJobStatus(jobId: string, status: OperationStatus, reason?: string, catalogId?: string): Promise<void> {
const updateJobUrl = `/jobs/${jobId}`;
await this.put(updateJobUrl, {
status: status,
reason: reason,
internalId: catalogId,
});
}

private async getJobs(queryParams: IFindJob): Promise<JobResponse[] | undefined> {
this.logger.info(`Getting jobs that match these parameters: ${JSON.stringify(queryParams)}`);
const jobs = await this.get<JobResponse[] | undefined>('/jobs', queryParams as unknown as Record<string, unknown>);
Expand Down
29 changes: 25 additions & 4 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BBox2d } from '@turf/helpers/dist/js/lib/geojson';
import { MultiPolygon, Polygon, BBox } from '@turf/turf';
import { ICreateJobBody, IJobResponse, ITaskResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { ITileRange } from '@map-colonies/mc-utils';

Expand Down Expand Up @@ -31,6 +32,7 @@ export interface ICallbackTarget {
export interface IWorkerInput {
dbId: string;
targetResolution: number;
fileName: string;
priority?: number;
callbacks: ICallbackTarget[];
crs: string;
Expand All @@ -53,20 +55,23 @@ export interface ICreateJobResponse {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

export interface ICallbackParams {
export interface ICallbackDataBase {
fileUri: string;
expirationTime: Date;
fileSize: number;
dbId: string;
packageName: string;
bbox: BBox2d | true;
targetResolution: number;
requestId: string;
success: boolean;
errorReason?: string;
}

export interface ICallbackResposne extends ICallbackParams {
export interface ICallbackData extends ICallbackDataBase {
bbox: BBox;
}

export interface ICallbackResposne extends ICallbackData {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

Expand All @@ -85,7 +90,8 @@ export interface IJobParameters {
callbacks: ICallbackTarget[];
sanitizedBbox: BBox2d;
zoomLevel: number;
callbackParams?: ICallbackParams;
callbackParams?: ICallbackDataBase;
fileName: string;
}

export declare type MergerSourceType = 'S3' | 'GPKG' | 'FS';
Expand All @@ -105,6 +111,21 @@ export interface ITaskParameters {
sources: IMapSource[];
}

export interface IInput {
jobId: string;
footprint?: Polygon | MultiPolygon;
bbox: BBox | true;
zoomLevel: number;
packageName: string;
callbackURLs: string[];
dbId: string;
}

export interface IJobStatusResponse {
completedJobs: JobResponse[] | undefined;
failedJobs: JobResponse[] | undefined;
}

export type JobResponse = IJobResponse<IJobParameters, ITaskParameters>;
export type TaskResponse = ITaskResponse<ITaskParameters>;
export type CreateJobBody = ICreateJobBody<IJobParameters, ITaskParameters>;
26 changes: 26 additions & 0 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { promises as fsPromise } from 'fs';
import { join } from 'path';
import { BBox } from '@turf/turf';

export const getFileSize = async (filePath: string): Promise<number> => {
const fileSizeInBytes = (await fsPromise.stat(filePath)).size;
return Math.trunc(fileSizeInBytes); // Make sure we return an Integer
};

export const generatePackageName = (dbId: string, zoomLevel: number, bbox: BBox): string => {
const numberOfDecimals = 5;
const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join('');
return `gm_${dbId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`;
};

export const getGpkgRelativePath = (packageName: string): string => {
const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.'));
const packageRelativePath = join(packageDirectoryName, packageName);
return packageRelativePath;
};

export const getGpkgFullPath = (gpkgsLocation: string, packageName: string): string => {
const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.'));
const packageFullPath = join(gpkgsLocation, packageDirectoryName, packageName);
return packageFullPath;
};
4 changes: 3 additions & 1 deletion src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { createPackageRouterFactory, CREATE_PACKAGE_ROUTER_SYMBOL } from './createPackage/routes/createPackageRouter';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './createPackage/routes/tasksRouter';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './tasks/routes/tasksRouter';
import { PollingManager, POLLING_MANGER_SYMBOL } from './pollingManager';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand All @@ -33,6 +34,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: SERVICES.METER, provider: { useValue: meter } },
{ token: CREATE_PACKAGE_ROUTER_SYMBOL, provider: { useFactory: createPackageRouterFactory } },
{ token: TASKS_ROUTER_SYMBOL, provider: { useFactory: tasksRouterFactory } },
{ token: POLLING_MANGER_SYMBOL, provider: { useClass: PollingManager } },
{
token: 'onSignal',
provider: {
Expand Down
Loading

0 comments on commit 11960f1

Please sign in to comment.