From fa15f788020daaf71cf8a4c7cfe33de01bb409ea Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Tue, 31 Aug 2021 14:21:58 +0300 Subject: [PATCH] Extension of cloud storage server part (#3386) * Add preview && some fixes * Fix case with sub dirs on cloud storage * Move server part from ui_support_cloud_storage && fix missing id field * Add support_key_secret_key_pair * Fix several moments * Add index resetting * Fix pylint errors * Remove excess migration * tmp * Some fixes * Fixes * fix * [server] Add cloud storage status && fixes * Remove unused import * Add manifest set_index method * Implement status support for Azure blob container * Move specific attributes parsing into utils * Fix missing in migration * Fix error display * some fix * Update migration dependency * Update google cloud storage status * Update migrtaions * Update CHANGELOG --- CHANGELOG.md | 6 +- cvat/apps/engine/cache.py | 51 ++-- cvat/apps/engine/cloud_provider.py | 222 +++++++++++++---- .../migrations/0042_auto_20210830_1056.py | 27 +++ cvat/apps/engine/models.py | 21 +- cvat/apps/engine/serializers.py | 129 ++++++++-- cvat/apps/engine/task.py | 14 +- cvat/apps/engine/utils.py | 9 +- cvat/apps/engine/views.py | 224 +++++++++++++----- utils/dataset_manifest/core.py | 34 ++- 10 files changed, 573 insertions(+), 164 deletions(-) create mode 100644 cvat/apps/engine/migrations/0042_auto_20210830_1056.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ffa5a216297..f5e168ec4f2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,11 +15,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 and project with 3D tasks () - Additional inline tips in interactors with demo gifs () - Added intelligent scissors blocking feature () +- Support cloud storage status () +- Support cloud storage preview () ### Changed - Non-blocking UI when using interactors () - "Selected opacity" slider now defines opacity level for shapes being drawnSelected opacity () +- Cloud storage creating and updating () +- Way of working with cloud storage content () ### Deprecated @@ -27,7 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed -- TDB +- Support TEMP_KEY_SECRET_KEY_TOKEN_SET for AWS S3 cloud storage () ### Fixed diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 97dd420acfb0..71380f281a7f 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -15,7 +15,7 @@ ImageDatasetManifestReader, VideoDatasetManifestReader) from cvat.apps.engine.models import DataChoice, StorageChoice from cvat.apps.engine.models import DimensionType -from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials +from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.engine.utils import md5_hash class CacheInteraction: def __init__(self, dimension=DimensionType.DIM_2D): @@ -71,6 +71,7 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number): step=db_data.get_frame_step()) if db_data.storage == StorageChoice.CLOUD_STORAGE: db_cloud_storage = db_data.cloud_storage + assert db_cloud_storage, 'Cloud storage instance was deleted' credentials = Credentials() credentials.convert_from_db({ 'type': db_cloud_storage.credentials_type, @@ -81,22 +82,38 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number): 'credentials': credentials, 'specific_attributes': db_cloud_storage.get_specific_attributes() } - cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details) - cloud_storage_instance.initialize_content() - for item in reader: - name = f"{item['name']}{item['extension']}" - if name not in cloud_storage_instance: - raise Exception('{} file was not found on a {} storage'.format(name, cloud_storage_instance.name)) - with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=name.replace(os.path.sep, '#'), delete=False) as temp_file: - source_path = temp_file.name - buf = cloud_storage_instance.download_fileobj(name) - temp_file.write(buf.getvalue()) - checksum = item.get('checksum', None) - if not checksum: - slogger.glob.warning('A manifest file does not contain checksum for image {}'.format(item.get('name'))) - if checksum and not md5_hash(source_path) == checksum: - slogger.glob.warning('Hash sums of files {} do not match'.format(name)) - images.append((source_path, source_path, None)) + try: + cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details) + cloud_storage_instance.initialize_content() + for item in reader: + file_name = f"{item['name']}{item['extension']}" + if file_name not in cloud_storage_instance: + raise Exception('{} file was not found on a {} storage'.format(file_name, cloud_storage_instance.name)) + with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file: + source_path = temp_file.name + buf = cloud_storage_instance.download_fileobj(file_name) + temp_file.write(buf.getvalue()) + checksum = item.get('checksum', None) + if not checksum: + slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name'))) + if checksum and not md5_hash(source_path) == checksum: + slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name)) + images.append((source_path, source_path, None)) + except Exception as ex: + storage_status = cloud_storage_instance.get_status() + if storage_status == Status.FORBIDDEN: + msg = 'The resource {} is no longer available. Access forbidden.'.format(cloud_storage_instance.name) + elif storage_status == Status.NOT_FOUND: + msg = 'The resource {} not found. It may have been deleted.'.format(cloud_storage_instance.name) + else: + # check status of last file + file_status = cloud_storage_instance.get_file_status(file_name) + if file_status == Status.NOT_FOUND: + raise Exception("'{}' not found on the cloud storage '{}'".format(file_name, cloud_storage_instance.name)) + elif file_status == Status.FORBIDDEN: + raise Exception("Access to the file '{}' on the '{}' cloud storage is denied".format(file_name, cloud_storage_instance.name)) + msg = str(ex) + raise Exception(msg) else: for item in reader: source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}") diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index 31619bd13fee..869b2ad1385f 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -1,23 +1,40 @@ -#from dataclasses import dataclass +# Copyright (C) 2021 Intel Corporation +# +# SPDX-License-Identifier: MIT + +import os +import boto3 + from abc import ABC, abstractmethod, abstractproperty +from enum import Enum from io import BytesIO -import os -import os.path -import boto3 from boto3.s3.transfer import TransferConfig -from botocore.exceptions import WaiterError +from botocore.exceptions import ClientError from botocore.handlers import disable_signing from azure.storage.blob import BlobServiceClient -from azure.core.exceptions import ResourceExistsError +from azure.core.exceptions import ResourceExistsError, HttpResponseError from azure.storage.blob import PublicAccess from google.cloud import storage +from google.cloud.exceptions import NotFound as GoogleCloudNotFound, Forbidden as GoogleCloudForbidden from cvat.apps.engine.log import slogger from cvat.apps.engine.models import CredentialsTypeChoice, CloudProviderChoice +class Status(str, Enum): + AVAILABLE = 'AVAILABLE' + NOT_FOUND = 'NOT_FOUND' + FORBIDDEN = 'FORBIDDEN' + + @classmethod + def choices(cls): + return tuple((x.value, x.name) for x in cls) + + def __str__(self): + return self.value + class _CloudStorage(ABC): def __init__(self): @@ -32,7 +49,23 @@ def create(self): pass @abstractmethod - def exists(self): + def _head_file(self, key): + pass + + @abstractmethod + def _head(self): + pass + + @abstractmethod + def get_status(self): + pass + + @abstractmethod + def get_file_status(self, key): + pass + + @abstractmethod + def get_file_last_modified(self, key): pass @abstractmethod @@ -95,10 +128,6 @@ def get_cloud_storage_instance(cloud_provider, resource, credentials, specific_a return instance class AWS_S3(_CloudStorage): - waiter_config = { - 'Delay': 5, # The amount of time in seconds to wait between attempts. Default: 5 - 'MaxAttempts': 3, # The maximum number of attempts to be made. Default: 20 - } transfer_config = { 'max_io_queue': 10, } @@ -117,6 +146,13 @@ def __init__(self, aws_session_token=session_token, region_name=region ) + elif access_key_id and secret_key: + self._s3 = boto3.resource( + 's3', + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_key, + region_name=region + ) elif any([access_key_id, secret_key, session_token]): raise Exception('Insufficient data for authorization') # anonymous access @@ -135,26 +171,38 @@ def bucket(self): def name(self): return self._bucket.name - def exists(self): - waiter = self._client_s3.get_waiter('bucket_exists') - try: - waiter.wait( - Bucket=self.name, - WaiterConfig=self.waiter_config - ) - except WaiterError: - raise Exception('A resource {} unavailable'.format(self.name)) + def _head(self): + return self._client_s3.head_bucket(Bucket=self.name) - def is_object_exist(self, key_object): - waiter = self._client_s3.get_waiter('object_exists') + def _head_file(self, key): + return self._client_s3.head_object(Bucket=self.name, Key=key) + + def get_status(self): + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object + # return only 3 codes: 200, 403, 404 try: - waiter.wait( - Bucket=self._bucket, - Key=key_object, - WaiterConfig=self.waiter_config - ) - except WaiterError: - raise Exception('A file {} unavailable'.format(key_object)) + self._head() + return Status.AVAILABLE + except ClientError as ex: + code = ex.response['Error']['Code'] + if code == '403': + return Status.FORBIDDEN + else: + return Status.NOT_FOUND + + def get_file_status(self, key): + try: + self._head_file(key) + return Status.AVAILABLE + except ClientError as ex: + code = ex.response['Error']['Code'] + if code == '403': + return Status.FORBIDDEN + else: + return Status.NOT_FOUND + + def get_file_last_modified(self, key): + return self._head_file(key).get('LastModified') def upload_file(self, file_obj, file_name): self._bucket.upload_fileobj( @@ -234,12 +282,35 @@ def create(self): slogger.glob.info(msg) raise Exception(msg) - def exists(self): - return self._container_client.exists(timeout=5) + def _head(self): + return self._container_client.get_container_properties() + + def _head_file(self, key): + blob_client = self.container.get_blob_client(key) + return blob_client.get_blob_properties() - def is_object_exist(self, file_name): - blob_client = self._container_client.get_blob_client(file_name) - return blob_client.exists() + def get_file_last_modified(self, key): + return self._head_file(key).last_modified + + def get_status(self): + try: + self._head() + return Status.AVAILABLE + except HttpResponseError as ex: + if ex.status_code == 403: + return Status.FORBIDDEN + else: + return Status.NOT_FOUND + + def get_file_status(self, key): + try: + self._head_file(key) + return Status.AVAILABLE + except HttpResponseError as ex: + if ex.status_code == 403: + return Status.FORBIDDEN + else: + return Status.NOT_FOUND def upload_file(self, file_obj, file_name): self._container_client.upload_blob(name=file_name, data=file_obj) @@ -269,6 +340,20 @@ def download_fileobj(self, key): class GOOGLE_DRIVE(_CloudStorage): pass +def _define_gcs_status(func): + def wrapper(self, key=None): + try: + if not key: + func(self) + else: + func(self, key) + return Status.AVAILABLE + except GoogleCloudNotFound: + return Status.NOT_FOUND + except GoogleCloudForbidden: + return Status.FORBIDDEN + return wrapper + class GoogleCloudStorage(_CloudStorage): def __init__(self, bucket_name, prefix=None, service_account_json=None, project=None, location=None): @@ -294,8 +379,20 @@ def bucket(self): def name(self): return self._bucket.name - def exists(self): - return self._storage_client.lookup_bucket(self.name) is not None + def _head(self): + return self._storage_client.get_bucket(bucket_or_name=self.name) + + def _head_file(self, key): + blob = self.bucket.blob(key) + return self._storage_client._get_resource(blob.path) + + @_define_gcs_status + def get_status(self): + self._head() + + @_define_gcs_status + def get_file_status(self, key): + self._head_file(key) def initialize_content(self): self._files = [ @@ -314,9 +411,6 @@ def download_fileobj(self, key): buf.seek(0) return buf - def is_object_exist(self, key): - return self.bucket.blob(key).exists() - def upload_file(self, file_obj, file_name): self.bucket.blob(file_name).upload_from_file(file_obj) @@ -342,7 +436,6 @@ def get_file_last_modified(self, key): blob.reload() return blob.updated - class Credentials: __slots__ = ('key', 'secret_key', 'session_token', 'account_name', 'key_file_path', 'credentials_type') @@ -356,33 +449,58 @@ def __init__(self, **credentials): def convert_to_db(self): converted_credentials = { - CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET : \ - " ".join([self.key, self.secret_key, self.session_token]), + CredentialsTypeChoice.KEY_SECRET_KEY_PAIR : \ + " ".join([self.key, self.secret_key]), CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR : " ".join([self.account_name, self.session_token]), CredentialsTypeChoice.KEY_FILE_PATH: self.key_file_path, - CredentialsTypeChoice.ANONYMOUS_ACCESS: "", + CredentialsTypeChoice.ANONYMOUS_ACCESS: "" if not self.account_name else self.account_name, } return converted_credentials[self.credentials_type] def convert_from_db(self, credentials): self.credentials_type = credentials.get('type') - if self.credentials_type == CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET: - self.key, self.secret_key, self.session_token = credentials.get('value').split() + if self.credentials_type == CredentialsTypeChoice.KEY_SECRET_KEY_PAIR: + self.key, self.secret_key = credentials.get('value').split() elif self.credentials_type == CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR: self.account_name, self.session_token = credentials.get('value').split() + elif self.credentials_type == CredentialsTypeChoice.ANONYMOUS_ACCESS: + self.session_token, self.key, self.secret_key = ('', '', '') + # account_name will be in [some_value, ''] + self.account_name = credentials.get('value') elif self.credentials_type == CredentialsTypeChoice.KEY_FILE_PATH: self.key_file_path = credentials.get('value') else: - self.account_name, self.session_token, self.key, self.secret_key = ('', '', '', '') - self.credentials_type = None + raise NotImplementedError('Found {} not supported credentials type'.format(self.credentials_type)) def mapping_with_new_values(self, credentials): self.credentials_type = credentials.get('credentials_type', self.credentials_type) - self.key = credentials.get('key', self.key) - self.secret_key = credentials.get('secret_key', self.secret_key) - self.session_token = credentials.get('session_token', self.session_token) - self.account_name = credentials.get('account_name', self.account_name) - self.key_file_path = credentials.get('key_file_path', self.key_file_path) + if self.credentials_type == CredentialsTypeChoice.ANONYMOUS_ACCESS: + self.key = '' + self.secret_key = '' + self.session_token = '' + self.key_file_path = '' + self.account_name = credentials.get('account_name', self.account_name) + elif self.credentials_type == CredentialsTypeChoice.KEY_SECRET_KEY_PAIR: + self.key = credentials.get('key', self.key) + self.secret_key = credentials.get('secret_key', self.secret_key) + self.session_token = '' + self.account_name = '' + self.key_file_path = '' + elif self.credentials_type == CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR: + self.session_token = credentials.get('session_token', self.session_token) + self.account_name = credentials.get('account_name', self.account_name) + self.key = '' + self.secret_key = '' + self.key_file_path = '' + elif self.credentials_type == CredentialsTypeChoice.KEY_FILE_PATH: + self.key = '' + self.secret_key = '' + self.session_token = '' + self.account_name = '' + self.key_file_path = credentials.get('key_file_path', self.key_file_path) + else: + raise NotImplementedError('Mapping credentials: unsupported credentials type') + def values(self): return [self.key, self.secret_key, self.session_token, self.account_name, self.key_file_path] diff --git a/cvat/apps/engine/migrations/0042_auto_20210830_1056.py b/cvat/apps/engine/migrations/0042_auto_20210830_1056.py new file mode 100644 index 000000000000..7b5a496af97c --- /dev/null +++ b/cvat/apps/engine/migrations/0042_auto_20210830_1056.py @@ -0,0 +1,27 @@ +# Generated by Django 3.1.13 on 2021-08-30 10:56 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('engine', '0041_auto_20210827_0258'), + ] + + operations = [ + migrations.AlterField( + model_name='cloudstorage', + name='credentials_type', + field=models.CharField(choices=[('KEY_SECRET_KEY_PAIR', 'KEY_SECRET_KEY_PAIR'), ('ACCOUNT_NAME_TOKEN_PAIR', 'ACCOUNT_NAME_TOKEN_PAIR'), ('KEY_FILE_PATH', 'KEY_FILE_PATH'), ('ANONYMOUS_ACCESS', 'ANONYMOUS_ACCESS')], max_length=29), + ), + migrations.CreateModel( + name='Manifest', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('filename', models.CharField(default='manifest.jsonl', max_length=1024)), + ('cloud_storage', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='manifests', to='engine.cloudstorage')), + ], + ), + ] diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index 73c0da4979fa..22272423655f 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -12,6 +12,7 @@ from django.db import models from django.utils.translation import gettext_lazy as _ +from cvat.apps.engine.utils import parse_specific_attributes class SafeCharField(models.CharField): def get_prep_value(self, value): @@ -557,7 +558,7 @@ def __str__(self): class CredentialsTypeChoice(str, Enum): # ignore bandit issues because false positives - TEMP_KEY_SECRET_KEY_TOKEN_SET = 'TEMP_KEY_SECRET_KEY_TOKEN_SET' # nosec + KEY_SECRET_KEY_PAIR = 'KEY_SECRET_KEY_PAIR' # nosec ACCOUNT_NAME_TOKEN_PAIR = 'ACCOUNT_NAME_TOKEN_PAIR' # nosec KEY_FILE_PATH = 'KEY_FILE_PATH' ANONYMOUS_ACCESS = 'ANONYMOUS_ACCESS' @@ -573,6 +574,13 @@ def list(cls): def __str__(self): return self.value +class Manifest(models.Model): + filename = models.CharField(max_length=1024, default='manifest.jsonl') + cloud_storage = models.ForeignKey('CloudStorage', on_delete=models.CASCADE, null=True, related_name='manifests') + + def __str__(self): + return '{}'.format(self.filename) + class CloudStorage(models.Model): # restrictions: # AWS bucket name, Azure container name - 63 @@ -608,11 +616,10 @@ def get_storage_logs_dirname(self): return os.path.join(self.get_storage_dirname(), 'logs') def get_log_path(self): - return os.path.join(self.get_storage_dirname(), "storage.log") + return os.path.join(self.get_storage_logs_dirname(), "storage.log") + + def get_preview_path(self): + return os.path.join(self.get_storage_dirname(), 'preview.jpeg') def get_specific_attributes(self): - specific_attributes = self.specific_attributes - return { - item.split('=')[0].strip(): item.split('=')[1].strip() - for item in specific_attributes.split('&') - } if specific_attributes else dict() + return parse_specific_attributes(self.specific_attributes) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 784c71b88ec5..ae6c9d608caa 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -11,8 +11,9 @@ from cvat.apps.dataset_manager.formats.utils import get_label_color from cvat.apps.engine import models -from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials +from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.engine.log import slogger +from cvat.apps.engine.utils import parse_specific_attributes class BasicUserSerializer(serializers.ModelSerializer): def validate(self, data): @@ -780,8 +781,22 @@ def create(self, validated_data): return db_review +class ManifestSerializer(serializers.ModelSerializer): + class Meta: + model = models.Manifest + fields = ('filename', ) + + # pylint: disable=no-self-use + def to_internal_value(self, data): + return {'filename': data } + + # pylint: disable=no-self-use + def to_representation(self, instance): + return instance.filename if instance else instance + class BaseCloudStorageSerializer(serializers.ModelSerializer): owner = BasicUserSerializer(required=False) + manifests = ManifestSerializer(many=True, default=[]) class Meta: model = models.CloudStorage exclude = ['credentials'] @@ -794,13 +809,14 @@ class CloudStorageSerializer(serializers.ModelSerializer): secret_key = serializers.CharField(max_length=40, allow_blank=True, required=False) key_file_path = serializers.CharField(max_length=64, allow_blank=True, required=False) account_name = serializers.CharField(max_length=24, allow_blank=True, required=False) + manifests = ManifestSerializer(many=True, default=[]) class Meta: model = models.CloudStorage fields = ( 'provider_type', 'resource', 'display_name', 'owner', 'credentials_type', 'created_date', 'updated_date', 'session_token', 'account_name', 'key', - 'secret_key', 'key_file_path', 'specific_attributes', 'description' + 'secret_key', 'key_file_path', 'specific_attributes', 'description', 'id', 'manifests', ) read_only_fields = ('created_date', 'updated_date', 'owner') @@ -833,29 +849,59 @@ def create(self, validated_data): key_file_path=validated_data.pop('key_file_path', ''), credentials_type = validated_data.get('credentials_type') ) + details = { + 'resource': validated_data.get('resource'), + 'credentials': credentials, + 'specific_attributes': parse_specific_attributes(validated_data.get('specific_attributes', '')) + } + storage = get_cloud_storage_instance(cloud_provider=provider_type, **details) if should_be_created: - details = { - 'resource': validated_data.get('resource'), - 'credentials': credentials, - 'specific_attributes': { - item.split('=')[0].strip(): item.split('=')[1].strip() - for item in validated_data.get('specific_attributes').split('&') - } if len(validated_data.get('specific_attributes', '')) - else dict() - } - storage = get_cloud_storage_instance(cloud_provider=provider_type, **details) try: storage.create() except Exception as ex: slogger.glob.warning("Failed with creating storage\n{}".format(str(ex))) raise - db_storage = models.CloudStorage.objects.create( - credentials=credentials.convert_to_db(), - **validated_data - ) - db_storage.save() - return db_storage + storage_status = storage.get_status() + if storage_status == Status.AVAILABLE: + manifests = validated_data.pop('manifests') + # check manifest files availability + for manifest in manifests: + file_status = storage.get_file_status(manifest.get('filename')) + if file_status == Status.NOT_FOUND: + raise serializers.ValidationError({ + 'manifests': "The '{}' file does not exist on '{}' cloud storage" \ + .format(manifest.get('filename'), storage.name) + }) + elif file_status == Status.FORBIDDEN: + raise serializers.ValidationError({ + 'manifests': "The '{}' file does not available on '{}' cloud storage. Access denied" \ + .format(manifest.get('filename'), storage.name) + }) + + db_storage = models.CloudStorage.objects.create( + credentials=credentials.convert_to_db(), + **validated_data + ) + db_storage.save() + + manifest_file_instances = [models.Manifest(**manifest, cloud_storage=db_storage) for manifest in manifests] + models.Manifest.objects.bulk_create(manifest_file_instances) + + cloud_storage_path = db_storage.get_storage_dirname() + if os.path.isdir(cloud_storage_path): + shutil.rmtree(cloud_storage_path) + + os.makedirs(db_storage.get_storage_logs_dirname(), exist_ok=True) + return db_storage + elif storage_status == Status.FORBIDDEN: + field = 'credentials' + message = 'Cannot create resource {} with specified credentials. Access forbidden.'.format(storage.name) + else: + field = 'recource' + message = 'The resource {} not found. It may have been deleted.'.format(storage.name) + slogger.glob.error(message) + raise serializers.ValidationError({field: message}) # pylint: disable=no-self-use def update(self, instance, validated_data): @@ -870,9 +916,50 @@ def update(self, instance, validated_data): instance.credentials_type = validated_data.get('credentials_type', instance.credentials_type) instance.resource = validated_data.get('resource', instance.resource) instance.display_name = validated_data.get('display_name', instance.display_name) - - instance.save() - return instance + instance.description = validated_data.get('description', instance.description) + instance.specific_attributes = validated_data.get('specific_attributes', instance.specific_attributes) + + # check cloud storage existing + details = { + 'resource': instance.resource, + 'credentials': credentials, + 'specific_attributes': parse_specific_attributes(instance.specific_attributes) + } + storage = get_cloud_storage_instance(cloud_provider=instance.provider_type, **details) + storage_status = storage.get_status() + if storage_status == Status.AVAILABLE: + new_manifest_names = set(i.get('filename') for i in validated_data.get('manifests', [])) + previos_manifest_names = set(i.filename for i in instance.manifests.all()) + delta_to_delete = tuple(previos_manifest_names - new_manifest_names) + delta_to_create = tuple(new_manifest_names - previos_manifest_names) + if delta_to_delete: + instance.manifests.filter(filename__in=delta_to_delete).delete() + if delta_to_create: + # check manifest files existing + for manifest in delta_to_create: + file_status = storage.get_file_status(manifest) + if file_status == Status.NOT_FOUND: + raise serializers.ValidationError({ + 'manifests': "The '{}' file does not exist on '{}' cloud storage" + .format(manifest, storage.name) + }) + elif file_status == Status.FORBIDDEN: + raise serializers.ValidationError({ + 'manifests': "The '{}' file does not available on '{}' cloud storage. Access denied" \ + .format(manifest.get('filename'), storage.name) + }) + manifest_instances = [models.Manifest(filename=f, cloud_storage=instance) for f in delta_to_create] + models.Manifest.objects.bulk_create(manifest_instances) + instance.save() + return instance + elif storage_status == Status.FORBIDDEN: + field = 'credentials' + message = 'Cannot update resource {} with specified credentials. Access forbidden.'.format(storage.name) + else: + field = 'recource' + message = 'The resource {} not found. It may have been deleted.'.format(storage.name) + slogger.glob.error(message) + raise serializers.ValidationError({field: message}) class RelatedFileSerializer(serializers.ModelSerializer): diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py index 62d39568c191..e3cb293e80dc 100644 --- a/cvat/apps/engine/task.py +++ b/cvat/apps/engine/task.py @@ -252,10 +252,20 @@ def _create_thread(tid, data, isImport=False): 'specific_attributes': db_cloud_storage.get_specific_attributes() } cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details) - cloud_storage_instance.download_file(manifest_file[0], db_data.get_manifest_path()) first_sorted_media_image = sorted(media['image'])[0] cloud_storage_instance.download_file(first_sorted_media_image, os.path.join(upload_dir, first_sorted_media_image)) + # prepare task manifest file from cloud storage manifest file + manifest = ImageManifestManager(db_data.get_manifest_path()) + cloud_storage_manifest = ImageManifestManager( + os.path.join(db_data.cloud_storage.get_storage_dirname(), manifest_file[0]) + ) + cloud_storage_manifest.set_index() + media_files = sorted(media['image']) + content = cloud_storage_manifest.get_subset(media_files) + manifest.create(content) + manifest.init_index() + av_scan_paths(upload_dir) job = rq.get_current_job() @@ -370,8 +380,6 @@ def update_progress(progress): if not (db_data.storage == models.StorageChoice.CLOUD_STORAGE): w, h = extractor.get_image_size(0) else: - manifest = ImageManifestManager(db_data.get_manifest_path()) - manifest.init_index() img_properties = manifest[0] w, h = img_properties['width'], img_properties['height'] area = h * w diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 87b7b856e301..c7d8ed49ccd8 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -98,4 +98,11 @@ def md5_hash(frame): frame = frame.to_image() elif isinstance(frame, str): frame = Image.open(frame, 'r') - return hashlib.md5(frame.tobytes()).hexdigest() # nosec \ No newline at end of file + return hashlib.md5(frame.tobytes()).hexdigest() # nosec + +def parse_specific_attributes(specific_attributes): + assert isinstance(specific_attributes, str), 'Specific attributes must be a string' + return { + item.split('=')[0].strip(): item.split('=')[1].strip() + for item in specific_attributes.split('&') + } if specific_attributes else dict() \ No newline at end of file diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 9462f20b7b85..61688290befc 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -2,16 +2,17 @@ # # SPDX-License-Identifier: MIT +import errno import io -import json import os import os.path as osp +import pytz import shutil import traceback import uuid from datetime import datetime from distutils.util import strtobool -from tempfile import mkstemp, TemporaryDirectory +from tempfile import mkstemp, NamedTemporaryFile import cv2 from django.db.models.query import Prefetch @@ -40,10 +41,12 @@ import cvat.apps.dataset_manager as dm import cvat.apps.dataset_manager.views # pylint: disable=unused-import from cvat.apps.authentication import auth -from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials +from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.dataset_manager.bindings import CvatImportError from cvat.apps.dataset_manager.serializers import DatasetFormatsSerializer from cvat.apps.engine.frame_provider import FrameProvider +from cvat.apps.engine.media_extractors import ImageListReader +from cvat.apps.engine.mime_types import mimetypes from cvat.apps.engine.models import ( Job, StatusChoice, Task, Project, Review, Issue, Comment, StorageMethodChoice, ReviewStatus, StorageChoice, Image, @@ -206,6 +209,7 @@ def plugins(request): class ProjectFilter(filters.FilterSet): name = filters.CharFilter(field_name="name", lookup_expr="icontains") owner = filters.CharFilter(field_name="owner__username", lookup_expr="icontains") + assignee = filters.CharFilter(field_name="assignee__username", lookup_expr="icontains") status = filters.CharFilter(field_name="status", lookup_expr="icontains") class Meta: @@ -233,7 +237,7 @@ class Meta: @method_decorator(name='partial_update', decorator=swagger_auto_schema(operation_summary='Methods does a partial update of chosen fields in a project')) class ProjectViewSet(auth.ProjectGetQuerySetMixin, viewsets.ModelViewSet): queryset = models.Project.objects.all().order_by('-id') - search_fields = ("name", "owner__username", "status") + search_fields = ("name", "owner__username", "assignee__username", "status") filterset_class = ProjectFilter ordering_fields = ("id", "name", "owner", "status", "assignee") http_method_names = ['get', 'post', 'head', 'patch', 'delete'] @@ -1186,6 +1190,18 @@ def process_result(self, result, method_name, obj, **kwargs): 'supported: range=aws_range' return result +class CloudStorageFilter(filters.FilterSet): + display_name = filters.CharFilter(field_name='display_name', lookup_expr='icontains') + provider_type = filters.CharFilter(field_name='provider_type', lookup_expr='icontains') + resource = filters.CharFilter(field_name='resource', lookup_expr='icontains') + credentials_type = filters.CharFilter(field_name='credentials_type', lookup_expr='icontains') + description = filters.CharFilter(field_name='description', lookup_expr='icontains') + owner = filters.CharFilter(field_name='owner__username', lookup_expr='icontains') + + class Meta: + model = models.CloudStorage + fields = ('id', 'display_name', 'provider_type', 'resource', 'credentials_type', 'description', 'owner') + @method_decorator( name='retrieve', decorator=swagger_auto_schema( @@ -1225,8 +1241,8 @@ def process_result(self, result, method_name, obj, **kwargs): class CloudStorageViewSet(auth.CloudStorageGetQuerySetMixin, viewsets.ModelViewSet): http_method_names = ['get', 'post', 'patch', 'delete'] queryset = CloudStorageModel.objects.all().prefetch_related('data').order_by('-id') - search_fields = ('provider_type', 'display_name', 'resource', 'owner__username') - filterset_fields = ['provider_type', 'display_name', 'resource', 'credentials_type'] + search_fields = ('provider_type', 'display_name', 'resource', 'credentials_type', 'owner__username', 'description') + filterset_class = CloudStorageFilter def get_permissions(self): http_method = self.request.method @@ -1256,37 +1272,7 @@ def get_queryset(self): return queryset def perform_create(self, serializer): - # check that instance of cloud storage exists - provider_type = serializer.validated_data.get('provider_type') - credentials = Credentials( - session_token=serializer.validated_data.get('session_token', ''), - account_name=serializer.validated_data.get('account_name', ''), - key=serializer.validated_data.get('key', ''), - secret_key=serializer.validated_data.get('secret_key', ''), - key_file_path=serializer.validated_data.get('key_file_path', '') - ) - details = { - 'resource': serializer.validated_data.get('resource'), - 'credentials': credentials, - 'specific_attributes': { - item.split('=')[0].strip(): item.split('=')[1].strip() - for item in serializer.validated_data.get('specific_attributes').split('&') - } if len(serializer.validated_data.get('specific_attributes', '')) - else dict() - } - storage = get_cloud_storage_instance(cloud_provider=provider_type, **details) - try: - storage.exists() - except Exception as ex: - message = str(ex) - slogger.glob.error(message) - raise - - owner = self.request.data.get('owner') - if owner: - serializer.save() - else: - serializer.save(owner=self.request.user) + serializer.save(owner=self.request.user) def perform_destroy(self, instance): cloud_storage_dirname = instance.get_storage_dirname() @@ -1311,7 +1297,7 @@ def create(self, request, *args, **kwargs): msg_body = "" for ex in exceptions.args: for field, ex_msg in ex.items(): - msg_body += ": ".join([field, str(ex_msg[0])]) + msg_body += ': '.join([field, ex_msg if isinstance(ex_msg, str) else str(ex_msg[0])]) msg_body += '\n' return HttpResponseBadRequest(msg_body) except APIException as ex: @@ -1322,14 +1308,14 @@ def create(self, request, *args, **kwargs): @swagger_auto_schema( method='get', - operation_summary='Method returns a mapped names of an available files from a storage and a manifest content', + operation_summary='Method returns a manifest content', manual_parameters=[ openapi.Parameter('manifest_path', openapi.IN_QUERY, description="Path to the manifest file in a cloud storage", type=openapi.TYPE_STRING) ], responses={ - '200': openapi.Response(description='Mapped names of an available files from a storage and a manifest content'), + '200': openapi.Response(description='A manifest content'), }, tags=['cloud storages'] ) @@ -1348,30 +1334,152 @@ def content(self, request, pk): 'specific_attributes': db_storage.get_specific_attributes() } storage = get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details) - storage.initialize_content() - storage_files = storage.content - + if not db_storage.manifests.count(): + raise Exception('There is no manifest file') manifest_path = request.query_params.get('manifest_path', 'manifest.jsonl') - with TemporaryDirectory(suffix='manifest', prefix='cvat') as tmp_dir: - tmp_manifest_path = os.path.join(tmp_dir, 'manifest.jsonl') - storage.download_file(manifest_path, tmp_manifest_path) - manifest = ImageManifestManager(tmp_manifest_path) - manifest.init_index() - manifest_files = manifest.data - content = {f:[] for f in set(storage_files) | set(manifest_files)} - for key, _ in content.items(): - if key in storage_files: content[key].append('s') # storage - if key in manifest_files: content[key].append('m') # manifest - - data = json.dumps(content) - return Response(data=data, content_type="aplication/json") + file_status = storage.get_file_status(manifest_path) + if file_status == Status.NOT_FOUND: + raise FileNotFoundError(errno.ENOENT, + "Not found on the cloud storage {}".format(db_storage.display_name), manifest_path) + elif file_status == Status.FORBIDDEN: + raise PermissionError(errno.EACCES, + "Access to the file on the '{}' cloud storage is denied".format(db_storage.display_name), manifest_path) + + full_manifest_path = os.path.join(db_storage.get_storage_dirname(), manifest_path) + if not os.path.exists(full_manifest_path) or \ + datetime.utcfromtimestamp(os.path.getmtime(full_manifest_path)).replace(tzinfo=pytz.UTC) < storage.get_file_last_modified(manifest_path): + storage.download_file(manifest_path, full_manifest_path) + manifest = ImageManifestManager(full_manifest_path) + # need to update index + manifest.set_index() + manifest_files = manifest.data + return Response(data=manifest_files, content_type="text/plain") except CloudStorageModel.DoesNotExist: message = f"Storage {pk} does not exist" slogger.glob.error(message) return HttpResponseNotFound(message) + except FileNotFoundError as ex: + msg = f"{ex.strerror} {ex.filename}" + slogger.cloud_storage[pk].info(msg) + return Response(data=msg, status=status.HTTP_404_NOT_FOUND) except Exception as ex: - return HttpResponseBadRequest(str(ex)) + # check that cloud storage was not deleted + storage_status = storage.get_status() + if storage_status == Status.FORBIDDEN: + msg = 'The resource {} is no longer available. Access forbidden.'.format(storage.name) + elif storage_status == Status.NOT_FOUND: + msg = 'The resource {} not found. It may have been deleted.'.format(storage.name) + else: + msg = str(ex) + return HttpResponseBadRequest(msg) + + @swagger_auto_schema( + method='get', + operation_summary='Method returns a preview image from a cloud storage', + responses={ + '200': openapi.Response(description='Preview'), + }, + tags=['cloud storages'] + ) + @action(detail=True, methods=['GET'], url_path='preview') + def preview(self, request, pk): + try: + db_storage = CloudStorageModel.objects.get(pk=pk) + if not os.path.exists(db_storage.get_preview_path()): + credentials = Credentials() + credentials.convert_from_db({ + 'type': db_storage.credentials_type, + 'value': db_storage.credentials, + }) + details = { + 'resource': db_storage.resource, + 'credentials': credentials, + 'specific_attributes': db_storage.get_specific_attributes() + } + storage = get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details) + if not db_storage.manifests.count(): + raise Exception('Cannot get the cloud storage preview. There is no manifest file') + preview_path = None + for manifest_model in db_storage.manifests.all(): + full_manifest_path = os.path.join(db_storage.get_storage_dirname(), manifest_model.filename) + if not os.path.exists(full_manifest_path) or \ + datetime.utcfromtimestamp(os.path.getmtime(full_manifest_path)).replace(tzinfo=pytz.UTC) < storage.get_file_last_modified(manifest_model.filename): + storage.download_file(manifest_model.filename, full_manifest_path) + manifest = ImageManifestManager(os.path.join(db_storage.get_storage_dirname(), manifest_model.filename)) + # need to update index + manifest.set_index() + if not len(manifest): + continue + preview_info = manifest[0] + preview_path = ''.join([preview_info['name'], preview_info['extension']]) + break + if not preview_path: + msg = 'Cloud storage {} does not contain any images'.format(pk) + slogger.cloud_storage[pk].info(msg) + return HttpResponseBadRequest(msg) + + file_status = storage.get_file_status(preview_path) + if file_status == Status.NOT_FOUND: + raise FileNotFoundError(errno.ENOENT, + "Not found on the cloud storage {}".format(db_storage.display_name), preview_path) + elif file_status == Status.FORBIDDEN: + raise PermissionError(errno.EACCES, + "Access to the file on the '{}' cloud storage is denied".format(db_storage.display_name), preview_path) + with NamedTemporaryFile() as temp_image: + storage.download_file(preview_path, temp_image.name) + reader = ImageListReader([temp_image.name]) + preview = reader.get_preview() + preview.save(db_storage.get_preview_path()) + content_type = mimetypes.guess_type(db_storage.get_preview_path())[0] + return HttpResponse(open(db_storage.get_preview_path(), 'rb').read(), content_type) + except CloudStorageModel.DoesNotExist: + message = f"Storage {pk} does not exist" + slogger.glob.error(message) + return HttpResponseNotFound(message) + except Exception as ex: + # check that cloud storage was not deleted + storage_status = storage.get_status() + if storage_status == Status.FORBIDDEN: + msg = 'The resource {} is no longer available. Access forbidden.'.format(storage.name) + elif storage_status == Status.NOT_FOUND: + msg = 'The resource {} not found. It may have been deleted.'.format(storage.name) + else: + msg = str(ex) + return HttpResponseBadRequest(msg) + + @swagger_auto_schema( + method='get', + operation_summary='Method returns a cloud storage status', + responses={ + '200': openapi.Response(description='Status'), + }, + tags=['cloud storages'] + ) + @action(detail=True, methods=['GET'], url_path='status') + def status(self, request, pk): + try: + db_storage = CloudStorageModel.objects.get(pk=pk) + credentials = Credentials() + credentials.convert_from_db({ + 'type': db_storage.credentials_type, + 'value': db_storage.credentials, + }) + details = { + 'resource': db_storage.resource, + 'credentials': credentials, + 'specific_attributes': db_storage.get_specific_attributes() + } + storage = get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details) + storage_status = storage.get_status() + return HttpResponse(storage_status) + except CloudStorageModel.DoesNotExist: + message = f"Storage {pk} does not exist" + slogger.glob.error(message) + return HttpResponseNotFound(message) + except Exception as ex: + msg = str(ex) + return HttpResponseBadRequest(msg) def rq_handler(job, exc_type, exc_value, tb): job.exc_info = "".join( @@ -1511,5 +1619,3 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba meta={ 'request_time': timezone.localtime() }, result_ttl=ttl, failure_ttl=ttl) return Response(status=status.HTTP_202_ACCEPTED) - - diff --git a/utils/dataset_manifest/core.py b/utils/dataset_manifest/core.py index b357daf9b58e..02d099255a0b 100644 --- a/utils/dataset_manifest/core.py +++ b/utils/dataset_manifest/core.py @@ -223,6 +223,9 @@ def load(self): self._index = json.load(index_file, object_hook=lambda d: {int(k): v for k, v in d.items()}) + def remove(self): + os.remove(self._path) + def create(self, manifest, skip): assert os.path.exists(manifest), 'A manifest file not exists, index cannot be created' with open(manifest, 'r+') as manifest_file: @@ -265,6 +268,7 @@ class _ManifestManager(ABC): } def __init__(self, path, *args, **kwargs): self._manifest = _Manifest(path) + self._index = _Index(os.path.dirname(self._manifest.path)) def _parse_line(self, line): """ Getting a random line from the manifest file """ @@ -283,13 +287,20 @@ def _parse_line(self, line): return json.loads(properties) def init_index(self): - self._index = _Index(os.path.dirname(self._manifest.path)) if os.path.exists(self._index.path): self._index.load() else: self._index.create(self._manifest.path, 3 if self._manifest.TYPE == 'video' else 2) self._index.dump() + def reset_index(self): + if os.path.exists(self._index.path): + self._index.remove() + + def set_index(self): + self.reset_index() + self.init_index() + @abstractmethod def create(self, content, **kwargs): pass @@ -331,6 +342,10 @@ def index(self): def data(self): pass + @abstractmethod + def get_subset(self, subset_names): + pass + class VideoManifestManager(_ManifestManager): def __init__(self, manifest_path): super().__init__(manifest_path) @@ -394,7 +409,10 @@ def video_length(self): @property def data(self): - return [self.video_name] + return (self.video_name) + + def get_subset(self, subset_names): + raise NotImplementedError() #TODO: add generic manifest structure file validation class ManifestValidator: @@ -476,4 +494,14 @@ def prepare_meta(sources, **kwargs): @property def data(self): - return [f"{image['name']}{image['extension']}" for _, image in self] \ No newline at end of file + return (f"{image['name']}{image['extension']}" for _, image in self) + + def get_subset(self, subset_names): + return ({ + 'name': f"{image['name']}", + 'extension': f"{image['extension']}", + 'width': image['width'], + 'height': image['height'], + 'meta': image['meta'], + 'checksum': f"{image['checksum']}" + } for _, image in self if f"{image['name']}{image['extension']}" in subset_names) \ No newline at end of file