Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: API changes for integration + helm improvement (MAPCO-2931) #62

Merged
merged 4 commits into from
Mar 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ spec:
- name: http
containerPort: {{ .Values.env.targetPort }}
protocol: TCP
imagePullPolicy: {{ .Values.imagePullPolicy }}
livenessProbe:
initialDelaySeconds: {{ .Values.initialDelaySeconds }}
httpGet:
Expand Down
1 change: 1 addition & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ replicaCount: 1
initialDelaySeconds: 60
nodePort: 30030 #for minikube deployment only
resetOnConfigChange: true
imagePullPolicy: Always

cloudProvider:
dockerRegistryUrl: my-registry-url.io
Expand Down
29 changes: 25 additions & 4 deletions openapi3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ paths:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/createGpkgJobResponse'
- $ref: '#/components/schemas/createRoiGpkgJobResponse'
- $ref: '#/components/schemas/exportNaiveCacheJobResponse'
discriminator:
propertyName: response
Expand Down Expand Up @@ -258,9 +258,11 @@ components:
description: The priority of the record. Maximum priority = most urgent.
minimum: 0
maximum: 999999999
description:
type: string
description: free test to describe the requested export
required:
- dbId
- callbackURLs
example:
dbId: ef03ca54-c68e-4ca8-8432-50ae5ad7a7f8
roi:
Expand All @@ -287,6 +289,7 @@ components:
- http://example.getmap.com/callback2
crs: EPSG:4326
priority: 0
description: This is roi exporting example
createGpkgJobResponse:
allOf:
- $ref: '#/components/schemas/CommonResponse'
Expand All @@ -303,6 +306,22 @@ components:
required:
- id
- taskIds
createRoiGpkgJobResponse:
allOf:
- $ref: '#/components/schemas/CommonResponse'
type: object
properties:
jobId:
type: string
format: uuid
taskIds:
type: array
items:
type: string
format: uuid
required:
- jobId
- taskIds
asafMasa marked this conversation as resolved.
Show resolved Hide resolved
naiveCacheJobResponse:
allOf:
- $ref: '#/components/schemas/CommonResponse'
Expand Down Expand Up @@ -361,9 +380,11 @@ components:
recordCatalogId:
type: string
format: uuid
description:
type: string
roi:
$ref: '#/components/schemas/FeatureCollection'
requestJobId:
jobId:
type: string
format: uuid
required:
Expand All @@ -372,7 +393,7 @@ components:
- fileSize
- recordCatalogId
- roi
- requestJobId
- jobId
- status
error:
type: object
Expand Down
8 changes: 5 additions & 3 deletions src/clients/jobManagerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
CreateJobBody,
ExportVersion,
ICreateJobResponse,
ICreateExportJobResponse,
IJobExportParameters,
IJobParameters,
ITaskParameters,
Expand Down Expand Up @@ -93,7 +94,7 @@ export class JobManagerWrapper extends JobManagerClient {
};
}

public async createExport(data: IWorkerExportInput): Promise<ICreateJobResponse> {
public async createExport(data: IWorkerExportInput): Promise<ICreateExportJobResponse> {
const expirationDate = new Date();
expirationDate.setDate(expirationDate.getDate() + this.expirationDays);

Expand All @@ -117,6 +118,7 @@ export class JobManagerWrapper extends JobManagerClient {
productType: data.productType,
productName: data.cswProductId,
priority: data.priority,
description: data.description,
status: OperationStatus.IN_PROGRESS,
additionalIdentifiers: data.relativeDirectoryPath,
tasks: [
Expand All @@ -130,8 +132,8 @@ export class JobManagerWrapper extends JobManagerClient {
],
};
const res = await this.createJob<IJobExportParameters, ITaskParameters>(createJobRequest);
const createJobResponse: ICreateJobResponse = {
id: res.id,
const createJobResponse: ICreateExportJobResponse = {
jobId: res.id,
taskIds: res.taskIds,
status: OperationStatus.IN_PROGRESS,
};
Expand Down
21 changes: 17 additions & 4 deletions src/common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ export interface OpenApiConfig {

export interface IBaseCreatePackage {
dbId: string;
callbackURLs: string[];
crs?: string;
priority?: number;
}
Expand All @@ -32,10 +31,13 @@ export interface ICleanupData {
export interface ICreatePackage extends IBaseCreatePackage {
targetResolution?: number;
bbox?: BBox | Polygon | MultiPolygon;
callbackURLs: string[];
}

export interface ICreatePackageRoi extends IBaseCreatePackage {
roi?: FeatureCollection;
callbackURLs?: string[];
description?: string;
}

export interface ICallbackBase {
Expand Down Expand Up @@ -79,21 +81,31 @@ export interface IWorkerInput extends IWorkerInputBase {
}

export interface IWorkerExportInput extends IWorkerInputBase {
callbacks: ICallbackTargetExport[];
callbacks?: ICallbackTargetExport[];
roi: FeatureCollection;
fileNamesTemplates: ILinkDefinition;
description?: string;
}

export interface IBasicResponse {
message: string;
}

/**
* @deprecated GetMap API - will be deprecated on future
*/
export interface ICreateJobResponse {
id: string;
taskIds: string[];
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

export interface ICreateExportJobResponse {
jobId: string;
taskIds: string[];
status: OperationStatus.IN_PROGRESS | OperationStatus.COMPLETED;
}

/**
* @deprecated GetMap API - will be deprecated on future
*/
Expand All @@ -114,8 +126,9 @@ export interface ICallbackDataExportBase {
expirationTime: Date;
fileSize: number;
recordCatalogId: string;
requestJobId: string;
jobId: string;
errorReason?: string;
description?: string;
}

/**
Expand Down Expand Up @@ -187,7 +200,7 @@ export interface IJobExportParameters {
crs: string;
exportVersion: ExportVersion;
roi: FeatureCollection;
callbacks: ICallbackTargetExport[];
callbacks?: ICallbackTargetExport[];
callbackParams?: ICallbackExportResponse;
fileNamesTemplates: ILinkDefinition;
gpkgEstimatedSize?: number;
Expand Down
5 changes: 3 additions & 2 deletions src/createPackage/controllers/createPackageController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import {
ICallbackResposne,
ICreatePackageRoi,
ICallbackExportResponse,
ICreateExportJobResponse,
} from '../../common/interfaces';

type CreatePackageHandler = RequestHandler<
undefined,
IBasicResponse | ICreateJobResponse | ICallbackResposne | ICallbackExportResponse,
IBasicResponse | ICreateJobResponse | ICreateExportJobResponse | ICallbackResposne | ICallbackExportResponse,
ICreatePackage | ICreatePackageRoi
>;

Expand All @@ -29,7 +30,7 @@ export class CreatePackageController {
) {}

public create: CreatePackageHandler = async (req, res, next) => {
const userInput: ICreatePackage = req.body;
const userInput: ICreatePackage = req.body as ICreatePackage;
try {
this.logger.debug(userInput, `Creating package with user input`);
const jobCreated = await this.manager.createPackage(userInput);
Expand Down
58 changes: 33 additions & 25 deletions src/createPackage/models/createPackageManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
ICallbackExportResponse,
ICallbackTargetExport,
IConfig,
ICreateExportJobResponse,
ICreatePackageRoi,
IGeometryRecord,
IJobExportParameters,
Expand Down Expand Up @@ -206,8 +207,8 @@ export class CreatePackageManager {
return jobCreated;
}

public async createPackageRoi(userInput: ICreatePackageRoi): Promise<ICreateJobResponse | ICallbackExportResponse> {
const { dbId, crs, priority, callbackURLs } = userInput;
public async createPackageRoi(userInput: ICreatePackageRoi): Promise<ICreateExportJobResponse | ICallbackExportResponse> {
const { dbId, crs, priority, callbackURLs, description } = userInput;
let roi = userInput.roi;
const layer = await this.rasterCatalogManager.findLayer(userInput.dbId);
const layerMetadata = layer.metadata;
Expand Down Expand Up @@ -267,20 +268,20 @@ export class CreatePackageManager {
crs: crs ?? DEFAULT_CRS,
};

const callbacks = callbackURLs.map((url) => <ICallbackTargetExport>{ url, roi });
const callbacks = callbackURLs ? callbackURLs.map((url) => <ICallbackTargetExport>{ url, roi }) : undefined;
const duplicationExist = await this.checkForExportDuplicate(dupParams, callbacks);
if (duplicationExist && duplicationExist.status === OperationStatus.COMPLETED) {
const callbackParam = duplicationExist as ICallbackExportResponse;
this.logger.info({
jobStatus: callbackParam.status,
jobId: callbackParam.requestJobId,
jobId: callbackParam.jobId,
catalogId: callbackParam.recordCatalogId,
msg: `Found relevant cache for export request`,
});
return duplicationExist;
} else if (duplicationExist) {
const jobResponse = duplicationExist as ICreateJobResponse;
this.logger.info({ jobId: jobResponse.id, status: jobResponse.status, msg: `Found exists relevant In-Progress job for export request` });
const jobResponse = duplicationExist as ICreateExportJobResponse;
this.logger.info({ jobId: jobResponse.jobId, status: jobResponse.status, msg: `Found exists relevant In-Progress job for export request` });
return duplicationExist;
}

Expand Down Expand Up @@ -358,6 +359,7 @@ export class CreatePackageManager {
priority: priority ?? DEFAULT_PRIORITY,
callbacks: callbacks,
gpkgEstimatedSize: estimatesGpkgSize,
description,
};
const jobCreated = await this.jobManagerClient.createExport(workerInput);
return jobCreated;
Expand Down Expand Up @@ -610,8 +612,8 @@ export class CreatePackageManager {

private async checkForExportDuplicate(
dupParams: JobExportDuplicationParams,
callbackUrls: ICallbackTargetExport[]
): Promise<ICallbackExportResponse | ICreateJobResponse | undefined> {
callbackUrls: ICallbackTargetExport[] | undefined
asafMasa marked this conversation as resolved.
Show resolved Hide resolved
): Promise<ICallbackExportResponse | ICreateExportJobResponse | undefined> {
let completedExists = await this.checkForExportCompleted(dupParams);
if (completedExists) {
return completedExists;
Expand Down Expand Up @@ -675,16 +677,18 @@ export class CreatePackageManager {

private async checkForExportProcessing(
dupParams: JobExportDuplicationParams,
newCallbacks: ICallbackTargetExport[]
): Promise<ICreateJobResponse | undefined> {
newCallbacks: ICallbackTargetExport[] | undefined
asafMasa marked this conversation as resolved.
Show resolved Hide resolved
): Promise<ICreateExportJobResponse | undefined> {
this.logger.info({ ...dupParams, roi: undefined, msg: `Checking for PROCESSING duplications with parameters` });
const processingJob =
(await this.jobManagerClient.findExportJob(OperationStatus.IN_PROGRESS, dupParams, true)) ??
(await this.jobManagerClient.findExportJob(OperationStatus.PENDING, dupParams, true));
if (processingJob) {
await this.updateExportCallbackURLs(processingJob, newCallbacks);
if (newCallbacks) {
await this.updateExportCallbackURLs(processingJob, newCallbacks);
asafMasa marked this conversation as resolved.
Show resolved Hide resolved
}
return {
id: processingJob.id,
jobId: processingJob.id,
taskIds: (processingJob.tasks as unknown as IJobResponse<IJobExportParameters, ITaskParameters>[]).map((t) => t.id),
status: OperationStatus.IN_PROGRESS,
};
Expand Down Expand Up @@ -730,20 +734,24 @@ export class CreatePackageManager {
}

private async updateExportCallbackURLs(processingJob: JobExportResponse, newCallbacks: ICallbackTargetExport[]): Promise<void> {
const callbacks = processingJob.parameters.callbacks;
for (const newCallback of newCallbacks) {
const hasCallback = callbacks.findIndex((callback) => {
const exist = callback.url === newCallback.url;
if (!exist) {
return false;
}
if (!processingJob.parameters.callbacks) {
processingJob.parameters.callbacks = newCallbacks;
} else {
const callbacks = processingJob.parameters.callbacks;
for (const newCallback of newCallbacks) {
const hasCallback = callbacks.findIndex((callback) => {
const exist = callback.url === newCallback.url;
if (!exist) {
return false;
}

const sameROI = featureCollectionBooleanEqual(callback.roi, newCallback.roi);
return sameROI;
});
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
if (hasCallback === -1) {
callbacks.push(newCallback);
const sameROI = featureCollectionBooleanEqual(callback.roi, newCallback.roi);
return sameROI;
});
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
if (hasCallback === -1) {
callbacks.push(newCallback);
}
}
}
await this.jobManagerClient.updateJob<IJobExportParameters>(processingJob.id, {
Expand Down
29 changes: 16 additions & 13 deletions src/tasks/models/tasksManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,21 @@ export class TasksManager {
try {
this.logger.info({ jobId: job.id, callbacks: job.parameters.callbacks, msg: `Sending callback for job: ${job.id}` });
const targetCallbacks = job.parameters.callbacks;
const callbackPromises: Promise<void>[] = [];
for (const target of targetCallbacks) {
const params: ICallbackExportData = { ...callbackParams, roi: job.parameters.roi };
callbackPromises.push(this.callbackClient.send(target.url, params));
}

const promisesResponse = await Promise.allSettled(callbackPromises);
promisesResponse.forEach((response, index) => {
if (response.status === 'rejected') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.logger.error({ reason: response.reason, url: targetCallbacks[index].url, jobId: job.id, msg: `Failed to send callback to url` });
if (targetCallbacks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the condition to:

if (!targetCallbacks) {
    return;
}
// else

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the "else {" as it's not needed

const callbackPromises: Promise<void>[] = [];
for (const target of targetCallbacks) {
const params: ICallbackExportData = { ...callbackParams, roi: job.parameters.roi };
callbackPromises.push(this.callbackClient.send(target.url, params));
}
});

const promisesResponse = await Promise.allSettled(callbackPromises);
promisesResponse.forEach((response, index) => {
if (response.status === 'rejected') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.logger.error({ reason: response.reason, url: targetCallbacks[index].url, jobId: job.id, msg: `Failed to send callback to url` });
}
});
}
} catch (error) {
this.logger.error({ err: error, callbacksUrls: job.parameters.callbacks, jobId: job.id, msg: `Sending callback has failed` });
}
Expand Down Expand Up @@ -338,8 +340,9 @@ export class TasksManager {
expirationTime: expirationDate,
fileSize,
recordCatalogId: job.internalId as string,
requestJobId: job.id,
jobId: job.id,
errorReason,
description: job.description,
};
this.logger.info({
links: callbackParams.links,
Expand Down
Loading