diff --git a/config/custom-environment-variables.json b/config/custom-environment-variables.json index f50faf6..0484beb 100644 --- a/config/custom-environment-variables.json +++ b/config/custom-environment-variables.json @@ -52,8 +52,8 @@ }, "jobManager": { "url": "JOB_MANAGER_URL", - "expirationTime": { - "__name": "JOB_MANAGER_EXPIRATION_TIME", + "expirationDays": { + "__name": "JOB_MANAGER_EXPIRATION_DAYS", "__format": "number" } }, @@ -66,5 +66,11 @@ "taskType": "WORKER_TYPES_TILES_TASK_TYPE" } }, - "tilesProvider": "TILES_PROVIDER" + "tilesProvider": "TILES_PROVIDER", + "gpkgsLocation": "GPKGS_LOCATION", + "downloadServerUrl": "DOWNLOAD_SERVER_URL", + "pollingTimeoutMS": { + "__name": "POLLING_TIMEOUT_MS", + "__format": "number" + } } diff --git a/config/default.json b/config/default.json index bda8150..ec8fb2a 100644 --- a/config/default.json +++ b/config/default.json @@ -32,7 +32,7 @@ }, "jobManager": { "url": "http://job-manager-job-manager", - "expirationTime": 30 + "expirationDays": 30 }, "rasterCatalogManager": { "url": "http://raster-catalog-manager" @@ -43,5 +43,8 @@ "taskType": "rasterTilesExporter" } }, - "tilesProvider": "S3" + "tilesProvider": "S3", + "gpkgsLocation": "/app/tiles_outputs/gpkgs", + "downloadServerUrl": "http://download-service", + "pollingTimeoutMS": 2000 } diff --git a/helm/templates/configmap.yaml b/helm/templates/configmap.yaml index 08d45f5..7cab6ae 100644 --- a/helm/templates/configmap.yaml +++ b/helm/templates/configmap.yaml @@ -20,12 +20,14 @@ data: TELEMETRY_METRICS_URL: {{ $metricsUrl }} {{ end }} JOB_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.jobManager | quote }} - JOB_MANAGER_EXPIRATION_TIME: {{ .Values.env.jobManager.expirationTime | quote }} + JOB_MANAGER_EXPIRATION_DAYS: {{ .Values.env.jobManager.expirationDays | quote }} RASTER_CATALOG_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.catalogManager | quote }} WORKER_TYPES_TILES_JOB_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.jobType | quote }} WORKER_TYPES_TILES_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.taskType | quote }} HTTP_RETRY_ATTEMPTS: {{ .Values.env.httpRetry.attempts | quote }} HTTP_RETRY_DELAY: {{ .Values.env.httpRetry.delay | quote }} HTTP_RETRY_SHOULD_RESET_TIMEOUT: {{ .Values.env.httpRetry.shouldResetTimeout | quote }} - TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider }} + TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider | quote }} + DOWNLOAD_SERVER_URL: {{ .Values.rasterCommon.serviceUrls.downloadServer | quote }} + POLLING_TIMEOUT_MS: {{ .Values.env.pollingTimeoutMS | quote }} {{- end }} diff --git a/helm/templates/deployment.yaml b/helm/templates/deployment.yaml index b4d8046..5bc0323 100644 --- a/helm/templates/deployment.yaml +++ b/helm/templates/deployment.yaml @@ -5,6 +5,9 @@ {{- $cloudProviderImagePullSecretName := include "exporter-trigger.cloudProviderImagePullSecretName" . -}} {{- $imageTag := include "exporter-trigger.tag" . -}} {{- if .Values.enabled -}} + +{{ $gpkgPath := (printf "%s%s" "/app/tiles_outputs/" .Values.rasterCommon.storage.fs.internalPvc.gpkgSubPath) }} + apiVersion: apps/v1 kind: Deployment metadata: @@ -48,18 +51,23 @@ spec: env: - name: SERVER_PORT value: {{ .Values.env.targetPort | quote }} + - name: GPKGS_LOCATION + value: {{ $gpkgPath }} envFrom: - configMapRef: name: {{ $releaseName }}-{{ $chartName }}-configmap ports: - name: http containerPort: {{ .Values.env.targetPort }} - protocol: {{ .Values.env.protocol }} + protocol: TCP livenessProbe: initialDelaySeconds: {{ .Values.initialDelaySeconds }} httpGet: path: /liveness port: http + volumeMounts: + - name: internal-storage + mountPath: /app/tiles_outputs {{- if .Values.resources.enabled }} resources: {{- toYaml .Values.resources.value | nindent 12 }} @@ -126,4 +134,7 @@ spec: configMap: name: {{ .Release.Name }}-{{ $chartName }}-envoy {{- end }} + - name: internal-storage + persistentVolumeClaim: + claimName: {{ .Values.rasterCommon.storage.fs.internalPvc.name }} {{- end -}} diff --git a/helm/values.yaml b/helm/values.yaml index 602e8c1..c486afb 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -8,6 +8,7 @@ rasterCommon: serviceUrls: catalogManager: "http://catalog-raster-dev-raster-catalog-manager" jobManager: "http://job-manager-raster-dev-discrete-ingestion-db" + downloadServer: "http://files-server-raster-dev-files-server" jobManagement: exporter: jobType: tilesExport @@ -18,7 +19,11 @@ rasterCommon: customHeaderName: "" storage: tilesStorageProvider: 'FS' - + fs: + internalPvc: + enabled: true + name: internal-pvc + gpkgSubPath: gpkgs enabled: true environment: development replicaCount: 1 @@ -73,6 +78,7 @@ env: logPrettyPrintEnabled: false responseCompressionEnabled: true requestPayloadLimit: 1mb + pollingTimeoutMS: 2000 tracing: enabled: false url: http://localhost:55681/v1/trace @@ -80,7 +86,7 @@ env: enabled: false url: http://localhost:55681/v1/metrics jobManager: - expirationTime: 30 + expirationDays: 30 httpRetry: attempts: 5 delay: exponential diff --git a/package-lock.json b/package-lock.json index a398687..f9825ec 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1079,9 +1079,9 @@ } }, "@map-colonies/mc-priority-queue": { - "version": "3.3.1", - "resolved": "https://registry.npmjs.org/@map-colonies/mc-priority-queue/-/mc-priority-queue-3.3.1.tgz", - "integrity": "sha512-ZwLN5PQ0e6f+Ov8eF1sVNUw8qopNfg0uslhHdqILfYlKgAR15JHfpSbM/sdKr2o9sQDhPSbER/1fogfjINvh+g==", + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/@map-colonies/mc-priority-queue/-/mc-priority-queue-4.0.1.tgz", + "integrity": "sha512-Wu7xUluH8OVVShToLgZUVa3XqKj0YrfcnjhBnLn+82xhANAmj4VAg0QDxp39wxVeOK+QDc41Y6BrFx0OHOXNwQ==", "requires": { "@map-colonies/mc-utils": "^1.3.0" } @@ -3000,6 +3000,12 @@ "integrity": "sha1-7ihweulOEdK4J7y+UnC86n8+ce4=", "dev": true }, + "@types/lodash": { + "version": "4.14.184", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.184.tgz", + "integrity": "sha512-RoZphVtHbxPZizt4IcILciSWiC6dcn+eZ8oX9IWEYfDMcocdd42f7NPI6fQj+6zI8y4E0L7gu2pcZKLGTRaV9Q==", + "dev": true + }, "@types/mime": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz", diff --git a/package.json b/package.json index 8cc8e9d..85f3dc4 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "@map-colonies/express-access-log-middleware": "^0.0.3", "@map-colonies/js-logger": "^0.0.3", "@map-colonies/mc-model-types": "^11.0.0", - "@map-colonies/mc-priority-queue": "^3.3.1", + "@map-colonies/mc-priority-queue": "^4.0.1", "@map-colonies/mc-utils": "^1.4.6", "@map-colonies/openapi-express-viewer": "^2.0.1", "@map-colonies/read-pkg": "0.0.1", @@ -80,6 +80,7 @@ "@types/express": "^4.17.13", "@types/jest": "^27.0.2", "@types/js-yaml": "^4.0.3", + "@types/lodash": "^4.14.184", "@types/multer": "^1.4.7", "@types/supertest": "^2.0.11", "@types/swagger-ui-express": "^4.1.3", diff --git a/src/clients/callbackClient.ts b/src/clients/callbackClient.ts new file mode 100644 index 0000000..a76e031 --- /dev/null +++ b/src/clients/callbackClient.ts @@ -0,0 +1,17 @@ +import { inject, singleton } from 'tsyringe'; +import { HttpClient, IHttpRetryConfig } from '@map-colonies/mc-utils'; +import { Logger } from '@map-colonies/js-logger'; +import { SERVICES } from '../common/constants'; +import { ICallbackData, IConfig } from '../common/interfaces'; + +@singleton() +export class CallbackClient extends HttpClient { + public constructor(@inject(SERVICES.LOGGER) logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) { + super(logger, '', 'requestCallback', config.get('httpRetry')); + } + + public async send(callbackUrl: string, params: ICallbackData): Promise { + this.logger.info(`send Callback request to URL: ${callbackUrl} with data ${JSON.stringify(params)}`); + await this.post(callbackUrl, params); + } +} diff --git a/src/clients/jobManagerWrapper.ts b/src/clients/jobManagerWrapper.ts index 06769aa..3c527fc 100644 --- a/src/clients/jobManagerWrapper.ts +++ b/src/clients/jobManagerWrapper.ts @@ -15,7 +15,6 @@ import { JobResponse, TaskResponse, } from '../common/interfaces'; - //this is the job manager api for find job DO NOT MODIFY interface IFindJob { resourceId?: string; @@ -33,7 +32,7 @@ interface IFindJob { export class JobManagerWrapper extends JobManagerClient { private readonly tilesJobType: string; private readonly tilesTaskType: string; - private readonly expirationTime: number; + private readonly expirationDays: number; public constructor(@inject(SERVICES.LOGGER) protected readonly logger: Logger) { super( @@ -42,14 +41,14 @@ export class JobManagerWrapper extends JobManagerClient { config.get('workerTypes.tiles.taskType'), config.get('jobManager.url') ); - this.expirationTime = config.get('jobManager.expirationTime'); + this.expirationDays = config.get('jobManager.expirationDays'); this.tilesJobType = config.get('workerTypes.tiles.jobType'); this.tilesTaskType = config.get('workerTypes.tiles.taskType'); } public async create(data: IWorkerInput): Promise { const expirationDate = new Date(); - expirationDate.setDate(expirationDate.getDate() + this.expirationTime); + expirationDate.setDate(expirationDate.getDate() + this.expirationDays); const createJobRequest: CreateJobBody = { resourceId: data.cswProductId, @@ -62,6 +61,7 @@ export class JobManagerWrapper extends JobManagerClient { zoomLevel: data.zoomLevel, callbacks: data.callbacks, crs: data.crs, + fileName: data.fileName, }, internalId: data.dbId, productType: data.productType, @@ -149,6 +149,27 @@ export class JobManagerWrapper extends JobManagerClient { return tasks; } + public async getJobsStatus(): Promise { + const queryParams: IFindJob = { + isCleaned: 'false', + type: this.tilesJobType, + shouldReturnTasks: 'false', + status: OperationStatus.IN_PROGRESS, + }; + + const jobs = await this.getJobs(queryParams); + return jobs; + } + + public async updateJobStatus(jobId: string, status: OperationStatus, reason?: string, catalogId?: string): Promise { + const updateJobUrl = `/jobs/${jobId}`; + await this.put(updateJobUrl, { + status: status, + reason: reason, + internalId: catalogId, + }); + } + private async getJobs(queryParams: IFindJob): Promise { this.logger.info(`Getting jobs that match these parameters: ${JSON.stringify(queryParams)}`); const jobs = await this.get('/jobs', queryParams as unknown as Record); diff --git a/src/common/interfaces.ts b/src/common/interfaces.ts index 5ce6cdc..716f4cc 100644 --- a/src/common/interfaces.ts +++ b/src/common/interfaces.ts @@ -1,4 +1,5 @@ import { BBox2d } from '@turf/helpers/dist/js/lib/geojson'; +import { MultiPolygon, Polygon, BBox } from '@turf/turf'; import { ICreateJobBody, IJobResponse, ITaskResponse, OperationStatus } from '@map-colonies/mc-priority-queue'; import { ITileRange } from '@map-colonies/mc-utils'; @@ -31,6 +32,7 @@ export interface ICallbackTarget { export interface IWorkerInput { dbId: string; targetResolution: number; + fileName: string; priority?: number; callbacks: ICallbackTarget[]; crs: string; @@ -53,20 +55,23 @@ export interface ICreateJobResponse { status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED; } -export interface ICallbackParams { +export interface ICallbackDataBase { fileUri: string; expirationTime: Date; fileSize: number; dbId: string; packageName: string; - bbox: BBox2d | true; targetResolution: number; requestId: string; success: boolean; errorReason?: string; } -export interface ICallbackResposne extends ICallbackParams { +export interface ICallbackData extends ICallbackDataBase { + bbox: BBox; +} + +export interface ICallbackResposne extends ICallbackData { status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED; } @@ -85,7 +90,8 @@ export interface IJobParameters { callbacks: ICallbackTarget[]; sanitizedBbox: BBox2d; zoomLevel: number; - callbackParams?: ICallbackParams; + callbackParams?: ICallbackDataBase; + fileName: string; } export declare type MergerSourceType = 'S3' | 'GPKG' | 'FS'; @@ -105,6 +111,21 @@ export interface ITaskParameters { sources: IMapSource[]; } +export interface IInput { + jobId: string; + footprint?: Polygon | MultiPolygon; + bbox: BBox | true; + zoomLevel: number; + packageName: string; + callbackURLs: string[]; + dbId: string; +} + +export interface IJobStatusResponse { + completedJobs: JobResponse[] | undefined; + failedJobs: JobResponse[] | undefined; +} + export type JobResponse = IJobResponse; export type TaskResponse = ITaskResponse; export type CreateJobBody = ICreateJobBody; diff --git a/src/common/utils.ts b/src/common/utils.ts new file mode 100644 index 0000000..47831db --- /dev/null +++ b/src/common/utils.ts @@ -0,0 +1,26 @@ +import { promises as fsPromise } from 'fs'; +import { join } from 'path'; +import { BBox } from '@turf/turf'; + +export const getFileSize = async (filePath: string): Promise => { + const fileSizeInBytes = (await fsPromise.stat(filePath)).size; + return Math.trunc(fileSizeInBytes); // Make sure we return an Integer +}; + +export const generatePackageName = (dbId: string, zoomLevel: number, bbox: BBox): string => { + const numberOfDecimals = 5; + const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join(''); + return `gm_${dbId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`; +}; + +export const getGpkgRelativePath = (packageName: string): string => { + const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.')); + const packageRelativePath = join(packageDirectoryName, packageName); + return packageRelativePath; +}; + +export const getGpkgFullPath = (gpkgsLocation: string, packageName: string): string => { + const packageDirectoryName = packageName.substr(0, packageName.lastIndexOf('.')); + const packageFullPath = join(gpkgsLocation, packageDirectoryName, packageName); + return packageFullPath; +}; diff --git a/src/containerConfig.ts b/src/containerConfig.ts index dac96c5..6cb2ece 100644 --- a/src/containerConfig.ts +++ b/src/containerConfig.ts @@ -8,7 +8,8 @@ import { SERVICES, SERVICE_NAME } from './common/constants'; import { tracing } from './common/tracing'; import { createPackageRouterFactory, CREATE_PACKAGE_ROUTER_SYMBOL } from './createPackage/routes/createPackageRouter'; import { InjectionObject, registerDependencies } from './common/dependencyRegistration'; -import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './createPackage/routes/tasksRouter'; +import { tasksRouterFactory, TASKS_ROUTER_SYMBOL } from './tasks/routes/tasksRouter'; +import { PollingManager, POLLING_MANGER_SYMBOL } from './pollingManager'; export interface RegisterOptions { override?: InjectionObject[]; @@ -33,6 +34,7 @@ export const registerExternalValues = (options?: RegisterOptions): DependencyCon { token: SERVICES.METER, provider: { useValue: meter } }, { token: CREATE_PACKAGE_ROUTER_SYMBOL, provider: { useFactory: createPackageRouterFactory } }, { token: TASKS_ROUTER_SYMBOL, provider: { useFactory: tasksRouterFactory } }, + { token: POLLING_MANGER_SYMBOL, provider: { useClass: PollingManager } }, { token: 'onSignal', provider: { diff --git a/src/createPackage/models/createPackageManager.ts b/src/createPackage/models/createPackageManager.ts index bf4a451..f74a99a 100644 --- a/src/createPackage/models/createPackageManager.ts +++ b/src/createPackage/models/createPackageManager.ts @@ -1,13 +1,15 @@ -import { sep } from 'path'; +import { promises as fsPromise } from 'fs'; +import { sep, join, dirname } from 'path'; import config from 'config'; import { Logger } from '@map-colonies/js-logger'; import { Polygon, MultiPolygon, BBox, bbox as PolygonBbox, intersect, bboxPolygon } from '@turf/turf'; import { inject, injectable } from 'tsyringe'; import { degreesPerPixelToZoomLevel, ITileRange, snapBBoxToTileGrid } from '@map-colonies/mc-utils'; -import { OperationStatus } from '@map-colonies/mc-priority-queue'; +import { IJobResponse, OperationStatus } from '@map-colonies/mc-priority-queue'; import { bboxToTileRange } from '@map-colonies/mc-utils'; import { BadRequestError } from '@map-colonies/error-types'; import { BBox2d } from '@turf/helpers/dist/js/lib/geojson'; +import { generatePackageName, getGpkgRelativePath } from '../../common/utils'; import { RasterCatalogManagerClient } from '../../clients/rasterCatalogManagerClient'; import { DEFAULT_CRS, DEFAULT_PRIORITY, DEFAULT_PRODUCT_TYPE, SERVICES } from '../../common/constants'; import { @@ -21,12 +23,14 @@ import { MergerSourceType, IMapSource, ICallbackTarget, + ITaskParameters, } from '../../common/interfaces'; import { JobManagerWrapper } from '../../clients/jobManagerWrapper'; @injectable() export class CreatePackageManager { private readonly tilesProvider: MergerSourceType; + private readonly metadataFileName: string; public constructor( @inject(SERVICES.LOGGER) private readonly logger: Logger, @inject(JobManagerWrapper) private readonly jobManagerClient: JobManagerWrapper, @@ -34,6 +38,7 @@ export class CreatePackageManager { ) { this.tilesProvider = config.get('tilesProvider'); this.tilesProvider = this.tilesProvider.toUpperCase() as MergerSourceType; + this.metadataFileName = 'metadata.json'; } public async createPackage(userInput: ICreatePackage): Promise { @@ -71,9 +76,11 @@ export class CreatePackageManager { batches.push(bboxToTileRange(sanitizedBbox, i)); } const separator = this.getSeparator(); + const packageName = generatePackageName(dbId, zoomLevel, sanitizedBbox); + const packageRelativePath = getGpkgRelativePath(packageName); const sources: IMapSource[] = [ { - path: this.generatePackageName(dbId, zoomLevel, sanitizedBbox), //gpkg path + path: packageRelativePath, type: 'GPKG', extent: { minX: bbox[0], @@ -90,6 +97,7 @@ export class CreatePackageManager { const workerInput: IWorkerInput = { sanitizedBbox, targetResolution, + fileName: packageName, zoomLevel, dbId, version: version as string, @@ -109,6 +117,13 @@ export class CreatePackageManager { return duplicationExist; } + public async createJsonMetadata(filePath: string, dbId: string): Promise { + const metadataFilePath = join(dirname(filePath), this.metadataFileName); + const record = await this.rasterCatalogManager.findLayer(dbId); + const recordMetadata = JSON.stringify(record.metadata); + await fsPromise.writeFile(metadataFilePath, recordMetadata); + } + private getSeparator(): string { return this.tilesProvider === 'S3' ? '/' : sep; } @@ -122,12 +137,6 @@ export class CreatePackageManager { return bbox; } - private generatePackageName(cswId: string, zoomLevel: number, bbox: BBox): string { - const numberOfDecimals = 5; - const bboxToString = bbox.map((val) => String(val.toFixed(numberOfDecimals)).replace('.', '_').replace(/-/g, 'm')).join(''); - return `gm_${cswId.replace(/-/g, '_')}_${zoomLevel}_${bboxToString}.gpkg`; - } - private async checkForDuplicate( dupParams: JobDuplicationParams, callbackUrls: ICallbackTarget[] @@ -170,7 +179,7 @@ export class CreatePackageManager { return { id: processingJob.id, - taskIds: processingJob.tasks.map((t) => t.id), + taskIds: (processingJob.tasks as unknown as IJobResponse[]).map((t) => t.id), status: OperationStatus.IN_PROGRESS, }; } diff --git a/src/createPackage/models/tasksManager.ts b/src/createPackage/models/tasksManager.ts deleted file mode 100644 index 4100476..0000000 --- a/src/createPackage/models/tasksManager.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { Logger } from '@map-colonies/js-logger'; -import { inject, injectable } from 'tsyringe'; -import { OperationStatus } from '@map-colonies/mc-priority-queue'; -import { NotFoundError } from '@map-colonies/error-types'; -import { SERVICES } from '../../common/constants'; -import { JobManagerWrapper } from '../../clients/jobManagerWrapper'; - -export interface ITaskStatusResponse { - percentage: number | undefined; - status: OperationStatus; -} - -@injectable() -export class TasksManager { - public constructor( - @inject(SERVICES.LOGGER) private readonly logger: Logger, - @inject(JobManagerWrapper) private readonly jobManagerClient: JobManagerWrapper - ) {} - - public async getTaskStatusByJobId(jobId: string): Promise { - this.logger.info(`Getting task status by jobId: ${jobId}`); - const tasks = await this.jobManagerClient.getTasksByJobId(jobId); - - if (tasks.length === 0) { - throw new NotFoundError(`jobId: ${jobId} is not exists`); - } - const task = tasks[0]; - const statusResponse: ITaskStatusResponse = { - percentage: task.percentage, - status: task.status, - }; - return statusResponse; - } -} diff --git a/src/index.ts b/src/index.ts index f164947..9b30e8f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,6 +7,7 @@ import { Logger } from '@map-colonies/js-logger'; import { container } from 'tsyringe'; import config from 'config'; import { DEFAULT_SERVER_PORT, SERVICES } from './common/constants'; +import { PollingManager, POLLING_MANGER_SYMBOL } from './pollingManager'; import { getApp } from './app'; @@ -22,7 +23,27 @@ const app = getApp(); const logger = container.resolve(SERVICES.LOGGER); const stubHealthcheck = async (): Promise => Promise.resolve(); const server = createTerminus(createServer(app), { healthChecks: { '/liveness': stubHealthcheck, onSignal: container.resolve('onSignal') } }); - +const pollingManager: PollingManager = container.resolve(POLLING_MANGER_SYMBOL); server.listen(port, () => { logger.info(`app started on port ${port}`); }); +const mainPollLoop = async (): Promise => { + const pollingTimout = config.get('pollingTimeoutMS'); + const isRunning = true; + logger.info('running job status poll'); + //eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (isRunning) { + let polledData = false; + try { + polledData = await pollingManager.jobStatusPoll(); + } catch (error) { + logger.error(`mainPollLoop: Error: ${JSON.stringify(error, Object.getOwnPropertyNames(error))}`); + } finally { + if (!polledData) { + await new Promise((resolve) => setTimeout(resolve, pollingTimout)); + } + } + } +}; + +void mainPollLoop(); diff --git a/src/pollingManager.ts b/src/pollingManager.ts new file mode 100644 index 0000000..57a41ee --- /dev/null +++ b/src/pollingManager.ts @@ -0,0 +1,39 @@ +import config from 'config'; +import { inject, singleton } from 'tsyringe'; +import { Logger } from '@map-colonies/js-logger'; +import { SERVICES } from './common/constants'; +import { TasksManager } from './tasks/models/tasksManager'; +import { JobResponse } from './common/interfaces'; + +export const POLLING_MANGER_SYMBOL = Symbol('tasksFactory'); + +@singleton() +export class PollingManager { + private readonly expirationDays: number; + + public constructor(@inject(SERVICES.LOGGER) private readonly logger: Logger, @inject(TasksManager) private readonly taskManager: TasksManager) { + this.expirationDays = config.get('jobManager.expirationDays'); + } + + public async jobStatusPoll(): Promise { + let existsJobs = false; + const jobs = await this.taskManager.getJobsByTaskStatus(); + const expirationDate = new Date(); + expirationDate.setDate(expirationDate.getDate() + this.expirationDays); + if ((jobs.completedJobs?.length ?? 0) > 0) { + existsJobs = true; + this.logger.info(`completed jobs detected, running finalize job`); + for (const job of jobs.completedJobs as JobResponse[]) { + await this.taskManager.finalizeJob(job, expirationDate); + } + } else if ((jobs.failedJobs?.length ?? 0) > 0) { + existsJobs = true; + this.logger.info(`failed jobs detected, updating job status`); + for (const job of jobs.failedJobs as JobResponse[]) { + const gpkgFailedErr = `failed to create gpkg, job: ${job.id}`; + await this.taskManager.finalizeJob(job, expirationDate, false, gpkgFailedErr); + } + } + return existsJobs; + } +} diff --git a/src/serverBuilder.ts b/src/serverBuilder.ts index a0c9571..3deca86 100644 --- a/src/serverBuilder.ts +++ b/src/serverBuilder.ts @@ -10,7 +10,7 @@ import httpLogger from '@map-colonies/express-access-log-middleware'; import { SERVICES } from './common/constants'; import { IConfig } from './common/interfaces'; import { CREATE_PACKAGE_ROUTER_SYMBOL } from './createPackage/routes/createPackageRouter'; -import { TASKS_ROUTER_SYMBOL } from './createPackage/routes/tasksRouter'; +import { TASKS_ROUTER_SYMBOL } from './tasks/routes/tasksRouter'; @injectable() export class ServerBuilder { diff --git a/src/createPackage/controllers/tasksController.ts b/src/tasks/controllers/tasksController.ts similarity index 100% rename from src/createPackage/controllers/tasksController.ts rename to src/tasks/controllers/tasksController.ts diff --git a/src/tasks/models/tasksManager.ts b/src/tasks/models/tasksManager.ts new file mode 100644 index 0000000..bbdb476 --- /dev/null +++ b/src/tasks/models/tasksManager.ts @@ -0,0 +1,132 @@ +import { Logger } from '@map-colonies/js-logger'; +import { inject, injectable } from 'tsyringe'; +import config from 'config'; +import { IUpdateJobBody, OperationStatus } from '@map-colonies/mc-priority-queue'; +import { NotFoundError } from '@map-colonies/error-types'; +import { getGpkgFullPath, getGpkgRelativePath } from '../../common/utils'; +import { SERVICES } from '../../common/constants'; +import { JobManagerWrapper } from '../../clients/jobManagerWrapper'; +import { ICallbackData, ICallbackDataBase, IJobParameters, IJobStatusResponse, JobResponse } from '../../common/interfaces'; +import { CallbackClient } from '../../clients/callbackClient'; +import { getFileSize } from '../../common/utils'; +import { CreatePackageManager } from '../../createPackage/models/createPackageManager'; + +export interface ITaskStatusResponse { + percentage: number | undefined; + status: OperationStatus; +} + +@injectable() +export class TasksManager { + private readonly gpkgsLocation: string; + private readonly downloadServerUrl: string; + public constructor( + @inject(SERVICES.LOGGER) private readonly logger: Logger, + @inject(JobManagerWrapper) private readonly jobManagerClient: JobManagerWrapper, + @inject(CallbackClient) private readonly callbackClient: CallbackClient, + @inject(CreatePackageManager) private readonly packageManager: CreatePackageManager + ) { + this.gpkgsLocation = config.get('gpkgsLocation'); + this.downloadServerUrl = config.get('downloadServerUrl'); + } + + public async getJobsByTaskStatus(): Promise { + const jobs = await this.jobManagerClient.getJobsStatus(); + + const completedJobs = jobs?.filter((job) => job.completedTasks === job.taskCount); + const failedJobs = jobs?.filter((job) => job.failedTasks === job.taskCount); + const jobsStatus = { + completedJobs: completedJobs, + failedJobs: failedJobs, + }; + return jobsStatus; + } + + public async getTaskStatusByJobId(jobId: string): Promise { + const tasks = await this.jobManagerClient.getTasksByJobId(jobId); + + if (tasks.length === 0) { + throw new NotFoundError(`No tasks were found for jobId: ${jobId}`); + } + const task = tasks[0]; + const statusResponse: ITaskStatusResponse = { + percentage: task.percentage, + status: task.status, + }; + return statusResponse; + } + + public async sendCallbacks(job: JobResponse, expirationDate: Date, errorReason?: string): Promise { + let fileUri = ''; + let fileRelativePath = ''; + try { + this.logger.info(`Sending callback for job: ${job.id}`); + const packageName = job.parameters.fileName; + const success = errorReason === undefined; + let fileSize = 0; + if (success) { + fileRelativePath = getGpkgRelativePath(packageName); + const packageFullPath = getGpkgFullPath(this.gpkgsLocation, packageName); + fileUri = `${this.downloadServerUrl}/downloads/${fileRelativePath}`; + fileSize = await getFileSize(packageFullPath); + } + const callbackParams: ICallbackDataBase = { + fileUri, + expirationTime: expirationDate, + fileSize, + dbId: job.internalId as string, + packageName: packageName, + requestId: job.id, + targetResolution: job.parameters.targetResolution, + success, + errorReason, + }; + + const targetCallbacks = job.parameters.callbacks; + const callbackPromises: Promise[] = []; + for (const target of targetCallbacks) { + const params: ICallbackData = { ...callbackParams, bbox: target.bbox }; + callbackPromises.push(this.callbackClient.send(target.url, params)); + } + + const promisesResponse = await Promise.allSettled(callbackPromises); + promisesResponse.forEach((response, index) => { + if (response.status === 'rejected') { + this.logger.error(`Did not send callback to ${targetCallbacks[index].url}, got error: ${JSON.stringify(response.reason)}`); + } + }); + + return callbackParams; + } catch (error) { + this.logger.error(`Sending callback has failed with error: ${JSON.stringify(error, Object.getOwnPropertyNames(error))}`); + } + } + + public async finalizeJob(job: JobResponse, expirationDate: Date, isSuccess = true, reason?: string): Promise { + let updateJobParams: IUpdateJobBody = { + status: isSuccess ? OperationStatus.COMPLETED : OperationStatus.FAILED, + reason, + /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */ + percentage: isSuccess ? 100 : undefined, + expirationDate: expirationDate, + }; + try { + this.logger.info(`Finzaling Job: ${job.id}`); + const packageName = job.parameters.fileName; + if (isSuccess) { + const packageFullPath = getGpkgFullPath(this.gpkgsLocation, packageName); + await this.packageManager.createJsonMetadata(packageFullPath.substr(0, packageFullPath.lastIndexOf('.')), job.internalId as string); + } + const callbackParams = await this.sendCallbacks(job, expirationDate, reason); + updateJobParams = { ...updateJobParams, parameters: { ...job.parameters, callbackParams } }; + + this.logger.info(`Update Job status to success=${String(isSuccess)} jobId=${job.id}`); + await this.jobManagerClient.updateJob(job.id, updateJobParams); + } catch (error) { + this.logger.error(`Could not finalize job: ${job.id} updating failed job status, error: ${(error as Error).message}`); + const callbackParams = await this.sendCallbacks(job, expirationDate, reason); + updateJobParams = { ...updateJobParams, status: OperationStatus.FAILED, parameters: { ...job.parameters, callbackParams } }; + await this.jobManagerClient.updateJob(job.id, updateJobParams); + } + } +} diff --git a/src/createPackage/routes/tasksRouter.ts b/src/tasks/routes/tasksRouter.ts similarity index 85% rename from src/createPackage/routes/tasksRouter.ts rename to src/tasks/routes/tasksRouter.ts index 837bbe5..6ac8851 100644 --- a/src/createPackage/routes/tasksRouter.ts +++ b/src/tasks/routes/tasksRouter.ts @@ -1,6 +1,6 @@ import { Router } from 'express'; import { FactoryFunction } from 'tsyringe'; -import { TasksController } from '../controllers/tasksController'; +import { TasksController } from '../../tasks/controllers/tasksController'; const tasksRouterFactory: FactoryFunction = (dependencyContainer) => { const router = Router(); diff --git a/tests/integration/testContainerConfig.ts b/tests/integration/testContainerConfig.ts new file mode 100644 index 0000000..7459127 --- /dev/null +++ b/tests/integration/testContainerConfig.ts @@ -0,0 +1,25 @@ +import { trace } from '@opentelemetry/api'; +import jsLogger from '@map-colonies/js-logger'; +import { container } from 'tsyringe'; +import { SERVICES } from '../../src/common/constants'; +import { configMock, registerDefaultConfig, getMock, hasMock } from '../mocks/config'; +import { InjectionObject } from '../../src/common/dependencyRegistration'; + +function getContainerConfig(): InjectionObject[] { + registerDefaultConfig(); + return [ + { token: SERVICES.LOGGER, provider: { useValue: jsLogger({ enabled: false }) } }, + { token: SERVICES.CONFIG, provider: { useValue: configMock } }, + { token: SERVICES.TRACER, provider: { useValue: trace.getTracer('testTracer') } }, + ]; +} +const resetContainer = (clearInstances = true): void => { + if (clearInstances) { + container.clearInstances(); + } + + getMock.mockReset(); + hasMock.mockReset(); +}; + +export { getContainerConfig, resetContainer }; diff --git a/tests/mocks/clients/callbackClient.ts b/tests/mocks/clients/callbackClient.ts new file mode 100644 index 0000000..4677b94 --- /dev/null +++ b/tests/mocks/clients/callbackClient.ts @@ -0,0 +1,9 @@ +import { CallbackClient } from '../../../src/clients/callbackClient'; + +const sendMock = jest.fn(); + +const callbackClientMock = { + send: sendMock, +} as unknown as CallbackClient; + +export { callbackClientMock, sendMock }; diff --git a/tests/mocks/clients/catalogManagerClient.ts b/tests/mocks/clients/catalogManagerClient.ts new file mode 100644 index 0000000..aaf095f --- /dev/null +++ b/tests/mocks/clients/catalogManagerClient.ts @@ -0,0 +1,9 @@ +import { RasterCatalogManagerClient } from '../../../src/clients/rasterCatalogManagerClient'; + +const findLayerMock = jest.fn(); + +const catalogManagerMock = { + findLayer: findLayerMock, +} as unknown as RasterCatalogManagerClient; + +export { catalogManagerMock, findLayerMock }; diff --git a/tests/mocks/clients/jobManagerWrapper.ts b/tests/mocks/clients/jobManagerWrapper.ts index f517822..19bfb11 100644 --- a/tests/mocks/clients/jobManagerWrapper.ts +++ b/tests/mocks/clients/jobManagerWrapper.ts @@ -5,6 +5,8 @@ const findInProgressJobMock = jest.fn(); const findPendingJobMock = jest.fn(); const createMock = jest.fn(); const createJobMock = jest.fn(); +const getJobsStatusMock = jest.fn(); +const updateJobMock = jest.fn(); const jobManagerWrapperMock = { createJob: createJobMock, @@ -12,6 +14,17 @@ const jobManagerWrapperMock = { findInProgressJob: findInProgressJobMock, findPendingJob: findPendingJobMock, create: createMock, + getJobsStatus: getJobsStatusMock, + updateJob: updateJobMock, } as unknown as JobManagerWrapper; -export { jobManagerWrapperMock, createMock, createJobMock, findCompletedJobMock, findInProgressJobMock, findPendingJobMock }; +export { + jobManagerWrapperMock, + createMock, + createJobMock, + findCompletedJobMock, + findInProgressJobMock, + findPendingJobMock, + getJobsStatusMock, + updateJobMock, +}; diff --git a/tests/mocks/clients/packageManager.ts b/tests/mocks/clients/packageManager.ts new file mode 100644 index 0000000..9e3fc3e --- /dev/null +++ b/tests/mocks/clients/packageManager.ts @@ -0,0 +1,33 @@ +import { CreatePackageManager } from '../../../src/createPackage/models/createPackageManager'; + +const createPackageMock = jest.fn(); +const createJsonMetadataMock = jest.fn(); +const getSeparatorMock = jest.fn(); +const sanitizeBboxMock = jest.fn(); +const checkForDuplicateMock = jest.fn(); +const checkForCompletedMock = jest.fn(); +const checkForProcessingMock = jest.fn(); +const updateCallbackURLsMock = jest.fn(); + +const packageManagerMock = { + createPackage: createPackageMock, + createJsonMetadata: createJsonMetadataMock, + getSeparator: getSeparatorMock, + sanitizeBbox: sanitizeBboxMock, + checkForDuplicate: checkForDuplicateMock, + checkForCompleted: checkForCompletedMock, + checkForProcessing: checkForProcessingMock, + updateCallbackURLs: updateCallbackURLsMock, +} as unknown as CreatePackageManager; + +export { + packageManagerMock, + createPackageMock, + createJsonMetadataMock, + getSeparatorMock, + sanitizeBboxMock, + checkForDuplicateMock, + checkForCompletedMock, + checkForProcessingMock, + updateCallbackURLsMock, +}; diff --git a/tests/mocks/config.ts b/tests/mocks/config.ts new file mode 100644 index 0000000..89ff231 --- /dev/null +++ b/tests/mocks/config.ts @@ -0,0 +1,75 @@ +import config from 'config'; +import { get, has } from 'lodash'; +import { IConfig } from '../../src/common/interfaces'; + +const getMock = jest.fn(); +const hasMock = jest.fn(); + +const configMock = { + get: getMock, + has: hasMock, +} as unknown as IConfig; + +const setConfigValues = (values: Record): void => { + getMock.mockImplementation((key: string) => { + const value = get(values, key) ?? config.get(key); + return value; + }); + hasMock.mockImplementation((key: string) => has(values, key) || config.has(key)); +}; + +const registerDefaultConfig = (): void => { + const config = { + openapiConfig: { + filePath: './openapi3.yaml', + basePath: '/docs', + rawPath: '/api', + uiPath: '/api', + }, + telemetry: { + logger: { + level: 'info', + prettyPrint: false, + }, + }, + server: { + port: '8080', + request: { + payload: { + limit: '1mb', + }, + }, + response: { + compression: { + enabled: true, + options: null, + }, + }, + }, + httpRetry: { + attempts: 5, + delay: 'exponential', + shouldResetTimeout: true, + }, + jobManager: { + url: 'http://job-manager-job-manager', + expirationDays: 30, + }, + rasterCatalogManager: { + url: 'http://raster-catalog-manager', + }, + workerTypes: { + tiles: { + jobType: 'rasterTilesExporter', + taskType: 'rasterTilesExporter', + }, + }, + tilesProvider: 'S3', + gpkgsLocation: '/app/tiles_outputs/gpkgs', + downloadServerUrl: 'http://download-service', + pollingTimeoutMS: 2000, + }; + setConfigValues(config); +}; + +export { getMock, hasMock, configMock, setConfigValues, registerDefaultConfig }; diff --git a/tests/mocks/data.ts b/tests/mocks/data.ts index 0fff80d..d27f654 100644 --- a/tests/mocks/data.ts +++ b/tests/mocks/data.ts @@ -111,6 +111,7 @@ const completedJob: IJobResponse = { parameters: { crs: 'EPSG:4326', sanitizedBbox: [0, 0, 25, 41], + fileName: 'test.gpkg', zoomLevel: 4, callbacks: [ { @@ -119,7 +120,6 @@ const completedJob: IJobResponse = { }, ], callbackParams: { - bbox: [0, 0, 25, 41], dbId: '0c3e455f-4aeb-4258-982d-f7773469a92d', fileUri: 'http://localhost:4515/downloads/gm_0c3e455f_4aeb_4258_982d_f7773469a92d_4_0_000000_0000025_0000041_00000.gpkg', success: true, @@ -129,8 +129,10 @@ const completedJob: IJobResponse = { expirationTime: new Date(), targetResolution: 0.0439453125, }, + targetResolution: 0.0439453125, }, + status: OperationStatus.COMPLETED, percentage: 100, reason: '', @@ -146,6 +148,7 @@ const completedJob: IJobResponse = { expiredTasks: 0, pendingTasks: 0, inProgressTasks: 0, + abortedTasks: 0, tasks: [ { id: '542ebbfd-f4d1-4c77-bd4d-97ca121f0de7', @@ -172,8 +175,10 @@ const inProgressJob: IJobResponse = { resourceId: 'string', version: '1.0', type: 'rasterTilesExporter', + percentage: 0, description: '', parameters: { + fileName: 'test.gpkg', crs: 'EPSG:4326', sanitizedBbox: [0, 0, 25, 41], zoomLevel: 4, @@ -193,6 +198,7 @@ const inProgressJob: IJobResponse = { failedTasks: 0, expiredTasks: 0, pendingTasks: 0, + abortedTasks: 0, inProgressTasks: 1, tasks: [ { @@ -216,6 +222,7 @@ const inProgressJob: IJobResponse = { }; const workerInput: IWorkerInput = { + fileName: 'test.gpkg', sanitizedBbox: [0, 2.999267578125, 25.0048828125, 41.0009765625], targetResolution: 0.0000429153442382812, zoomLevel: 15, diff --git a/tests/mocks/data/mockJob.ts b/tests/mocks/data/mockJob.ts new file mode 100644 index 0000000..54b729a --- /dev/null +++ b/tests/mocks/data/mockJob.ts @@ -0,0 +1,49 @@ +/* eslint-disable @typescript-eslint/no-magic-numbers */ +import { OperationStatus } from '@map-colonies/mc-priority-queue'; +import { JobResponse } from '../../../src/common/interfaces'; + +export const mockJob: JobResponse = { + id: 'b729f0e0-af64-4c2c-ba4e-e799e2f3df0f', + resourceId: 'test', + version: '1.0', + type: 'rasterTilesExporter', + description: '', + parameters: { + crs: 'EPSG:4326', + fileName: 'test.gpkg', + callbacks: [ + { + url: 'http://example.getmap.com/callback', + bbox: [34.811938017107494, 31.95475033759175, 34.82237261707599, 31.96426962177354], + }, + { + url: 'http://example.getmap.com/callback2', + bbox: [34.811938017107494, 31.95475033759175, 34.82237261707599, 31.96426962177354], + }, + ], + zoomLevel: 3, + sanitizedBbox: [0, -90, 180, 90], + targetResolution: 0.072, + }, + status: OperationStatus.IN_PROGRESS, + percentage: 100, + reason: '', + isCleaned: false, + priority: 0, + expirationDate: new Date(), + internalId: '880a9316-0f10-4874-92e2-a62d587a1169', + producerName: undefined, + productName: 'test', + productType: 'Orthophoto', + additionalIdentifiers: '0,-90,180,903', + taskCount: 1, + completedTasks: 0, + failedTasks: 0, + expiredTasks: 0, + pendingTasks: 0, + inProgressTasks: 0, + abortedTasks: 0, + tasks: [], + created: '2022-08-29T07:06:05.043Z', + updated: '2022-08-29T07:13:05.206Z', +}; diff --git a/tests/unit/clients/jobManagerClient.spec.ts b/tests/unit/clients/jobManagerClient.spec.ts index 170bbf6..932ada0 100644 --- a/tests/unit/clients/jobManagerClient.spec.ts +++ b/tests/unit/clients/jobManagerClient.spec.ts @@ -1,7 +1,8 @@ import jsLogger from '@map-colonies/js-logger'; import { OperationStatus } from '@map-colonies/mc-priority-queue'; import { JobManagerWrapper } from '../../../src/clients/jobManagerWrapper'; -import { jobs, workerInput } from '../../mocks/data'; +import { JobResponse } from '../../../src/common/interfaces'; +import { inProgressJob, jobs, workerInput } from '../../mocks/data'; let jobManagerClient: JobManagerWrapper; let postFun: jest.Mock; @@ -73,5 +74,19 @@ describe('JobManagerClient', () => { expect(getJobs).toHaveBeenCalledTimes(1); expect(completedJobs).toBeDefined(); }); + + it('should get In-Progress jobs status successfully', async () => { + getJobs = jest.fn(); + const jobs: JobResponse[] = []; + jobs.push(inProgressJob); + const jobManager = jobManagerClient as unknown as { getJobs: unknown }; + jobManager.getJobs = getJobs.mockResolvedValue(jobs); + + const result = await jobManagerClient.getJobsStatus(); + + expect(getJobs).toHaveBeenCalledTimes(1); + expect(result).toBeDefined(); + expect(result).toEqual(jobs); + }); }); }); diff --git a/tests/unit/createPackage/models/createPackageModel.spec.ts b/tests/unit/createPackage/models/createPackageModel.spec.ts index 31bdd16..baa0eed 100644 --- a/tests/unit/createPackage/models/createPackageModel.spec.ts +++ b/tests/unit/createPackage/models/createPackageModel.spec.ts @@ -1,36 +1,26 @@ import { BadRequestError } from '@map-colonies/error-types'; import jsLogger from '@map-colonies/js-logger'; -import { OperationStatus } from '@map-colonies/mc-priority-queue'; -import { JobManagerWrapper } from '../../../../src/clients/jobManagerWrapper'; -import { RasterCatalogManagerClient } from '../../../../src/clients/rasterCatalogManagerClient'; -import { ICreateJobResponse, ICreatePackage, JobDuplicationParams } from '../../../../src/common/interfaces'; +import { IJobResponse, OperationStatus } from '@map-colonies/mc-priority-queue'; +import { BBox2d } from '@turf/helpers/dist/js/lib/geojson'; +import { + jobManagerWrapperMock, + findCompletedJobMock, + findInProgressJobMock, + findPendingJobMock, + updateJobMock, + createMock, +} from '../../../mocks/clients/jobManagerWrapper'; +import { catalogManagerMock, findLayerMock } from '../../../mocks/clients/catalogManagerClient'; +import { ICreateJobResponse, ICreatePackage, IJobParameters, ITaskParameters, JobDuplicationParams } from '../../../../src/common/interfaces'; import { CreatePackageManager } from '../../../../src/createPackage/models/createPackageManager'; import { inProgressJob, layerFromCatalog, userInput } from '../../../mocks/data'; let createPackageManager: CreatePackageManager; -let jobManagerWrapperMock: JobManagerWrapper; -let rasterCatalogManagerClientMock: RasterCatalogManagerClient; -const findLayerStub = jest.fn(); -const createJobStub = jest.fn(); -const updateJobStub = jest.fn(); -const findCompletedJobMock = jest.fn(); -const findInProgressJobMock = jest.fn(); -const findPendingJobMock = jest.fn(); describe('CreatePackageManager', () => { beforeEach(() => { const logger = jsLogger({ enabled: false }); - rasterCatalogManagerClientMock = { - findLayer: findLayerStub, - } as unknown as RasterCatalogManagerClient; - jobManagerWrapperMock = { - create: createJobStub, - findCompletedJob: findCompletedJobMock, - findInProgressJob: findInProgressJobMock, - findPendingJob: findPendingJobMock, - updateJob: updateJobStub, - } as unknown as JobManagerWrapper; - createPackageManager = new CreatePackageManager(logger, jobManagerWrapperMock, rasterCatalogManagerClientMock); + createPackageManager = new CreatePackageManager(logger, jobManagerWrapperMock, catalogManagerMock); }); afterEach(() => { @@ -47,118 +37,118 @@ describe('CreatePackageManager', () => { targetResolution: 0.0439453125, crs: 'EPSG:4326', }; + + const expectedsanitizedBbox: BBox2d = [0, -90, 180, 90]; const jobDupParams: JobDuplicationParams = { - resourceId: 'temp_resourceId', - version: 'temp_version', + resourceId: 'string', + version: '1.0', dbId: layerFromCatalog.id, zoomLevel: 4, - sanitizedBbox: [0, 1, 3, 5], + sanitizedBbox: expectedsanitizedBbox, crs: 'EPSG:4326', }; - findLayerStub.mockResolvedValue(layerFromCatalog); - createJobStub.mockResolvedValue({ + const expectedCreateJobResponse = { jobId: '09e29fa8-7283-4334-b3a4-99f75922de59', taskIds: ['66aa1e2e-784c-4178-b5a0-af962937d561'], status: OperationStatus.IN_PROGRESS, - }); + }; + findLayerMock.mockResolvedValue(layerFromCatalog); + createMock.mockResolvedValue(expectedCreateJobResponse); findCompletedJobMock.mockResolvedValue(undefined); findInProgressJobMock.mockResolvedValue(undefined); findPendingJobMock.mockResolvedValue(undefined); - // eslint-disable-next-line - const checkForDuplicateResponse = await (createPackageManager as unknown as { checkForDuplicate: any }).checkForDuplicate(jobDupParams, []); - - await createPackageManager.createPackage(req); + const res = await createPackageManager.createPackage(req); - expect(checkForDuplicateResponse).toBeUndefined(); - expect(findLayerStub).toHaveBeenCalledTimes(1); - expect(createJobStub).toHaveBeenCalledTimes(1); - expect(findCompletedJobMock).toHaveBeenCalledTimes(2); - expect(findInProgressJobMock).toHaveBeenCalledTimes(2); + expect(res).toEqual(expectedCreateJobResponse); + expect(findLayerMock).toHaveBeenCalledWith(req.dbId); + expect(findLayerMock).toHaveBeenCalledTimes(1); + expect(createMock).toHaveBeenCalledTimes(1); + expect(findCompletedJobMock).toHaveBeenCalledWith(jobDupParams); + expect(findCompletedJobMock).toHaveBeenCalledTimes(1); + expect(findInProgressJobMock).toHaveBeenCalledWith(jobDupParams); + expect(findInProgressJobMock).toHaveBeenCalledTimes(1); }); it('should return job and task-ids of existing in pending job', async () => { + const expectedsanitizedBbox: BBox2d = [0, 2.8125, 25.3125, 42.1875]; const jobDupParams: JobDuplicationParams = { resourceId: layerFromCatalog.metadata.productId as string, version: layerFromCatalog.metadata.productVersion as string, dbId: layerFromCatalog.id, - zoomLevel: 4, - sanitizedBbox: userInput.bbox, + zoomLevel: 7, + sanitizedBbox: expectedsanitizedBbox, crs: userInput.crs as string, }; - findLayerStub.mockResolvedValue(layerFromCatalog); - createJobStub.mockResolvedValue(undefined); - - updateJobStub.mockResolvedValue(undefined); + findLayerMock.mockResolvedValue(layerFromCatalog); + createMock.mockResolvedValue(undefined); + updateJobMock.mockResolvedValue(undefined); findCompletedJobMock.mockResolvedValue(undefined); findInProgressJobMock.mockResolvedValue(undefined); findPendingJobMock.mockResolvedValue(inProgressJob); - // eslint-disable-next-line - const checkForDuplicateResponse = await (createPackageManager as unknown as { checkForDuplicate: any }).checkForDuplicate(jobDupParams, []); - await createPackageManager.createPackage(userInput); - const expectedReturn: ICreateJobResponse = { - id: inProgressJob.id, - taskIds: [inProgressJob.tasks[0].id], - status: OperationStatus.IN_PROGRESS, - }; - - expect(findLayerStub).toHaveBeenCalledTimes(1); - expect(createJobStub).toHaveBeenCalledTimes(0); - expect(checkForDuplicateResponse).toEqual(expectedReturn); - expect(findCompletedJobMock).toHaveBeenCalledTimes(4); - expect(findInProgressJobMock).toHaveBeenCalledTimes(2); - expect(findPendingJobMock).toHaveBeenCalledTimes(2); + expect(findLayerMock).toHaveBeenCalledWith(layerFromCatalog.id); + expect(findLayerMock).toHaveBeenCalledTimes(1); + expect(createMock).toHaveBeenCalledTimes(0); + expect(findCompletedJobMock).toHaveBeenNthCalledWith(1, jobDupParams); + expect(findCompletedJobMock).toHaveBeenNthCalledWith(2, jobDupParams); + expect(findCompletedJobMock).toHaveBeenCalledTimes(2); + expect(findInProgressJobMock).toHaveBeenCalledWith(jobDupParams); + expect(findInProgressJobMock).toHaveBeenCalledTimes(1); + expect(findPendingJobMock).toHaveBeenCalledWith(jobDupParams); + expect(findPendingJobMock).toHaveBeenCalledTimes(1); }); it('should return job and task-ids of existing in progress job', async () => { + const expectedsanitizedBbox: BBox2d = [0, 2.8125, 25.3125, 42.1875]; const jobDupParams: JobDuplicationParams = { resourceId: layerFromCatalog.metadata.productId as string, version: layerFromCatalog.metadata.productVersion as string, dbId: layerFromCatalog.id, - zoomLevel: 4, - sanitizedBbox: userInput.bbox, + zoomLevel: 7, + sanitizedBbox: expectedsanitizedBbox, crs: userInput.crs as string, }; - findLayerStub.mockResolvedValue(layerFromCatalog); - createJobStub.mockResolvedValue(undefined); - - updateJobStub.mockResolvedValue(undefined); + findLayerMock.mockResolvedValue(layerFromCatalog); + createMock.mockResolvedValue(undefined); + updateJobMock.mockResolvedValue(undefined); findCompletedJobMock.mockResolvedValue(undefined); findInProgressJobMock.mockResolvedValue(inProgressJob); - // eslint-disable-next-line - const checkForDuplicateResponse = await (createPackageManager as unknown as { checkForDuplicate: any }).checkForDuplicate(jobDupParams, []); - - await createPackageManager.createPackage(userInput); + const res = await createPackageManager.createPackage(userInput); const expectedReturn: ICreateJobResponse = { id: inProgressJob.id, - taskIds: [inProgressJob.tasks[0].id], + taskIds: [(inProgressJob.tasks as unknown as IJobResponse[])[0].id], status: OperationStatus.IN_PROGRESS, }; - expect(findLayerStub).toHaveBeenCalledTimes(1); - expect(createJobStub).toHaveBeenCalledTimes(0); - expect(checkForDuplicateResponse).toEqual(expectedReturn); - - expect(findCompletedJobMock).toHaveBeenCalledTimes(4); - expect(findInProgressJobMock).toHaveBeenCalledTimes(2); + expect(res).toEqual(expectedReturn); + expect(findLayerMock).toHaveBeenCalledWith(jobDupParams.dbId); + expect(findLayerMock).toHaveBeenCalledTimes(1); + expect(createMock).toHaveBeenCalledTimes(0); + expect(findCompletedJobMock).toHaveBeenNthCalledWith(1, jobDupParams); + expect(findCompletedJobMock).toHaveBeenNthCalledWith(2, jobDupParams); + expect(findCompletedJobMock).toHaveBeenCalledTimes(2); + expect(findInProgressJobMock).toHaveBeenCalledWith(jobDupParams); + expect(findInProgressJobMock).toHaveBeenCalledTimes(1); expect(findPendingJobMock).toHaveBeenCalledTimes(0); }); it('should throw bad request error when requested resolution is higher then the layer resolution', async () => { const layer = { ...layerFromCatalog, metadata: { ...layerFromCatalog.metadata, maxResolutionDeg: 0.072 } }; - findLayerStub.mockResolvedValue(layer); + findLayerMock.mockResolvedValue(layer); const action = async () => createPackageManager.createPackage(userInput); await expect(action).rejects.toThrow(BadRequestError); + expect(findLayerMock).toHaveBeenCalledTimes(1); + expect(findLayerMock).toHaveBeenCalledWith(layer.id); }); }); }); diff --git a/tests/unit/createPackage/models/tasksModel.spec.ts b/tests/unit/createPackage/models/tasksModel.spec.ts index 9ed9a27..eb5ab6d 100644 --- a/tests/unit/createPackage/models/tasksModel.spec.ts +++ b/tests/unit/createPackage/models/tasksModel.spec.ts @@ -1,19 +1,23 @@ import jsLogger from '@map-colonies/js-logger'; import { OperationStatus } from '@map-colonies/mc-priority-queue'; import { NotFoundError } from '@map-colonies/error-types'; -import { JobManagerWrapper } from '../../../../src/clients/jobManagerWrapper'; -import { ITaskStatusResponse, TasksManager } from '../../../../src/createPackage/models/tasksManager'; -import { ITaskParameters, TaskResponse } from '../../../../src/common/interfaces'; +import { ITaskStatusResponse, TasksManager } from '../../../../src/tasks/models/tasksManager'; +import { ICallbackDataBase, ITaskParameters, JobResponse, TaskResponse } from '../../../../src/common/interfaces'; +import { registerDefaultConfig } from '../../../mocks/config'; +import { callbackClientMock, sendMock } from '../../../mocks/clients/callbackClient'; +import { createJsonMetadataMock, packageManagerMock } from '../../../mocks/clients/packageManager'; +import { jobManagerWrapperMock, getJobsStatusMock, updateJobMock } from '../../../mocks/clients/jobManagerWrapper'; +import { mockJob } from '../../../mocks/data/mockJob'; +import * as utils from '../../../../src/common/utils'; -let jobManagerWrapper: JobManagerWrapper; let tasksManager: TasksManager; let getTasksByJobIdStub: jest.Mock; describe('TasksManager', () => { beforeEach(() => { const logger = jsLogger({ enabled: false }); - jobManagerWrapper = new JobManagerWrapper(logger); - tasksManager = new TasksManager(logger, jobManagerWrapper); + registerDefaultConfig(); + tasksManager = new TasksManager(logger, jobManagerWrapperMock, callbackClientMock, packageManagerMock); }); afterEach(() => { @@ -26,7 +30,7 @@ describe('TasksManager', () => { const emptyTasksResponse: TaskResponse[] = []; getTasksByJobIdStub = jest.fn(); - jobManagerWrapper.getTasksByJobId = getTasksByJobIdStub.mockResolvedValue(emptyTasksResponse); + jobManagerWrapperMock.getTasksByJobId = getTasksByJobIdStub.mockResolvedValue(emptyTasksResponse); const action = async () => tasksManager.getTaskStatusByJobId('09e29fa8-7283-4334-b3a4-99f75922de59'); @@ -53,7 +57,7 @@ describe('TasksManager', () => { ]; getTasksByJobIdStub = jest.fn(); - jobManagerWrapper.getTasksByJobId = getTasksByJobIdStub.mockResolvedValue(tasksResponse); + jobManagerWrapperMock.getTasksByJobId = getTasksByJobIdStub.mockResolvedValue(tasksResponse); const result = tasksManager.getTaskStatusByJobId('0a5552f7-01eb-40af-a912-eed8fa9e1568'); const expectedResult: ITaskStatusResponse = { @@ -65,4 +69,193 @@ describe('TasksManager', () => { expect(getTasksByJobIdStub).toHaveBeenCalledTimes(1); }); }); + describe('#getJobsByTaskStatus', () => { + it('should return completed job with no failed jobs', async () => { + const jobs: JobResponse[] = []; + const completedMockJob = { ...mockJob, completedTasks: 1 }; + jobs.push(completedMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(1); + expect(jobsStatus.failedJobs?.length).toBe(0); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return failed job with no completed jobs', async () => { + const jobs: JobResponse[] = []; + const failedMockJob = { ...mockJob, failedTasks: 1 }; + jobs.push(failedMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(0); + expect(jobsStatus.failedJobs?.length).toBe(1); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return completed job and failed job', async () => { + const jobs: JobResponse[] = []; + const completedMockJob = { ...mockJob, completedTasks: 1 }; + const failedMockJob = { ...mockJob, failedTasks: 1 }; + jobs.push(completedMockJob, failedMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(1); + expect(jobsStatus.failedJobs?.length).toBe(1); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return an empty jobs response if task is in progress', async () => { + const jobs: JobResponse[] = []; + + const inProgressMockJob = { ...mockJob, inProgressTasks: 1 }; + jobs.push(inProgressMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(0); + expect(jobsStatus.failedJobs?.length).toBe(0); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return an empty jobs response if task is in pending', async () => { + const jobs: JobResponse[] = []; + const pendingMockJob = { ...mockJob, pendingTasks: 1 }; + jobs.push(pendingMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(0); + expect(jobsStatus.failedJobs?.length).toBe(0); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return an empty jobs response if task is in expired', async () => { + const jobs: JobResponse[] = []; + const expiredMockJob = { ...mockJob, expiredTasks: 1 }; + jobs.push(expiredMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(0); + expect(jobsStatus.failedJobs?.length).toBe(0); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + + it('should return an empty jobs response if task is in aborted', async () => { + const jobs: JobResponse[] = []; + const abortedMockJob = { ...mockJob, abortedTasks: 1 }; + jobs.push(abortedMockJob); + getJobsStatusMock.mockResolvedValue(jobs); + + const jobsStatus = await tasksManager.getJobsByTaskStatus(); + + expect(jobsStatus.completedJobs?.length).toBe(0); + expect(jobsStatus.failedJobs?.length).toBe(0); + expect(getJobsStatusMock).toHaveBeenCalledTimes(1); + }); + }); + + describe('#sendCallbacks', () => { + it('should return callback data with the expected params for success jobs', async () => { + sendMock.mockResolvedValue(200); + const getFileSizeSpy = jest.spyOn(utils, 'getFileSize'); + getFileSizeSpy.mockResolvedValue(2000); + const expirationTime = new Date(); + const expectedCallbackData: ICallbackDataBase = { + fileUri: 'http://download-service/downloads/test/test.gpkg', + expirationTime: expirationTime, + fileSize: 2000, + dbId: '880a9316-0f10-4874-92e2-a62d587a1169', + packageName: 'test.gpkg', + requestId: 'b729f0e0-af64-4c2c-ba4e-e799e2f3df0f', + targetResolution: 0.072, + success: true, + errorReason: undefined, + }; + + const callbackData = await tasksManager.sendCallbacks(mockJob, expirationTime); + expect(callbackData).toEqual(expectedCallbackData); + }); + + it('should return callback data with the expected params for failed jobs', async () => { + sendMock.mockResolvedValue(200); + const getFileSizeSpy = jest.spyOn(utils, 'getFileSize'); + getFileSizeSpy.mockResolvedValue(2000); + const expirationTime = new Date(); + const errMessage = 'gpkg failed to create'; + const expectedCallbackData: ICallbackDataBase = { + fileUri: '', + expirationTime: expirationTime, + fileSize: 0, + dbId: '880a9316-0f10-4874-92e2-a62d587a1169', + packageName: 'test.gpkg', + requestId: 'b729f0e0-af64-4c2c-ba4e-e799e2f3df0f', + targetResolution: 0.072, + success: false, + errorReason: errMessage, + }; + const callbackData = await tasksManager.sendCallbacks(mockJob, expirationTime, errMessage); + expect(callbackData).toEqual(expectedCallbackData); + }); + + it('should return callback data even if callback response got rejected', async () => { + sendMock.mockRejectedValue({}); + const expirationTime = new Date(); + + const action = async () => tasksManager.sendCallbacks(mockJob, expirationTime); + await expect(action()).resolves.not.toThrow(); + }); + }); + + describe('#finalizeJob', () => { + let sendCallbacksSpy: jest.SpyInstance; + + it('should successfuly finalize a job with status completed', async () => { + const expirationTime = new Date(); + createJsonMetadataMock.mockResolvedValue({}); + updateJobMock.mockResolvedValue({}); + sendCallbacksSpy = jest.spyOn(tasksManager, 'sendCallbacks'); + + const action = async () => tasksManager.finalizeJob(mockJob, expirationTime); + await expect(action()).resolves.not.toThrow(); + expect(createJsonMetadataMock).toHaveBeenCalledTimes(1); + expect(sendCallbacksSpy).toHaveBeenCalledTimes(1); + expect(updateJobMock).toHaveBeenCalledTimes(1); + }); + + it('should successfuly finalize a job with status failed due to error while create json metadata file', async () => { + const expirationTime = new Date(); + createJsonMetadataMock.mockRejectedValue({}); + updateJobMock.mockResolvedValue({}); + sendCallbacksSpy = jest.spyOn(tasksManager, 'sendCallbacks'); + + const action = async () => tasksManager.finalizeJob(mockJob, expirationTime); + await expect(action()).resolves.not.toThrow(); + expect(createJsonMetadataMock).toHaveBeenCalledTimes(1); + expect(sendCallbacksSpy).toHaveBeenCalledTimes(1); + expect(updateJobMock).toHaveBeenCalledTimes(1); + }); + + it('should successfuly finalize a job with job status failed without create json metadata file due to failed in task', async () => { + const expirationTime = new Date(); + updateJobMock.mockResolvedValue({}); + sendCallbacksSpy = jest.spyOn(tasksManager, 'sendCallbacks'); + + const errMessage = 'gpkg failed to create'; + const action = async () => tasksManager.finalizeJob(mockJob, expirationTime, false, errMessage); + await expect(action()).resolves.not.toThrow(); + expect(createJsonMetadataMock).toHaveBeenCalledTimes(0); + expect(sendCallbacksSpy).toHaveBeenCalledTimes(1); + expect(updateJobMock).toHaveBeenCalledTimes(1); + }); + }); });