Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify upload data for task #5498

Merged
merged 28 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f1adf1b
Add data pattern
Marishka17 Dec 14, 2022
6f0e50f
Update test
Marishka17 Dec 21, 2022
8e3bc4f
Small changes
Marishka17 Dec 21, 2022
a9f6e10
Merge branch 'develop' into mk/symplify_upload_data_for_task
Marishka17 Dec 21, 2022
3c74388
Update requirements
Marishka17 Dec 21, 2022
b8be000
Black && isort
Marishka17 Dec 21, 2022
7b2d235
Update test
Marishka17 Dec 21, 2022
2cb0cbf
Small fix
Marishka17 Dec 21, 2022
6a58dc3
Fix manifest root
Marishka17 Dec 21, 2022
5e75d97
Remove av requirement
Marishka17 Dec 22, 2022
445d914
Update cvat/apps/engine/serializers.py
Marishka17 Dec 22, 2022
34c6b8b
Merge branch 'mk/symplify_upload_data_for_task' of https://github.com…
Marishka17 Dec 22, 2022
1290edd
Update cvat/apps/engine/task.py
Marishka17 Dec 22, 2022
0ab87de
Apply comments
Marishka17 Dec 22, 2022
088db46
Merge branch 'mk/symplify_upload_data_for_task' of https://github.com…
Marishka17 Dec 22, 2022
782f30a
black
Marishka17 Dec 22, 2022
1fc948f
Apply comments && debug
Marishka17 Dec 22, 2022
01bbefd
Merge branch 'develop' into mk/symplify_upload_data_for_task
zhiltsov-max Dec 23, 2022
1a0c1b9
Remove validation method
Marishka17 Dec 23, 2022
38feb3e
Merge branch 'mk/symplify_upload_data_for_task' of https://github.com…
Marishka17 Dec 23, 2022
95ae618
Fix linters issues
Marishka17 Dec 23, 2022
d619b97
Small fix && debug
Marishka17 Dec 23, 2022
d9ebfd2
d
Marishka17 Dec 23, 2022
b40af24
d
Marishka17 Dec 23, 2022
7cce69b
revert debug
Marishka17 Dec 23, 2022
8898a8b
Merge branch 'develop' into mk/symplify_upload_data_for_task
zhiltsov-max Dec 26, 2022
d708332
Merge branch 'develop' into mk/symplify_upload_data_for_task
Marishka17 Dec 27, 2022
9210794
Update changelog
Marishka17 Dec 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,13 @@ class DataSerializer(WriteOnceMixin, serializers.ModelSerializer):
use_cache = serializers.BooleanField(default=False)
copy_data = serializers.BooleanField(default=False)
cloud_storage_id = serializers.IntegerField(write_only=True, allow_null=True, required=False)
filename_pattern = serializers.CharField(allow_null=True, required=False)

class Meta:
model = models.Data
fields = ('chunk_size', 'size', 'image_quality', 'start_frame', 'stop_frame', 'frame_filter',
'compressed_chunk_type', 'original_chunk_type', 'client_files', 'server_files', 'remote_files', 'use_zip_chunks',
'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method')
'cloud_storage_id', 'use_cache', 'copy_data', 'storage_method', 'storage', 'sorting_method', 'filename_pattern')

# pylint: disable=no-self-use
def validate_frame_filter(self, value):
Expand All @@ -396,6 +397,7 @@ def validate(self, attrs):
if 'start_frame' in attrs and 'stop_frame' in attrs \
and attrs['start_frame'] > attrs['stop_frame']:
raise serializers.ValidationError('Stop frame must be more or equal start frame')

zhiltsov-max marked this conversation as resolved.
Show resolved Hide resolved
return attrs

def create(self, validated_data):
Expand Down
130 changes: 85 additions & 45 deletions cvat/apps/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# SPDX-License-Identifier: MIT

import itertools
import fnmatch
import os
import sys
from rest_framework.serializers import ValidationError
Expand Down Expand Up @@ -127,7 +128,7 @@ def _save_task_to_db(db_task, extractor):
db_task.data.save()
db_task.save()

def _count_files(data, manifest_files=None):
def _count_files(data):
share_root = settings.SHARE_ROOT
server_files = []

Expand Down Expand Up @@ -158,7 +159,7 @@ def count_files(file_mapping, counter):
if mime in counter:
counter[mime].append(rel_path)
elif rel_path.endswith('.jsonl'):
manifest_files.append(rel_path)
continue
else:
slogger.glob.warn("Skip '{}' file (its mime type doesn't "
"correspond to supported MIME file type)".format(full_path))
Expand All @@ -177,6 +178,12 @@ def count_files(file_mapping, counter):

return counter

def _find_manifest_files(data):
manifest_files = []
for files in ['client_files', 'server_files', 'remote_files']:
manifest_files.extend(list(filter(lambda x: x.endswith('.jsonl'), data[files])))
return manifest_files

def _validate_data(counter, manifest_files=None):
unique_entries = 0
multiple_entries = 0
Expand Down Expand Up @@ -207,10 +214,10 @@ def _validate_data(counter, manifest_files=None):

return counter, task_modes[0]

def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage, data_storage_method):
if manifests:
if len(manifests) != 1:
raise Exception('Only one manifest file can be attached with data')
raise ValidationError('Only one manifest file can be attached to data')
manifest_file = manifests[0]
full_manifest_path = os.path.join(root_dir, manifests[0])
if is_in_cloud:
Expand All @@ -221,8 +228,10 @@ def _validate_manifest(manifests, root_dir, is_in_cloud, db_cloud_storage):
< cloud_storage_instance.get_file_last_modified(manifest_file):
cloud_storage_instance.download_file(manifest_file, full_manifest_path)
if is_manifest(full_manifest_path):
if not (settings.USE_CACHE or data_storage_method != models.StorageMethodChoice.CACHE):
raise ValidationError("Manifest file can be uploaded only if 'Use cache' option is also selected")
return manifest_file
raise Exception('Invalid manifest was uploaded')
raise ValidationError('Invalid manifest was uploaded')
return None

def _validate_url(url):
Expand Down Expand Up @@ -291,6 +300,26 @@ def _download_data(urls, upload_dir):
def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
return lambda frame_id: start_frame + frame_id * frame_step

def _create_task_manifest_based_on_cloud_storage_manifest(
sorted_media,
cloud_storage_manifest_prefix,
cloud_storage_manifest,
manifest
):
if cloud_storage_manifest_prefix:
sorted_media_without_manifest_prefix = [
os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
]
sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
def _add_prefix(properties):
file_name = properties['name']
properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
return properties
content = list(map(_add_prefix, raw_content))
else:
sequence, content = cloud_storage_manifest.get_subset(sorted_media)
sorted_content = (i[1] for i in sorted(zip(sequence, content)))
manifest.create(sorted_content)

@transaction.atomic
def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
Expand All @@ -300,69 +329,80 @@ def _create_thread(db_task, data, isBackupRestore=False, isDatasetImport=False):
slogger.glob.info("create task #{}".format(db_task.id))

db_data = db_task.data
upload_dir = db_data.get_upload_dirname()
upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
is_data_in_cloud = db_data.storage == models.StorageChoice.CLOUD_STORAGE

if data['remote_files'] and not isDatasetImport:
data['remote_files'] = _download_data(data['remote_files'], upload_dir)

manifest_files = []
media = _count_files(data, manifest_files)
media, task_mode = _validate_data(media, manifest_files)

if data['server_files']:
if db_data.storage == models.StorageChoice.LOCAL:
_copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
elif db_data.storage == models.StorageChoice.SHARE:
upload_dir = settings.SHARE_ROOT

# find and validate manifest file
manifest_files = _find_manifest_files(data)
manifest_root = None
if db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:

# we should also handle this case because files from the share source have not been downloaded yet
if data['copy_data']:
manifest_root = settings.SHARE_ROOT
elif db_data.storage in {models.StorageChoice.LOCAL, models.StorageChoice.SHARE}:
manifest_root = upload_dir
elif is_data_in_cloud:
manifest_root = db_data.cloud_storage.get_storage_dirname()

manifest_file = _validate_manifest(
manifest_files, manifest_root,
is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None
is_data_in_cloud, db_data.cloud_storage if is_data_in_cloud else None,
db_data.storage_method,
)
if manifest_file and (not settings.USE_CACHE or db_data.storage_method != models.StorageMethodChoice.CACHE):
raise Exception("File with meta information can be uploaded if 'Use cache' option is also selected")

if data['server_files'] and is_data_in_cloud:
if is_data_in_cloud:
cloud_storage_instance = db_storage_to_storage_instance(db_data.cloud_storage)
sorted_media = sort(media['image'], data['sorting_method'])

data_size = len(sorted_media)
segment_step, *_ = _get_task_segment_data(db_task, data_size)
for start_frame in range(0, data_size, segment_step):
first_sorted_media_image = sorted_media[start_frame]
cloud_storage_instance.download_file(first_sorted_media_image, os.path.join(upload_dir, first_sorted_media_image))

# prepare task manifest file from cloud storage manifest file
# NOTE we should create manifest before defining chunk_size
# FIXME in the future when will be implemented archive support
manifest = ImageManifestManager(db_data.get_manifest_path())
cloud_storage_manifest = ImageManifestManager(
os.path.join(db_data.cloud_storage.get_storage_dirname(), manifest_file),
db_data.cloud_storage.get_storage_dirname()
)
cloud_storage_manifest_prefix = os.path.dirname(manifest_file)
cloud_storage_manifest.set_index()
if cloud_storage_manifest_prefix:
sorted_media_without_manifest_prefix = [
os.path.relpath(i, cloud_storage_manifest_prefix) for i in sorted_media
]
sequence, raw_content = cloud_storage_manifest.get_subset(sorted_media_without_manifest_prefix)
def _add_prefix(properties):
file_name = properties['name']
properties['name'] = os.path.join(cloud_storage_manifest_prefix, file_name)
return properties
content = list(map(_add_prefix, raw_content))
cloud_storage_manifest_prefix = os.path.dirname(manifest_file)

# update list with server files if task creation approach with pattern and manifest file is used
if is_data_in_cloud and data['filename_pattern']:
if 1 != len(data['server_files']):
l = len(data['server_files']) - 1
raise ValidationError(
'Using a filename_pattern is only supported with a manifest file, '
f'but others {l} file{"s" if l > 1 else ""} {"were" if l > 1 else "was"} found'
'Please remove extra files and keep only manifest file in server_files field.'
)

cloud_storage_manifest_data = list(cloud_storage_manifest.data) if not cloud_storage_manifest_prefix \
else [os.path.join(cloud_storage_manifest_prefix, f) for f in cloud_storage_manifest.data]
if data['filename_pattern'] == '*':
server_files = cloud_storage_manifest_data
else:
sequence, content = cloud_storage_manifest.get_subset(sorted_media)
sorted_content = (i[1] for i in sorted(zip(sequence, content)))
manifest.create(sorted_content)
server_files = fnmatch.filter(cloud_storage_manifest_data, data['filename_pattern'])
data['server_files'].extend(server_files)

# count and validate uploaded files
media = _count_files(data)
media, task_mode = _validate_data(media, manifest_files)

if data['server_files']:
if db_data.storage == models.StorageChoice.LOCAL:
_copy_data_from_source(data['server_files'], upload_dir, data.get('server_files_path'))
elif is_data_in_cloud:
sorted_media = sort(media['image'], data['sorting_method'])

# download previews from cloud storage
data_size = len(sorted_media)
segment_step, *_ = _get_task_segment_data(db_task, data_size)
for preview_frame in range(0, data_size, segment_step):
preview = sorted_media[preview_frame]
cloud_storage_instance.download_file(preview, os.path.join(upload_dir, preview))

# Define task manifest content based on cloud storage manifest content and uploaded files
_create_task_manifest_based_on_cloud_storage_manifest(
sorted_media, cloud_storage_manifest_prefix,
cloud_storage_manifest, manifest)

av_scan_paths(upload_dir)

Expand Down
119 changes: 119 additions & 0 deletions tests/python/rest_api/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@
# SPDX-License-Identifier: MIT

import json
import os.path as osp
import subprocess
from copy import deepcopy
from functools import partial
from http import HTTPStatus
from tempfile import TemporaryDirectory
from time import sleep

import pytest
from cvat_sdk.api_client import apis, models
from cvat_sdk.core.helpers import get_paginated_collection
from deepdiff import DeepDiff

import shared.utils.s3 as s3
from shared.utils.config import get_method, make_api_client, patch_method
from shared.utils.helpers import generate_image_files

Expand Down Expand Up @@ -675,6 +680,120 @@ def test_create_task_with_cloud_storage_files(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)

@pytest.mark.with_external_services
@pytest.mark.parametrize("cloud_storage_id", [1])
@pytest.mark.parametrize(
"manifest, filename_pattern, sub_dir, task_size",
[
("manifest.jsonl", "*", True, 3), # public bucket
("manifest.jsonl", "test/*", True, 3),
("manifest.jsonl", "test/sub*1.jpeg", True, 1),
("manifest.jsonl", "*image*.jpeg", True, 3),
("manifest.jsonl", "wrong_pattern", True, 0),
("abc_manifest.jsonl", "[a-c]*.jpeg", False, 2),
("abc_manifest.jsonl", "[d]*.jpeg", False, 1),
("abc_manifest.jsonl", "[e-z]*.jpeg", False, 0),
],
)
@pytest.mark.parametrize("org", [""])
def test_create_task_with_file_pattern(
self,
cloud_storage_id,
manifest,
filename_pattern,
sub_dir,
task_size,
org,
cloud_storages,
request,
):
# prepare dataset on the bucket
prefixes = ("test_image_",) * 3 if sub_dir else ("a_", "b_", "d_")
images = generate_image_files(3, prefixes=prefixes)
s3_client = s3.make_client()

cloud_storage = cloud_storages[cloud_storage_id]

for image in images:
s3_client.create_file(
data=image,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"{'test/sub/' if sub_dir else ''}{image.name}",
)
)

with TemporaryDirectory() as tmp_dir:
for image in images:
with open(osp.join(tmp_dir, image.name), "wb") as f:
f.write(image.getvalue())

command = [
"docker",
"run",
"--rm",
"-u",
"root:root",
"-v",
f"{tmp_dir}:/local",
"--entrypoint",
"python3",
"cvat/server",
"utils/dataset_manifest/create.py",
"--output-dir",
"/local",
"/local",
]
subprocess.run(command, check=True)
with open(osp.join(tmp_dir, "manifest.jsonl"), mode="rb") as m_file:
s3_client.create_file(
data=m_file.read(),
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
request.addfinalizer(
partial(
s3_client.remove_file,
bucket=cloud_storage["resource"],
filename=f"test/sub/{manifest}" if sub_dir else manifest,
)
)

task_spec = {
"name": f"Task with files from cloud storage {cloud_storage_id}",
"labels": [
{
"name": "car",
}
],
}

data_spec = {
"image_quality": 75,
"use_cache": True,
"cloud_storage_id": cloud_storage_id,
"server_files": [f"test/sub/{manifest}" if sub_dir else manifest],
"filename_pattern": filename_pattern,
}

if task_size:
task_id = self._test_create_task(
self._USERNAME, task_spec, data_spec, content_type="application/json", org=org
)

with make_api_client(self._USERNAME) as api_client:
(task, response) = api_client.tasks_api.retrieve(task_id, org=org)
assert response.status == HTTPStatus.OK
assert task.size == task_size
else:
status = self._test_cannot_create_task(self._USERNAME, task_spec, data_spec)
assert "No media data found" in status.message

@pytest.mark.with_external_services
@pytest.mark.parametrize(
"cloud_storage_id, manifest, org",
Expand Down
5 changes: 3 additions & 2 deletions tests/python/shared/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ def generate_image_file(filename="image.png", size=(50, 50), color=(0, 0, 0)):
return f


def generate_image_files(count) -> List[BytesIO]:
def generate_image_files(count, prefixes=None) -> List[BytesIO]:
    """Generate `count` in-memory grayscale JPEG images.

    Args:
        count: number of images to create.
        prefixes: optional sequence of filename prefixes. Entries beyond the
            end of the sequence (or a None value) fall back to an empty
            prefix, so a shorter sequence no longer raises IndexError.

    Returns:
        A list of BytesIO objects named "<prefix><i>.jpeg".
    """
    images = []
    for i in range(count):
        # Guard the index: the original code assumed len(prefixes) == count
        prefix = prefixes[i] if prefixes and i < len(prefixes) else ""
        image = generate_image_file(f"{prefix}{i}.jpeg", color=(i, i, i))
        images.append(image)

    return images
return images