Skip to content

Commit

Permalink
feat: Finalize-tasks-polling (MAPCO-2888) (#59)
Browse files Browse the repository at this point in the history
* feat: polling with finalize task - first commit

* chore : adding unit testing + polling logic update for failures

* chore: changes to polling

* chore: pr notes + refactor

* fix: lint

* fix: pr notes + fixes

* fix: pr notes

* chore: update helm values

* chore: update methods

---------

Co-authored-by: asafMasa <[email protected]>
  • Loading branch information
ronenkapelian and asafMasa authored Mar 22, 2023
1 parent 921ef84 commit 6e2a861
Show file tree
Hide file tree
Showing 28 changed files with 1,054 additions and 350 deletions.
84 changes: 51 additions & 33 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,6 @@
}
}
},
"httpRetry": {
"attempts": {
"__name": "HTTP_RETRY_ATTEMPTS",
"__format": "number"
},
"delay": "HTTP_RETRY_DELAY",
"shouldResetTimeout": {
"__name": "HTTP_RETRY_SHOULD_RESET_TIMEOUT",
"__format": "boolean"
}
},
"jobManager": {
"url": "JOB_MANAGER_URL",
"jobDomain": "JOB_DOMAIN",
"expirationDays": {
"__name": "JOB_MANAGER_EXPIRATION_DAYS",
"__format": "number"
}
},
"rasterCatalogManager": {
"url": "RASTER_CATALOG_MANAGER_URL"
},
"workerTypes": {
"tiles": {
"jobType": "WORKER_TYPES_TILES_JOB_TYPE",
"taskType": "WORKER_TYPES_TILES_TASK_TYPE"
}
},
"storageEstimation": {
"jpegTileEstimatedSizeInBytes": {
"__name": "JPEG_TILE_ESTIMATED_SIZE_IN_BYTES",
Expand All @@ -85,15 +57,61 @@
"__format": "boolean"
}
},
"disableHttpClientLogs": {
"__name": "DISABLE_HTTP_CLIENT_LOGS",
"__format": "boolean"
"cleanupExpirationDays": {
"__name": "CLEANUP_EXPIRATION_DAYS",
"__format": "number"
},
"tilesProvider": "TILES_PROVIDER",
"gpkgsLocation": "GPKGS_LOCATION",
"downloadServerUrl": "DOWNLOAD_SERVER_URL",
"pollingTimeoutMS": {
"__name": "POLLING_TIMEOUT_MS",
"finalizePollingTimeMS": {
"__name": "FINALIZE_POLLING_TIMEOUT_MS",
"__format": "number"
},
"externalClientsConfig": {
"clientsUrls": {
"jobManager": {
"url": "JOB_MANAGER_URL",
"jobDomain": "JOB_DOMAIN",
"dequeueFinalizeIntervalMs": {
"__name": "QUEUE_DEQUEUE_FINALIZE_INTERVAL_MS",
"__format": "number"
},
"finalizeTasksAttempts": {
"__name": "FINALIZE_TASKS_ATTEMPTS",
"__format": "number"
}
},
"rasterCatalogManager": {
"url": "RASTER_CATALOG_MANAGER_URL"
},
"heartbeatManager": {
"url": "QUEUE_HEART_BEAT_MANAGER_BASE_URL",
"heartbeatIntervalMs": {
"__name": "QUEUE_HEART_BEAT_INTERVAL_MS",
"__format": "number"
}
}
},
"exportJobAndTaskTypes": {
"jobType": "EXPORT_JOB_TYPE",
"taskTilesType": "TILES_TASK_TYPE",
"taskFinalizeType": "FINALIZE_TASK_TYPE"
},
"httpRetry": {
"attempts": {
"__name": "HTTP_RETRY_ATTEMPTS",
"__format": "number"
},
"delay": "HTTP_RETRY_DELAY",
"shouldResetTimeout": {
"__name": "HTTP_RETRY_SHOULD_RESET_TIMEOUT",
"__format": "boolean"
}
},
"disableHttpClientLogs": {
"__name": "DISABLE_HTTP_CLIENT_LOGS",
"__format": "boolean"
}
}
}
52 changes: 31 additions & 21 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,44 @@
}
}
},
"httpRetry": {
"attempts": 5,
"delay": "exponential",
"shouldResetTimeout": true
},
"jobManager": {
"url": "http://job-manager-job-manager",
"jobDomain": "RASTER",
"expirationDays": 30
},
"rasterCatalogManager": {
"url": "http://raster-catalog-manager"
},
"workerTypes": {
"tiles": {
"jobType": "rasterTilesExporter",
"taskType": "rasterTilesExporter"
}
},
"storageEstimation": {
"jpegTileEstimatedSizeInBytes": 12500,
"pngTileEstimatedSizeInBytes": 12500,
"storageFactorBuffer": 1.25,
"validateStorageSize": true
},
"disableHttpClientLogs": false,
"cleanupExpirationDays": 30,
"externalClientsConfig": {
"clientsUrls": {
"jobManager": {
"url": "http://job-manager-job-manager",
"jobDomain": "RASTER",
"cleanupExpirationDays": 30,
"dequeueFinalizeIntervalMs": 1000,
"finalizeTasksAttempts": 5
},
"rasterCatalogManager": {
"url": "http://catalog-manager-catalog-manager"
},
"heartbeatManager": {
"url": "http://heartbeat-manage-heartbeat-manager",
"heartbeatIntervalMs": 300
}
},
"exportJobAndTaskTypes": {
"jobType": "rasterTilesExporter",
"taskTilesType": "rasterTilesExporter",
"taskFinalizeType": "rasterFinalizeExporter"
},
"httpRetry": {
"attempts": 5,
"delay": "exponential",
"shouldResetTimeout": true
},
"disableHttpClientLogs": false
},
"tilesProvider": "S3",
"gpkgsLocation": "/app/tiles_outputs/gpkgs",
"downloadServerUrl": "http://download-service",
"pollingTimeoutMS": 2000
"finalizePollingTimeMS": 2000
}
14 changes: 10 additions & 4 deletions helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,26 @@ data:
TELEMETRY_METRICS_URL: {{ $metricsUrl }}
{{ end }}
JOB_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.jobManager | quote }}
JOB_MANAGER_EXPIRATION_DAYS: {{ .Values.env.jobManager.expirationDays | quote }}
CLEANUP_EXPIRATION_DAYS: {{ .Values.env.cleanupExpirationDays | quote }}
RASTER_CATALOG_MANAGER_URL: {{ .Values.rasterCommon.serviceUrls.catalogManager | quote }}
JOB_DOMAIN: {{ .Values.rasterCommon.jobManagement.jobDomain | quote }}
WORKER_TYPES_TILES_JOB_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.jobType | quote }}
WORKER_TYPES_TILES_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.taskType | quote }}
EXPORT_JOB_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.jobType | quote }}
TILES_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.exporter.taskTilesType | quote }}
HTTP_RETRY_ATTEMPTS: {{ .Values.env.httpRetry.attempts | quote }}
HTTP_RETRY_DELAY: {{ .Values.env.httpRetry.delay | quote }}
HTTP_RETRY_SHOULD_RESET_TIMEOUT: {{ .Values.env.httpRetry.shouldResetTimeout | quote }}
DISABLE_HTTP_CLIENT_LOGS: {{ .Values.env.disableHttpClientLogs | quote }}
TILES_PROVIDER: {{ .Values.rasterCommon.storage.tilesStorageProvider | quote }}
DOWNLOAD_SERVER_URL: {{ .Values.rasterCommon.serviceUrls.downloadServer | quote }}
POLLING_TIMEOUT_MS: {{ .Values.env.pollingTimeoutMS | quote }}
FINALIZE_POLLING_TIMEOUT_MS: {{ .Values.env.finalizePollingTimeMS | quote }}
JPEG_TILE_ESTIMATED_SIZE_IN_BYTES: {{ .Values.env.estimatedStorageCalculation.jpegTileEstimatedSizeInBytes | quote }}
PNG_TILE_ESTIMATED_SIZE_IN_BYTES: {{ .Values.env.estimatedStorageCalculation.pngTileEstimatedSizeInBytes | quote }}
STORAGE_FACTOR_BUFFER: {{ .Values.env.estimatedStorageCalculation.storageFactorBuffer | quote }}
VALIDATE_STORAGE_SIZE: {{ .Values.env.estimatedStorageCalculation.validateStorageSize | quote }}
QUEUE_JOB_MANAGER_BASE_URL: {{ .Values.rasterCommon.serviceUrls.jobManager | quote }}
QUEUE_HEART_BEAT_MANAGER_BASE_URL: {{ .Values.rasterCommon.serviceUrls.heartbeatManager | quote }}
QUEUE_DEQUEUE_FINALIZE_INTERVAL_MS: {{ .Values.env.queue.dequeueFinalizeIntervalMs | quote }}
QUEUE_HEART_BEAT_INTERVAL_MS: {{ .Values.env.queue.heartbeatIntervalMs | quote }}
FINALIZE_TASK_TYPE: {{ .Values.rasterCommon.jobManagement.taskFinalizeType | quote }}
FINALIZE_TASKS_ATTEMPTS: {{ .Values.env.queue.finalizeTasksAttempts | quote }}
{{- end }}
17 changes: 11 additions & 6 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ global:
rasterCommon:
serviceUrls:
catalogManager: "http://catalog-raster-dev-raster-catalog-manager"
heartbeatManager: "http://heartbeat-raster-dev-raster-heartbeat-manager"
jobManager: "http://job-manager-raster-dev-discrete-ingestion-db"
downloadServer: "http://files-server-raster-dev-files-server"
jobManagement:
jobDomain: RASTER
exporter:
jobType: tilesExport
taskType: tilesExport
taskTilesType: tilesExport
taskFinalizeType: tilesFinalize
authentication:
opa:
enabled: false
Expand Down Expand Up @@ -81,25 +83,28 @@ env:
logPrettyPrintEnabled: false
responseCompressionEnabled: true
requestPayloadLimit: 1mb
pollingTimeoutMS: 2000
finalizePollingTimeMS: 2000
disableHttpClientLogs: false
cleanupExpirationDays: 30
tracing:
enabled: false
url: http://localhost:55681/v1/trace
metrics:
enabled: false
url: http://localhost:55681/v1/metrics
jobManager:
expirationDays: 30
httpRetry:
attempts: 5
delay: exponential
shouldResetTimeout: true
estimatedStorageCalculation:
jpegTileEstimatedSizeInBytes: 12500
pngTileEstimatedSizeInBytes: 12500
jpegTileEstimatedSizeInBytes: 27000
pngTileEstimatedSizeInBytes: 263000
storageFactorBuffer: 1.25
validateStorageSize: true
queue:
dequeueFinalizeIntervalMs: 2000
heartbeatIntervalMs: 300
finalizeTasksAttempts: 3

resources:
enabled: true
Expand Down
8 changes: 7 additions & 1 deletion src/clients/callbackClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@ import { ICallbackData, ICallbackExportData, IConfig } from '../common/interface
@singleton()
export class CallbackClient extends HttpClient {
public constructor(@inject(SERVICES.LOGGER) logger: Logger, @inject(SERVICES.CONFIG) private readonly config: IConfig) {
super(logger, '', 'requestCallback', config.get<IHttpRetryConfig>('httpRetry'), config.get<boolean>('disableHttpClientLogs'));
super(
logger,
'',
'requestCallback',
config.get<IHttpRetryConfig>('externalClientsConfig.httpRetry'),
config.get<boolean>('externalClientsConfig.disableHttpClientLogs')
);
}

public async send(callbackUrl: string, data: ICallbackData | ICallbackExportData): Promise<void> {
Expand Down
22 changes: 13 additions & 9 deletions src/clients/jobManagerWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ export class JobManagerWrapper extends JobManagerClient {
public constructor(@inject(SERVICES.LOGGER) protected readonly logger: Logger) {
super(
logger,
config.get<string>('workerTypes.tiles.jobType'),
config.get<string>('jobManager.url'),
config.get<IHttpRetryConfig>('httpRetry'),
config.get<string>('externalClientsConfig.exportJobAndTaskTypes.jobType'),
config.get<string>('externalClientsConfig.clientsUrls.jobManager.url'),
config.get<IHttpRetryConfig>('externalClientsConfig.httpRetry'),
'jobManagerClient',
config.get<boolean>('disableHttpClientLogs')
config.get<boolean>('externalClientsConfig.disableHttpClientLogs')
);
this.expirationDays = config.get<number>('jobManager.expirationDays');
this.tilesJobType = config.get<string>('workerTypes.tiles.jobType');
this.tilesTaskType = config.get<string>('workerTypes.tiles.taskType');
this.jobDomain = config.get<string>('jobManager.jobDomain');
this.expirationDays = config.get<number>('cleanupExpirationDays');
this.tilesJobType = config.get<string>('externalClientsConfig.exportJobAndTaskTypes.jobType');
this.tilesTaskType = config.get<string>('externalClientsConfig.exportJobAndTaskTypes.taskTilesType');
this.jobDomain = config.get<string>('externalClientsConfig.clientsUrls.jobManager.jobDomain');
}

/**
Expand Down Expand Up @@ -253,11 +253,15 @@ export class JobManagerWrapper extends JobManagerClient {
});
}

public async deleteTaskById(jobId: string, taskId: string): Promise<void> {
const deleteTaskUrl = `/jobs/${jobId}/tasks/${taskId}`;
await this.delete(deleteTaskUrl);
}

public async validateAndUpdateExpiration(jobId: string): Promise<void> {
const getOrUpdateURL = `/jobs/${jobId}`;
const newExpirationDate = getUTCDate();
newExpirationDate.setDate(newExpirationDate.getDate() + this.expirationDays);

const job = await this.get<JobResponse | JobExportResponse | undefined>(getOrUpdateURL);
if (job) {
const oldExpirationDate = new Date(job.parameters.cleanupData?.cleanupExpirationTimeUTC as Date);
Expand Down
40 changes: 40 additions & 0 deletions src/clients/queueClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { TaskHandler as QueueHandler, JobManagerClient } from '@map-colonies/mc-priority-queue';
import { inject, singleton } from 'tsyringe';
import { Logger } from '@map-colonies/js-logger';
import { IHttpRetryConfig } from '@map-colonies/mc-utils';
import { IConfig, IQueueConfig } from '../common/interfaces';
import { SERVICES } from '../common/constants';

@singleton()
export class QueueClient {
public readonly queueHandlerForFinalizeTasks: QueueHandler;
public readonly jobsClient: JobManagerClient;

public constructor(
@inject(SERVICES.CONFIG) config: IConfig,
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.QUEUE_CONFIG) private readonly queueConfig: IQueueConfig
) {
this.queueHandlerForFinalizeTasks = new QueueHandler(
logger,
this.queueConfig.jobType,
this.queueConfig.jobManagerBaseUrl,
this.queueConfig.heartbeatManagerBaseUrl,
this.queueConfig.dequeueFinalizeIntervalMs,
this.queueConfig.heartbeatIntervalMs,
config.get<IHttpRetryConfig>('externalClientsConfig.httpRetry'),
undefined,
undefined,
config.get<boolean>('externalClientsConfig.disableHttpClientLogs'),
config.get<boolean>('externalClientsConfig.disableHttpClientLogs')
);
this.jobsClient = new JobManagerClient(
logger,
this.queueConfig.jobType,
this.queueConfig.jobManagerBaseUrl,
config.get<IHttpRetryConfig>('externalClientsConfig.httpRetry'),
'jobManagerClient',
config.get<boolean>('externalClientsConfig.disableHttpClientLogs')
);
}
}
6 changes: 3 additions & 3 deletions src/clients/rasterCatalogManagerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ export class RasterCatalogManagerClient extends HttpClient {
public constructor(@inject(SERVICES.LOGGER) protected readonly logger: Logger) {
super(
logger,
config.get<string>('rasterCatalogManager.url'),
config.get<string>('externalClientsConfig.clientsUrls.rasterCatalogManager.url'),
'RasterCatalogManager',
config.get<IHttpRetryConfig>('httpRetry'),
config.get<boolean>('disableHttpClientLogs')
config.get<IHttpRetryConfig>('externalClientsConfig.httpRetry'),
config.get<boolean>('externalClientsConfig.disableHttpClientLogs')
);
}

Expand Down
1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export const SERVICES: Record<string, symbol> = {
CONFIG: Symbol('Config'),
TRACER: Symbol('Tracer'),
METER: Symbol('Meter'),
QUEUE_CONFIG: Symbol('IQueueconfig'),
};
/* eslint-enable @typescript-eslint/naming-convention */

Expand Down
Loading

0 comments on commit 6e2a861

Please sign in to comment.