Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix export of resources to cloud storage #7317

Merged
merged 12 commits into from
Jan 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Fixed

- 504 Timeout error when exporting resources to cloud storage
(<https://github.com/opencv/cvat/pull/7317>)
- Enqueuing deferred jobs when their dependencies have been started -> cancelled -> restarted -> finished
(<https://github.com/opencv/cvat/pull/7317>)
2 changes: 1 addition & 1 deletion cvat-core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "cvat-core",
"version": "14.0.3",
"version": "14.0.4",
"type": "module",
"description": "Part of Computer Vision Tool which presents an interface for client-side integration",
"main": "src/api.ts",
Expand Down
15 changes: 9 additions & 6 deletions cvat-core/src/server-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,11 @@ function exportDataset(instanceType: 'projects' | 'jobs' | 'tasks') {
.then((response) => {
const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
const { status } = response;
if (status === 201) params.action = 'download';
if (status === 202 || (isCloudStorage && status === 201)) {

if (status === 202) {
setTimeout(request, 3000);
} else if (status === 201) {
params.action = 'download';
resolve(`${baseURL}?${new URLSearchParams(params).toString()}`);
} else if (isCloudStorage && status === 200) {
resolve();
Expand Down Expand Up @@ -927,10 +928,11 @@ async function backupTask(id: number, targetStorage: Storage, useDefaultSettings
});
const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
const { status } = response;
if (status === 201) params.action = 'download';
if (status === 202 || (isCloudStorage && status === 201)) {

if (status === 202) {
setTimeout(request, 3000);
} else if (status === 201) {
params.action = 'download';
resolve(`${url}?${new URLSearchParams(params).toString()}`);
} else if (isCloudStorage && status === 200) {
resolve();
Expand Down Expand Up @@ -1032,10 +1034,11 @@ async function backupProject(
});
const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE;
const { status } = response;
if (status === 201) params.action = 'download';
if (status === 202 || (isCloudStorage && status === 201)) {

if (status === 202) {
setTimeout(request, 3000);
} else if (status === 201) {
params.action = 'download';
resolve(`${url}?${new URLSearchParams(params).toString()}`);
} else if (isCloudStorage && status === 200) {
resolve();
Expand Down
108 changes: 59 additions & 49 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from rest_framework.parsers import JSONParser
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.exceptions import ValidationError, PermissionDenied, NotFound
from rest_framework.exceptions import ValidationError

import cvat.apps.dataset_manager as dm
from cvat.apps.engine import models
Expand All @@ -38,12 +38,12 @@
from cvat.apps.engine.utils import (
av_scan_paths, process_failed_job, configure_dependent_job_to_download_from_cs,
get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after,
sendfile, define_dependent_job, get_rq_lock_by_user
sendfile, define_dependent_job, get_rq_lock_by_user, build_backup_file_name,
)
from cvat.apps.engine.models import (
StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location)
from cvat.apps.engine.task import JobFileMapping, _create_thread
from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance
from cvat.apps.engine.cloud_provider import download_file_from_bucket, export_resource_to_cloud_storage
from cvat.apps.engine.location import StorageType, get_location_configuration
from cvat.apps.engine.view_utils import get_cloud_storage_for_import_or_export
from cvat.apps.dataset_manager.views import TASK_CACHE_TTL, PROJECT_CACHE_TTL, get_export_cache_dir, clear_export_cache, log_exception
Expand Down Expand Up @@ -955,54 +955,49 @@ def export(db_instance, request, queue_name):
queue = django_rq.get_queue(queue_name)
rq_id = f"export:{obj_type}.id{db_instance.pk}-by-{request.user}"
rq_job = queue.fetch_job(rq_id)

last_instance_update_time = timezone.localtime(db_instance.updated_date)
timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S")
location = location_conf.get('location')

if rq_job:
last_project_update_time = timezone.localtime(db_instance.updated_date)
rq_request = rq_job.meta.get('request', None)
request_time = rq_request.get("timestamp", None) if rq_request else None
if request_time is None or request_time < last_project_update_time:
rq_job.cancel()
if request_time is None or request_time < last_instance_update_time:
# in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER
# we have to enqueue dependent jobs after canceling one
rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER)
rq_job.delete()
else:
if rq_job.is_finished:
file_path = rq_job.return_value()
if action == "download" and os.path.exists(file_path):
rq_job.delete()
if location == Location.LOCAL:
file_path = rq_job.return_value()

if not file_path:
return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR)

timestamp = datetime.strftime(last_project_update_time,
"%Y_%m_%d_%H_%M_%S")
filename = filename or "{}_{}_backup_{}{}".format(
obj_type, db_instance.name, timestamp,
os.path.splitext(file_path)[1]).lower()
elif not os.path.exists(file_path):
return Response('The result file does not exist in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR)

location = location_conf.get('location')
if location == Location.LOCAL:
filename = filename or build_backup_file_name(
class_name=obj_type,
identifier=db_instance.name,
timestamp=timestamp,
extension=os.path.splitext(file_path)[1]
)

if action == "download":
rq_job.delete()
return sendfile(request, file_path, attachment=True,
attachment_filename=filename)
elif location == Location.CLOUD_STORAGE:
try:
storage_id = location_conf['storage_id']
except KeyError:
raise serializers.ValidationError(
'Cloud storage location was selected as the destination,'
' but cloud storage id was not specified')

db_storage = get_cloud_storage_for_import_or_export(
storage_id=storage_id, request=request,
is_default=location_conf['is_default'])
storage = db_storage_to_storage_instance(db_storage)

try:
storage.upload_file(file_path, filename)
except (ValidationError, PermissionDenied, NotFound) as ex:
msg = str(ex) if not isinstance(ex, ValidationError) else \
'\n'.join([str(d) for d in ex.detail])
return Response(data=msg, status=ex.status_code)
return Response(status=status.HTTP_200_OK)
else:
raise NotImplementedError()

return Response(status=status.HTTP_201_CREATED)

elif location == Location.CLOUD_STORAGE:
rq_job.delete()
return Response(status=status.HTTP_200_OK)
else:
if os.path.exists(file_path):
return Response(status=status.HTTP_201_CREATED)
raise NotImplementedError()
elif rq_job.is_failed:
exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info))
rq_job.delete()
Expand All @@ -1014,10 +1009,31 @@ def export(db_instance, request, queue_name):
ttl = dm.views.PROJECT_CACHE_TTL.total_seconds()
user_id = request.user.id

func = _create_backup if location == Location.LOCAL else export_resource_to_cloud_storage
func_args = (db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl)

if location == Location.CLOUD_STORAGE:
try:
storage_id = location_conf['storage_id']
except KeyError:
raise serializers.ValidationError(
'Cloud storage location was selected as the destination,'
' but cloud storage id was not specified')

db_storage = get_cloud_storage_for_import_or_export(
storage_id=storage_id, request=request,
is_default=location_conf['is_default'])
filename_pattern = build_backup_file_name(
class_name=obj_type,
identifier=db_instance.name,
timestamp=timestamp,
)
func_args = (db_storage, filename, filename_pattern, _create_backup) + func_args

with get_rq_lock_by_user(queue, user_id):
queue.enqueue_call(
func=_create_backup,
args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl),
func=func,
args=func_args,
job_id=rq_id,
meta=get_rq_job_meta(request=request, db_obj=db_instance),
depends_on=define_dependent_job(queue, user_id, rq_id=rq_id),
Expand All @@ -1027,12 +1043,6 @@ def export(db_instance, request, queue_name):
return Response(status=status.HTTP_202_ACCEPTED)


def _download_file_from_bucket(db_storage, filename, key):
    # Resolve the DB storage record into a concrete cloud-provider client,
    # then stream the object identified by `key` into a local file.
    client = db_storage_to_storage_instance(db_storage)

    with client.download_fileobj(key) as payload:
        with open(filename, 'wb+') as out_file:
            out_file.write(payload.getbuffer())

def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None):
rq_job = queue.fetch_job(rq_id)

Expand Down Expand Up @@ -1077,7 +1087,7 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
dependent_job = configure_dependent_job_to_download_from_cs(
queue=queue,
rq_id=rq_id,
rq_func=_download_file_from_bucket,
rq_func=download_file_from_bucket,
db_storage=db_storage,
filename=filename,
key=key,
Expand Down
22 changes: 21 additions & 1 deletion cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from enum import Enum
from io import BytesIO
from multiprocessing.pool import ThreadPool
from typing import Dict, List, Optional, Any
from typing import Dict, List, Optional, Any, Callable

import boto3
from azure.core.exceptions import HttpResponseError, ResourceExistsError
Expand Down Expand Up @@ -962,3 +962,23 @@ def db_storage_to_storage_instance(db_storage):
'specific_attributes': db_storage.get_specific_attributes()
}
return get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details)

def download_file_from_bucket(db_storage: Any, filename: str, key: str) -> None:
    """Download the object stored under ``key`` from ``db_storage`` into ``filename``."""
    # Turn the storage DB model into a provider-specific client first.
    storage_client = db_storage_to_storage_instance(db_storage)

    with storage_client.download_fileobj(key) as buffer:
        with open(filename, 'wb+') as out_file:
            out_file.write(buffer.getbuffer())

def export_resource_to_cloud_storage(
    db_storage: Any,
    key: str,
    key_pattern: str,
    func: Callable[[int, Optional[str], Optional[str]], str],
    *args,
    **kwargs,
) -> str:
    """Run ``func`` to produce an export file, then upload it to cloud storage.

    ``func`` (with ``*args``/``**kwargs``) must return the path of the created
    file. The object is stored under ``key`` when provided; otherwise the name
    is derived from ``key_pattern``, formatted with the file's lowercased
    extension. Returns the local path of the exported file.
    """
    file_path = func(*args, **kwargs)

    target_key = key
    if not target_key:
        # No explicit name requested — build one from the pattern and the
        # extension of whatever file the export callback actually produced.
        target_key = key_pattern.format(os.path.splitext(file_path)[1].lower())

    db_storage_to_storage_instance(db_storage).upload_file(file_path, target_key)

    return file_path
13 changes: 11 additions & 2 deletions cvat/apps/engine/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest import mock
from typing import Optional, Callable, Dict, Any

import django_rq
from attr.converters import to_bool
Expand Down Expand Up @@ -384,7 +385,15 @@ def upload_finished(self, request):
raise NotImplementedError('Must be implemented in the derived class')

class AnnotationMixin:
def export_annotations(self, request, db_obj, export_func, callback, get_data=None):
def export_annotations(
self,
request,
db_obj,
export_func,
callback: Callable[[int, Optional[str], Optional[str]], str],
*,
get_data: Optional[Callable[[int], Dict[str, Any]]]= None,
):
format_name = request.query_params.get("format", "")
action = request.query_params.get("action", "").lower()
filename = request.query_params.get("filename", "")
Expand All @@ -399,7 +408,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No
)

object_name = self._object.__class__.__name__.lower()
rq_id = f"export:annotations-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"
rq_id = f"export:{request.path.strip('/').split('/')[-1]}-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"

if format_name:
return export_func(db_instance=self._object,
Expand Down
27 changes: 27 additions & 0 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,30 @@ def preload_image(image: tuple[str, str, str])-> tuple[Image.Image, str, str]:

def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
return list(map(preload_image, images))

def build_backup_file_name(
*,
class_name: str,
identifier: str | int,
timestamp: str,
extension: str = "{}",
) -> str:
# "<project|task>_<name>_backup_<timestamp>.zip"
return "{}_{}_backup_{}{}".format(
class_name, identifier, timestamp, extension,
).lower()

def build_annotations_file_name(
*,
class_name: str,
identifier: str | int,
timestamp: str,
format_name: str,
is_annotation_file: bool = True,
extension: str = "{}",
) -> str:
# "<project|task|job>_<name|id>_<annotations|dataset>_<timestamp>_<format>.zip"
return "{}_{}_{}_{}_{}{}".format(
class_name, identifier, 'annotations' if is_annotation_file else 'dataset',
timestamp, format_name, extension,
).lower()
Loading
Loading