Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added polling jobs status system #31

Merged
merged 10 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
},
"jobManager": {
"url": "JOB_MANAGER_URL",
"expirationTime": {
"__name": "JOB_MANAGER_EXPIRATION_TIME",
"expirationDays": {
"__name": "JOB_MANAGER_EXPIRATION_DAYS",
"__format": "number"
}
},
Expand All @@ -66,5 +66,11 @@
"taskType": "WORKER_TYPES_TILES_TASK_TYPE"
}
},
"tilesProvider": "TILES_PROVIDER"
"tilesProvider": "TILES_PROVIDER",
"gpkgsLocation": "GPKGS_LOCATION",
"downloadServerUrl": "DOWNLOAD_SERVER_URL",
"pollingTimeoutMS": {
"__name": "POLLING_TIMEOUT_MS",
"__format": "number"
}
}
7 changes: 5 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
},
"jobManager": {
"url": "http://job-manager-job-manager",
"expirationTime": 30
"expirationDays": 30
},
"rasterCatalogManager": {
"url": "http://raster-catalog-manager"
Expand All @@ -43,5 +43,8 @@
"taskType": "rasterTilesExporter"
}
},
"tilesProvider": "S3"
"tilesProvider": "S3",
"gpkgsLocation": "/app/tiles_outputs/gpkgs",
"downloadServerUrl": "http://download-service",
"pollingTimeoutMS": 2000
}
6 changes: 4 additions & 2 deletions helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ data:
TELEMETRY_METRICS_URL: {{ $metricsUrl }}
{{ end }}
JOB_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.jobManager | quote }}
JOB_MANAGER_EXPIRATION_TIME: {{ .Values.env.jobManager.expirationTime | quote }}
JOB_MANAGER_EXPIRATION_DAYS: {{ .Values.env.jobManager.expirationDays | quote }}
RASTER_CATALOG_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.catalogManager | quote }}
WORKER_TYPES_TILES_JOB_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.jobType | quote }}
WORKER_TYPES_TILES_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.taskType | quote }}
HTTP_RETRY_ATTEMPTS: {{ .Values.env.httpRetry.attempts | quote }}
HTTP_RETRY_DELAY: {{ .Values.env.httpRetry.delay | quote }}
HTTP_RETRY_SHOULD_RESET_TIMEOUT: {{ .Values.env.httpRetry.shouldResetTimeout | quote }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider | quote }}
DOWNLOAD_SERVER_URL: {{ .Values.rasterCommon.serviceUrls.downloadServer | quote }}
POLLING_TIMEOUT_MS: {{ .Values.env.pollingTimeoutMS | quote }}
{{- end }}
13 changes: 12 additions & 1 deletion helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
{{- $cloudProviderImagePullSecretName := include "exporter-trigger.cloudProviderImagePullSecretName" . -}}
{{- $imageTag := include "exporter-trigger.tag" . -}}
{{- if .Values.enabled -}}

{{ $gpkgPath := (printf "%s%s" "/app/tiles_outputs/" .Values.rasterCommon.storage.fs.internalPvc.gpkgSubPath) }}

apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -48,18 +51,23 @@ spec:
env:
- name: SERVER_PORT
value: {{ .Values.env.targetPort | quote }}
- name: GPKGS_LOCATION
value: {{ $gpkgPath }}
envFrom:
- configMapRef:
name: {{ $releaseName }}-{{ $chartName }}-configmap
ports:
- name: http
containerPort: {{ .Values.env.targetPort }}
protocol: {{ .Values.env.protocol }}
protocol: TCP
livenessProbe:
initialDelaySeconds: {{ .Values.initialDelaySeconds }}
httpGet:
path: /liveness
port: http
volumeMounts:
- name: internal-storage
mountPath: /app/tiles_outputs
{{- if .Values.resources.enabled }}
resources:
{{- toYaml .Values.resources.value | nindent 12 }}
Expand Down Expand Up @@ -126,4 +134,7 @@ spec:
configMap:
name: {{ .Release.Name }}-{{ $chartName }}-envoy
{{- end }}
- name: internal-storage
persistentVolumeClaim:
claimName: {{ .Values.rasterCommon.storage.fs.internalPvc.name }}
{{- end -}}
10 changes: 8 additions & 2 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rasterCommon:
serviceUrls:
catalogManager: "http://catalog-raster-dev-raster-catalog-manager"
jobManager: "http://job-manager-raster-dev-discrete-ingestion-db"
downloadServer: "http://files-server-raster-dev-files-server"
jobManagement:
exporter:
jobType: tilesExport
Expand All @@ -18,7 +19,11 @@ rasterCommon:
customHeaderName: ""
storage:
tilesStorageProvider: 'FS'

fs:
internalPvc:
enabled: true
name: internal-pvc
gpkgSubPath: gpkgs
enabled: true
environment: development
replicaCount: 1
Expand Down Expand Up @@ -73,14 +78,15 @@ env:
logPrettyPrintEnabled: false
responseCompressionEnabled: true
requestPayloadLimit: 1mb
pollingTimeoutMS: 15000
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
tracing:
enabled: false
url: http://localhost:55681/v1/trace
metrics:
enabled: false
url: http://localhost:55681/v1/metrics
jobManager:
expirationTime: 30
expirationDays: 30
httpRetry:
attempts: 5
delay: exponential
Expand Down
17 changes: 17 additions & 0 deletions src/clients/callbackClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { inject, singleton } from 'tsyringe';
import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils';
import { Logger } from '@map-colonies/js-logger';
import { SERVICES } from '../common/constants';
import { ICallbackData, IConfig } from '../common/interfaces';

@singleton()
export class CallbackClient extends HttpClient {
public constructor(@inject(SERVICES.LOGGER) logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {
super(logger, '', 'requestCallback', config.get<IHttpRetryConfig>('httpRetry'));
}

public async send(callbackUrl: string, params: ICallbackData): Promise<void> {
this.logger.info(`send Callback request to URL: ${callbackUrl} with data ${JSON.stringify(params)}`);
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
await this.post(callbackUrl, params);
}
}
29 changes: 25 additions & 4 deletions src/clients/jobManagerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
JobResponse,
TaskResponse,
} from '../common/interfaces';

//this is the job manager api for find job DO NOT MODIFY
interface IFindJob {
resourceId?: string;
Expand All @@ -33,7 +32,7 @@ interface IFindJob {
export class JobManagerWrapper extends JobManagerClient {
private readonly tilesJobType: string;
private readonly tilesTaskType: string;
private readonly expirationTime: number;
private readonly expirationDays: number;

public constructor(@inject(SERVICES.LOGGER) protected readonly logger: Logger) {
super(
Expand All @@ -42,14 +41,14 @@ export class JobManagerWrapper extends JobManagerClient {
config.get<string>('workerTypes.tiles.taskType'),
config.get<string>('jobManager.url')
);
this.expirationTime = config.get<number>('jobManager.expirationTime');
this.expirationDays = config.get<number>('jobManager.expirationDays');
this.tilesJobType = config.get<string>('workerTypes.tiles.jobType');
this.tilesTaskType = config.get<string>('workerTypes.tiles.taskType');
}

public async create(data: IWorkerInput): Promise<ICreateJobResponse> {
const expirationDate = new Date();
expirationDate.setDate(expirationDate.getDate() + this.expirationTime);
expirationDate.setDate(expirationDate.getDate() + this.expirationDays);

const createJobRequest: CreateJobBody = {
resourceId: data.cswProductId,
Expand All @@ -62,6 +61,7 @@ export class JobManagerWrapper extends JobManagerClient {
zoomLevel: data.zoomLevel,
callbacks: data.callbacks,
crs: data.crs,
fileName: data.fileName,
},
internalId: data.dbId,
productType: data.productType,
Expand Down Expand Up @@ -149,6 +149,27 @@ export class JobManagerWrapper extends JobManagerClient {
return tasks;
}

public async getJobsStatus(): Promise<JobResponse[] | undefined> {
const queryParams: IFindJob = {
isCleaned: 'false',
type: this.tilesJobType,
shouldReturnTasks: 'false',
status: OperationStatus.IN_PROGRESS,
};

const jobs = await this.getJobs(queryParams);
return jobs;
}

public async updateJobStatus(jobId: string, status: OperationStatus, reason?: string, catalogId?: string): Promise<void> {
const updateJobUrl = `/jobs/${jobId}`;
await this.put(updateJobUrl, {
status: status,
reason: reason,
internalId: catalogId,
});
}

private async getJobs(queryParams: IFindJob): Promise<JobResponse[] | undefined> {
this.logger.info(`Getting jobs that match these parameters: ${JSON.stringify(queryParams)}`);
const jobs = await this.get<JobResponse[] | undefined>('/jobs', queryParams as unknown as Record<string, unknown>);
Expand Down
29 changes: 25 additions & 4 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BBox2d } from '@turf/helpers/dist/js/lib/geojson';
import { MultiPolygon, Polygon, BBox } from '@turf/turf';
import { ICreateJobBody, IJobResponse, ITaskResponse, OperationStatus } from '@map-colonies/mc-priority-queue';
import { ITileRange } from '@map-colonies/mc-utils';

Expand Down Expand Up @@ -31,6 +32,7 @@ export interface ICallbackTarget {
export interface IWorkerInput {
dbId: string;
targetResolution: number;
fileName: string;
priority?: number;
callbacks: ICallbackTarget[];
crs: string;
Expand All @@ -53,20 +55,23 @@ export interface ICreateJobResponse {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

export interface ICallbackParams {
export interface ICallbackDataBase {
fileUri: string;
expirationTime: Date;
fileSize: number;
dbId: string;
packageName: string;
bbox: BBox2d | true;
targetResolution: number;
requestId: string;
success: boolean;
errorReason?: string;
}

export interface ICallbackResposne extends ICallbackParams {
export interface ICallbackData extends ICallbackDataBase {
bbox: BBox;
}

export interface ICallbackResposne extends ICallbackData {
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

Expand All @@ -85,7 +90,8 @@ export interface IJobParameters {
callbacks: ICallbackTarget[];
sanitizedBbox: BBox2d;
zoomLevel: number;
callbackParams?: ICallbackParams;
callbackParams?: ICallbackDataBase;
fileName: string;
}

export declare type MergerSourceType = 'S3' | 'GPKG' | 'FS';
Expand All @@ -105,6 +111,21 @@ export interface ITaskParameters {
sources: IMapSource[];
}

export interface IInput {
jobId: string;
footprint?: Polygon | MultiPolygon;
bbox: BBox | true;
zoomLevel: number;
packageName: string;
callbackURLs: string[];
dbId: string;
}

export interface IJobStatusResponse {
completedJobs: JobResponse[] | undefined;
failedJobs: JobResponse[] | undefined;
}

export type JobResponse = IJobResponse<IJobParameters, ITaskParameters>;
export type TaskResponse = ITaskResponse<ITaskParameters>;
export type CreateJobBody = ICreateJobBody<IJobParameters, ITaskParameters>;
26 changes: 26 additions & 0 deletions src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { promises as fsPromise } from 'fs';
import { join } from 'path';
import { BBox } from '@turf/turf';

export const getFileSize = async (filePath: string): Promise<number> => {
const fileSizeInBytes = (await fsPromise.stat(filePath)).size;
return Math.trunc(fileSizeInBytes); // Make sure we return an Integer
};

export const generatePackageName = (dbId: string, zoomLevel: number, bbox: BBox): string => {
const numberOfDecimals = 5;
const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join('');
return `gm_${dbId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`;
};

export const getGpkgRelativePath = (packageName: string): string => {
const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.'));
const packageRelativePath = join(packageDirectoryName, packageName);
return packageRelativePath;
};

export const getGpkgFullPath = (gpkgsLocation: string, packageName: string): string => {
const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.'));
const packageFullPath = join(gpkgsLocation, packageDirectoryName, packageName);
CL-SHLOMIKONCHA marked this conversation as resolved.
Show resolved Hide resolved
return packageFullPath;
};
4 changes: 3 additions & 1 deletion src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import { SERVICES, SERVICE_NAME } from './common/constants';
import { tracing } from './common/tracing';
import { createPackageRouterFactory, CREATE_PACKAGE_ROUTER_SYMBOL } from './createPackage/routes/createPackageRouter';
import { InjectionObject, registerDependencies } from './common/dependencyRegistration';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './createPackage/routes/tasksRouter';
import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './tasks/routes/tasksRouter';
import { PollingManager, POLLING_MANGER_SYMBOL } from './pollingManager';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand All @@ -33,6 +34,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon
{ token: SERVICES.METER, provider: { useValue: meter } },
{ token: CREATE_PACKAGE_ROUTER_SYMBOL, provider: { useFactory: createPackageRouterFactory } },
{ token: TASKS_ROUTER_SYMBOL, provider: { useFactory: tasksRouterFactory } },
{ token: POLLING_MANGER_SYMBOL, provider: { useClass: PollingManager } },
{
token: 'onSignal',
provider: {
Expand Down
Loading