Improved chunk generation from cloud storages (#6881)
azhavoro authored Sep 22, 2023
1 parent adc8895 commit ee1d421
Showing 2 changed files with 76 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 - Do not reload annotation view when renew the job or update job state (<https://github.com/opencv/cvat/pull/6851>)
+- Now images from cloud buckets are loaded in parallel when preparing a chunk (<https://github.com/opencv/cvat/pull/6881>)
 
 ### Deprecated
 - TDB
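For context on the CHANGELOG entry: before this commit each image was fetched with a separate blocking request, while the new code first collects every file name for the chunk and hands them to `bulk_download_to_dir` in one call. Below is a minimal sketch of how such a bulk helper can parallelize downloads with a thread pool; the `client.download_file` call and the worker count are illustrative assumptions, not CVAT's actual cloud-provider API:

```python
import os
from concurrent.futures import ThreadPoolExecutor

def bulk_download_to_dir(client, files, upload_dir, max_workers=8):
    """Download many objects into upload_dir concurrently.

    `client.download_file(key, path)` is a hypothetical, boto3-style
    blocking call; substitute the real SDK method of your provider.
    """
    def fetch(key):
        local_path = os.path.join(upload_dir, key)
        os.makedirs(os.path.dirname(local_path), exist_ok=True)  # keys may contain subpaths
        client.download_file(key, local_path)  # blocking network I/O
        return local_path

    # Threads overlap the network waits, so N small files cost roughly one
    # round-trip of latency instead of N sequential round-trips.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fetch, files))
```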
132 changes: 75 additions & 57 deletions cvat/apps/engine/cache.py
@@ -8,7 +8,9 @@
 import zipfile
 from datetime import datetime
 from io import BytesIO
-from tempfile import NamedTemporaryFile
+import shutil
+import tempfile
+
 from typing import Optional, Tuple
 
 import cv2
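The import swap above tracks the storage strategy of this commit: instead of one `NamedTemporaryFile` per downloaded image, all files for a chunk now live in a single `tempfile.mkdtemp` directory that is torn down with one `shutil.rmtree` call. A minimal sketch of that lifecycle (file name is hypothetical, not CVAT code):

```python
import os
import shutil
import tempfile

tmp_dir = tempfile.mkdtemp(prefix='cvat')  # one directory for the whole chunk
try:
    # Any number of files can be written here without per-file cleanup.
    path = os.path.join(tmp_dir, 'frame_000001.jpg')  # hypothetical file name
    with open(path, 'wb') as f:
        f.write(b'...image bytes...')
finally:
    shutil.rmtree(tmp_dir)  # single teardown for every downloaded file
```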
@@ -111,6 +113,75 @@ def _get_frame_provider_class():
             FrameProvider # TODO: remove circular dependency
         return FrameProvider
 
+    from contextlib import contextmanager
+
+    @staticmethod
+    @contextmanager
+    def _get_images(db_data, chunk_number):
+        images = []
+        tmp_dir = None
+        upload_dir = {
+            StorageChoice.LOCAL: db_data.get_upload_dirname(),
+            StorageChoice.SHARE: settings.SHARE_ROOT,
+            StorageChoice.CLOUD_STORAGE: db_data.get_upload_dirname(),
+        }[db_data.storage]
+
+        try:
+            if hasattr(db_data, 'video'):
+                source_path = os.path.join(upload_dir, db_data.video.path)
+
+                reader = VideoDatasetManifestReader(manifest_path=db_data.get_manifest_path(),
+                    source_path=source_path, chunk_number=chunk_number,
+                    chunk_size=db_data.chunk_size, start=db_data.start_frame,
+                    stop=db_data.stop_frame, step=db_data.get_frame_step())
+                for frame in reader:
+                    images.append((frame, source_path, None))
+            else:
+                reader = ImageDatasetManifestReader(manifest_path=db_data.get_manifest_path(),
+                    chunk_number=chunk_number, chunk_size=db_data.chunk_size,
+                    start=db_data.start_frame, stop=db_data.stop_frame,
+                    step=db_data.get_frame_step())
+                if db_data.storage == StorageChoice.CLOUD_STORAGE:
+                    db_cloud_storage = db_data.cloud_storage
+                    assert db_cloud_storage, 'Cloud storage instance was deleted'
+                    credentials = Credentials()
+                    credentials.convert_from_db({
+                        'type': db_cloud_storage.credentials_type,
+                        'value': db_cloud_storage.credentials,
+                    })
+                    details = {
+                        'resource': db_cloud_storage.resource,
+                        'credentials': credentials,
+                        'specific_attributes': db_cloud_storage.get_specific_attributes()
+                    }
+                    cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
+
+                    tmp_dir = tempfile.mkdtemp(prefix='cvat')
+                    files_to_download = []
+                    checksums = []
+                    for item in reader:
+                        file_name = f"{item['name']}{item['extension']}"
+                        fs_filename = os.path.join(tmp_dir, file_name)
+
+                        files_to_download.append(file_name)
+                        checksums.append(item.get('checksum', None))
+                        images.append((fs_filename, fs_filename, None))
+
+                    cloud_storage_instance.bulk_download_to_dir(files=files_to_download, upload_dir=tmp_dir)
+
+                    for checksum, (fs_filename, _, _) in zip(checksums, images):
+                        if checksum and not md5_hash(fs_filename) == checksum:
+                            slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(fs_filename))
+                else:
+                    for item in reader:
+                        source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
+                        images.append((source_path, source_path, None))
+
+            yield images
+        finally:
+            if db_data.storage == StorageChoice.CLOUD_STORAGE and tmp_dir is not None:
+                shutil.rmtree(tmp_dir)
+
     def _prepare_task_chunk(self, db_data, quality, chunk_number):
         FrameProvider = self._get_frame_provider_class()
 
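The `@contextmanager` decorator is what lets `_get_images` pair setup with guaranteed cleanup: the `yield` hands the image list to the caller's `with` block, and the `finally` clause removes the temporary directory even if chunk writing raises. A stripped-down sketch of the same pattern (names here are illustrative, not CVAT code):

```python
import shutil
import tempfile
from contextlib import contextmanager

@contextmanager
def downloaded_files(keys, bulk_fetch):
    """Yield local paths for `keys`; always clean up the temp directory."""
    tmp_dir = tempfile.mkdtemp(prefix='cvat')
    try:
        # `bulk_fetch` stands in for a provider's bulk download into tmp_dir.
        yield bulk_fetch(keys, tmp_dir)
    finally:
        # Runs on normal exit *and* when the caller's `with` body raises.
        shutil.rmtree(tmp_dir)

# Usage mirrors the new _prepare_task_chunk flow shown below:
# with downloaded_files(names, fetch) as images:
#     writer.save_as_chunk(images, buff)
```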
@@ -127,64 +198,11 @@ def _prepare_task_chunk(self, db_data, quality, chunk_number):
             kwargs["dimension"] = DimensionType.DIM_3D
         writer = writer_classes[quality](image_quality, **kwargs)
 
-        images = []
         buff = BytesIO()
-        upload_dir = {
-            StorageChoice.LOCAL: db_data.get_upload_dirname(),
-            StorageChoice.SHARE: settings.SHARE_ROOT,
-            StorageChoice.CLOUD_STORAGE: db_data.get_upload_dirname(),
-        }[db_data.storage]
-        if hasattr(db_data, 'video'):
-            source_path = os.path.join(upload_dir, db_data.video.path)
-
-            reader = VideoDatasetManifestReader(manifest_path=db_data.get_manifest_path(),
-                source_path=source_path, chunk_number=chunk_number,
-                chunk_size=db_data.chunk_size, start=db_data.start_frame,
-                stop=db_data.stop_frame, step=db_data.get_frame_step())
-            for frame in reader:
-                images.append((frame, source_path, None))
-        else:
-            reader = ImageDatasetManifestReader(manifest_path=db_data.get_manifest_path(),
-                chunk_number=chunk_number, chunk_size=db_data.chunk_size,
-                start=db_data.start_frame, stop=db_data.stop_frame,
-                step=db_data.get_frame_step())
-            if db_data.storage == StorageChoice.CLOUD_STORAGE:
-                db_cloud_storage = db_data.cloud_storage
-                assert db_cloud_storage, 'Cloud storage instance was deleted'
-                credentials = Credentials()
-                credentials.convert_from_db({
-                    'type': db_cloud_storage.credentials_type,
-                    'value': db_cloud_storage.credentials,
-                })
-                details = {
-                    'resource': db_cloud_storage.resource,
-                    'credentials': credentials,
-                    'specific_attributes': db_cloud_storage.get_specific_attributes()
-                }
-                cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
-                for item in reader:
-                    file_name = f"{item['name']}{item['extension']}"
-                    with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file:
-                        source_path = temp_file.name
-                        buf = cloud_storage_instance.download_fileobj(file_name)
-                        temp_file.write(buf.getvalue())
-                        temp_file.flush()
-                    checksum = item.get('checksum', None)
-                    if not checksum:
-                        slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
-                    if checksum and not md5_hash(source_path) == checksum:
-                        slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name))
-                    images.append((source_path, source_path, None))
-            else:
-                for item in reader:
-                    source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
-                    images.append((source_path, source_path, None))
-        writer.save_as_chunk(images, buff)
+        with self._get_images(db_data, chunk_number) as images:
+            writer.save_as_chunk(images, buff)
         buff.seek(0)
-        if db_data.storage == StorageChoice.CLOUD_STORAGE:
-            images = [image[0] for image in images if os.path.exists(image[0])]
-            for image_path in images:
-                os.remove(image_path)
 
         return buff, mime_type
 
     def prepare_selective_job_chunk(self, db_job: Job, quality, chunk_number: int):
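A side note on the checksum pass in `_get_images`: `md5_hash` is a helper from the surrounding CVAT module, and the sketch below only assumes it hashes file contents, a guess based on how it is called rather than the actual implementation. Note the design choice in the diff: a mismatch is logged as a warning, not raised, so the chunk is still built from whatever was downloaded.

```python
import hashlib

def md5_hash(path, block_size=1 << 20):
    """Hex MD5 digest of a file, read in 1 MiB blocks to bound memory use."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            digest.update(block)
    return digest.hexdigest()
```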
