From ee1d421536078be04f8b8a99e4042b7191ba6263 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Fri, 22 Sep 2023 16:33:32 +0300 Subject: [PATCH] Improved chunk generation from cloud storages (#6881) --- CHANGELOG.md | 1 + cvat/apps/engine/cache.py | 132 ++++++++++++++++++++++---------------- 2 files changed, 76 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 496fff231c47..363c32b78bbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Do not reload annotation view when renew the job or update job state () +- Now images from cloud buckets are loaded in parallel when preparing a chunk () ### Deprecated - TDB diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 5ead087ff983..1d4c7925df8d 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -8,7 +8,9 @@ import zipfile from datetime import datetime from io import BytesIO -from tempfile import NamedTemporaryFile +import shutil +import tempfile + from typing import Optional, Tuple import cv2 @@ -111,6 +113,75 @@ def _get_frame_provider_class(): FrameProvider # TODO: remove circular dependency return FrameProvider + from contextlib import contextmanager + + @staticmethod + @contextmanager + def _get_images(db_data, chunk_number): + images = [] + tmp_dir = None + upload_dir = { + StorageChoice.LOCAL: db_data.get_upload_dirname(), + StorageChoice.SHARE: settings.SHARE_ROOT, + StorageChoice.CLOUD_STORAGE: db_data.get_upload_dirname(), + }[db_data.storage] + + try: + if hasattr(db_data, 'video'): + source_path = os.path.join(upload_dir, db_data.video.path) + + reader = VideoDatasetManifestReader(manifest_path=db_data.get_manifest_path(), + source_path=source_path, chunk_number=chunk_number, + chunk_size=db_data.chunk_size, start=db_data.start_frame, + stop=db_data.stop_frame, step=db_data.get_frame_step()) + for frame in reader: + images.append((frame, source_path, None)) + else: + reader = ImageDatasetManifestReader(manifest_path=db_data.get_manifest_path(), + chunk_number=chunk_number, chunk_size=db_data.chunk_size, + start=db_data.start_frame, stop=db_data.stop_frame, + step=db_data.get_frame_step()) + if db_data.storage == StorageChoice.CLOUD_STORAGE: + db_cloud_storage = db_data.cloud_storage + assert db_cloud_storage, 'Cloud storage instance was deleted' + credentials = Credentials() + credentials.convert_from_db({ + 'type': db_cloud_storage.credentials_type, + 'value': db_cloud_storage.credentials, + }) + details = { + 'resource': db_cloud_storage.resource, + 'credentials': credentials, + 'specific_attributes': db_cloud_storage.get_specific_attributes() + } + cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details) + + tmp_dir = tempfile.mkdtemp(prefix='cvat') + files_to_download = [] + checksums = [] + for item in reader: + file_name = f"{item['name']}{item['extension']}" + fs_filename = os.path.join(tmp_dir, file_name) + + files_to_download.append(file_name) + checksums.append(item.get('checksum', None)) + images.append((fs_filename, fs_filename, None)) + + cloud_storage_instance.bulk_download_to_dir(files=files_to_download, upload_dir=tmp_dir) + + for checksum, fs_filename in zip(checksums, images): + if checksum and not md5_hash(fs_filename) == checksum: + slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name)) + else: + for item in reader: + source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}") + images.append((source_path, source_path, None)) + + yield images + finally: + if db_data.storage == StorageChoice.CLOUD_STORAGE and tmp_dir is not None: + shutil.rmtree(tmp_dir) + def _prepare_task_chunk(self, db_data, quality, chunk_number): FrameProvider = self._get_frame_provider_class() @@ -127,64 +198,11 @@ def _prepare_task_chunk(self, db_data, quality, chunk_number): kwargs["dimension"] = DimensionType.DIM_3D writer = writer_classes[quality](image_quality, **kwargs) - images = [] buff = BytesIO() - upload_dir = { - StorageChoice.LOCAL: db_data.get_upload_dirname(), - StorageChoice.SHARE: settings.SHARE_ROOT, - StorageChoice.CLOUD_STORAGE: db_data.get_upload_dirname(), - }[db_data.storage] - if hasattr(db_data, 'video'): - source_path = os.path.join(upload_dir, db_data.video.path) - - reader = VideoDatasetManifestReader(manifest_path=db_data.get_manifest_path(), - source_path=source_path, chunk_number=chunk_number, - chunk_size=db_data.chunk_size, start=db_data.start_frame, - stop=db_data.stop_frame, step=db_data.get_frame_step()) - for frame in reader: - images.append((frame, source_path, None)) - else: - reader = ImageDatasetManifestReader(manifest_path=db_data.get_manifest_path(), - chunk_number=chunk_number, chunk_size=db_data.chunk_size, - start=db_data.start_frame, stop=db_data.stop_frame, - step=db_data.get_frame_step()) - if db_data.storage == StorageChoice.CLOUD_STORAGE: - db_cloud_storage = db_data.cloud_storage - assert db_cloud_storage, 'Cloud storage instance was deleted' - credentials = Credentials() - credentials.convert_from_db({ - 'type': db_cloud_storage.credentials_type, - 'value': db_cloud_storage.credentials, - }) - details = { - 'resource': db_cloud_storage.resource, - 'credentials': credentials, - 'specific_attributes': db_cloud_storage.get_specific_attributes() - } - cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details) - for item in reader: - file_name = f"{item['name']}{item['extension']}" - with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file: - source_path = temp_file.name - buf = cloud_storage_instance.download_fileobj(file_name) - temp_file.write(buf.getvalue()) - temp_file.flush() - checksum = item.get('checksum', None) - if not checksum: - slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name'))) - if checksum and not md5_hash(source_path) == checksum: - slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name)) - images.append((source_path, source_path, None)) - else: - for item in reader: - source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}") - images.append((source_path, source_path, None)) - writer.save_as_chunk(images, buff) + with self._get_images(db_data, chunk_number) as images: + writer.save_as_chunk(images, buff) buff.seek(0) - if db_data.storage == StorageChoice.CLOUD_STORAGE: - images = [image[0] for image in images if os.path.exists(image[0])] - for image_path in images: - os.remove(image_path) + return buff, mime_type def prepare_selective_job_chunk(self, db_job: Job, quality, chunk_number: int):