Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extension of cloud storage server part #3386

Merged
merged 27 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8fa9caf
Add preview && some fixes
Marishka17 Jul 3, 2021
46587a9
Fix case with sub dirs on cloud storage
Marishka17 Jul 5, 2021
93eea37
Move server part from ui_support_cloud_storage && fix missing id field
Marishka17 Jul 6, 2021
0aebb6e
Add support_key_secret_key_pair
Marishka17 Jul 9, 2021
41aa91d
Fix several moments
Marishka17 Jul 18, 2021
7e56fa9
Add index resetting
Marishka17 Jul 18, 2021
f94ef7c
Fix pylint errors
Marishka17 Jul 18, 2021
1f2915b
Remove excess migration
Marishka17 Aug 2, 2021
2f5a6ef
Merge branch 'develop' into mk/expansion_server_cloud_storage
Marishka17 Aug 2, 2021
9a8faf4
tmp
Marishka17 Aug 10, 2021
8fb8207
Some fixes
Marishka17 Aug 12, 2021
070dbcf
Fixes
Marishka17 Aug 13, 2021
b3252b1
fix
Marishka17 Aug 13, 2021
deab61b
[server] Add cloud storage status && fixes
Marishka17 Aug 26, 2021
eac737a
Merge develop && resolve conflict
Marishka17 Aug 26, 2021
ea6a0d9
Remove unused import
Marishka17 Aug 26, 2021
3d01a28
Add manifest set_index method
Marishka17 Aug 26, 2021
32761b6
Implement status support for Azure blob container
Marishka17 Aug 26, 2021
f574fee
Move specific attributes parsing into utils
Marishka17 Aug 26, 2021
3ce5bcd
Fix missing in migration
Marishka17 Aug 26, 2021
d354b18
Fix error display
Marishka17 Aug 26, 2021
c1f68a7
some fix
Marishka17 Aug 27, 2021
174e0b4
Merge branch 'develop' into mk/expansion_server_cloud_storage
Marishka17 Aug 27, 2021
b0f42af
Update migration dependency
Marishka17 Aug 30, 2021
5f94b32
Update google cloud storage status
Marishka17 Aug 30, 2021
1cff091
Update migrations
Marishka17 Aug 30, 2021
39881bb
Update CHANGELOG
Marishka17 Aug 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ImageDatasetManifestReader, VideoDatasetManifestReader)
from cvat.apps.engine.models import DataChoice, StorageChoice
from cvat.apps.engine.models import DimensionType
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status
from cvat.apps.engine.utils import md5_hash
class CacheInteraction:
def __init__(self, dimension=DimensionType.DIM_2D):
Expand Down Expand Up @@ -71,6 +71,7 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number):
step=db_data.get_frame_step())
if db_data.storage == StorageChoice.CLOUD_STORAGE:
db_cloud_storage = db_data.cloud_storage
assert db_cloud_storage, 'Cloud storage instance was deleted'
credentials = Credentials()
credentials.convert_from_db({
'type': db_cloud_storage.credentials_type,
Expand All @@ -81,22 +82,38 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number):
'credentials': credentials,
'specific_attributes': db_cloud_storage.get_specific_attributes()
}
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
cloud_storage_instance.initialize_content()
for item in reader:
name = f"{item['name']}{item['extension']}"
if name not in cloud_storage_instance:
raise Exception('{} file was not found on a {} storage'.format(name, cloud_storage_instance.name))
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=name.replace(os.path.sep, '#'), delete=False) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(name)
temp_file.write(buf.getvalue())
checksum = item.get('checksum', None)
if not checksum:
slogger.glob.warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.glob.warning('Hash sums of files {} do not match'.format(name))
images.append((source_path, source_path, None))
try:
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
cloud_storage_instance.initialize_content()
for item in reader:
file_name = f"{item['name']}{item['extension']}"
if file_name not in cloud_storage_instance:
raise Exception('{} file was not found on a {} storage'.format(file_name, cloud_storage_instance.name))
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(file_name)
temp_file.write(buf.getvalue())
checksum = item.get('checksum', None)
if not checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name))
images.append((source_path, source_path, None))
except Exception as ex:
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
storage_status = cloud_storage_instance.get_status()
if storage_status == Status.FORBIDDEN:
msg = 'The resource {} is no longer available. Access forbidden.'.format(cloud_storage_instance.name)
elif storage_status == Status.NOT_FOUND:
msg = 'The resource {} not found. It may have been deleted.'.format(cloud_storage_instance.name)
else:
# check status of last file
file_status = cloud_storage_instance.get_file_status(file_name)
if file_status == Status.NOT_FOUND:
raise Exception("'{}' not found on the cloud storage '{}'".format(file_name, cloud_storage_instance.name))
elif file_status == Status.FORBIDDEN:
raise Exception("Access to the file '{}' on the '{}' cloud storage is denied".format(file_name, cloud_storage_instance.name))
msg = str(ex)
raise Exception(msg)
else:
for item in reader:
source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
Expand Down
222 changes: 170 additions & 52 deletions cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
#from dataclasses import dataclass
# Copyright (C) 2021 Intel Corporation
#
# SPDX-License-Identifier: MIT

import os
import boto3

from abc import ABC, abstractmethod, abstractproperty
from enum import Enum
from io import BytesIO
import os
import os.path

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.exceptions import WaiterError
from botocore.exceptions import ClientError
from botocore.handlers import disable_signing

from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
from azure.core.exceptions import ResourceExistsError, HttpResponseError
from azure.storage.blob import PublicAccess

from google.cloud import storage
from google.cloud.exceptions import NotFound as GoogleCloudNotFound, Forbidden as GoogleCloudForbidden

from cvat.apps.engine.log import slogger
from cvat.apps.engine.models import CredentialsTypeChoice, CloudProviderChoice

class Status(str, Enum):
    """Availability state of a cloud storage resource (bucket/container) or file."""
    AVAILABLE = 'AVAILABLE'
    NOT_FOUND = 'NOT_FOUND'
    FORBIDDEN = 'FORBIDDEN'

    @classmethod
    def choices(cls):
        # (value, name) pairs, suitable e.g. for model-field choices
        return tuple([(member.value, member.name) for member in cls])

    def __str__(self):
        # render as the raw string value rather than 'Status.X'
        return self.value

class _CloudStorage(ABC):

def __init__(self):
Expand All @@ -32,7 +49,23 @@ def create(self):
pass

@abstractmethod
def exists(self):
def _head_file(self, key):
pass

@abstractmethod
def _head(self):
pass

@abstractmethod
def get_status(self):
pass

@abstractmethod
def get_file_status(self, key):
pass

@abstractmethod
def get_file_last_modified(self, key):
pass

@abstractmethod
Expand Down Expand Up @@ -95,10 +128,6 @@ def get_cloud_storage_instance(cloud_provider, resource, credentials, specific_a
return instance

class AWS_S3(_CloudStorage):
waiter_config = {
'Delay': 5, # The amount of time in seconds to wait between attempts. Default: 5
'MaxAttempts': 3, # The maximum number of attempts to be made. Default: 20
}
transfer_config = {
'max_io_queue': 10,
}
Expand All @@ -117,6 +146,13 @@ def __init__(self,
aws_session_token=session_token,
region_name=region
)
elif access_key_id and secret_key:
self._s3 = boto3.resource(
's3',
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_key,
region_name=region
)
elif any([access_key_id, secret_key, session_token]):
raise Exception('Insufficient data for authorization')
# anonymous access
Expand All @@ -135,26 +171,38 @@ def bucket(self):
def name(self):
return self._bucket.name

def exists(self):
waiter = self._client_s3.get_waiter('bucket_exists')
try:
waiter.wait(
Bucket=self.name,
WaiterConfig=self.waiter_config
)
except WaiterError:
raise Exception('A resource {} unavailable'.format(self.name))
def _head(self):
    # HEAD-style existence/permission probe of the whole bucket;
    # raises botocore ClientError (403/404) when inaccessible.
    return self._client_s3.head_bucket(Bucket=self.name)

def is_object_exist(self, key_object):
waiter = self._client_s3.get_waiter('object_exists')
def _head_file(self, key):
    # HEAD request for a single object; returns its metadata dict or
    # raises botocore ClientError (403/404) when inaccessible.
    return self._client_s3.head_object(Bucket=self.name, Key=key)

def get_status(self):
    """Return the availability Status of the S3 bucket.

    A 403 error code maps to FORBIDDEN; any other ClientError
    (typically 404) maps to NOT_FOUND.
    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
    """
    try:
        self._head()
    except ClientError as ex:
        error_code = ex.response['Error']['Code']
        return Status.FORBIDDEN if error_code == '403' else Status.NOT_FOUND
    return Status.AVAILABLE

def get_file_status(self, key):
    """Return the availability Status of a single object in the bucket.

    A 403 error code maps to FORBIDDEN; any other ClientError
    (typically 404) maps to NOT_FOUND.
    """
    try:
        self._head_file(key)
    except ClientError as ex:
        error_code = ex.response['Error']['Code']
        return Status.FORBIDDEN if error_code == '403' else Status.NOT_FOUND
    return Status.AVAILABLE

def get_file_last_modified(self, key):
    """Return the 'LastModified' timestamp S3 reports for *key* (None if absent)."""
    metadata = self._head_file(key)
    return metadata.get('LastModified')

def upload_file(self, file_obj, file_name):
self._bucket.upload_fileobj(
Expand Down Expand Up @@ -234,12 +282,35 @@ def create(self):
slogger.glob.info(msg)
raise Exception(msg)

def exists(self):
return self._container_client.exists(timeout=5)
def _head(self):
    # HEAD-style probe of the container; raises azure HttpResponseError
    # when the container is missing or access is denied.
    return self._container_client.get_container_properties()

def _head_file(self, key):
    # HEAD-style probe of a single blob; raises azure HttpResponseError on failure.
    # NOTE(review): uses `self.container` while sibling methods use
    # `self._container_client` — presumably a property alias on this class; confirm.
    blob_client = self.container.get_blob_client(key)
    return blob_client.get_blob_properties()

def is_object_exist(self, file_name):
blob_client = self._container_client.get_blob_client(file_name)
return blob_client.exists()
def get_file_last_modified(self, key):
    # last_modified attribute of the blob's properties (from a HEAD on the blob)
    return self._head_file(key).last_modified

def get_status(self):
    """Return the availability Status of the Azure blob container.

    HTTP 403 maps to FORBIDDEN; any other HttpResponseError maps to NOT_FOUND.
    """
    try:
        self._head()
    except HttpResponseError as ex:
        return Status.FORBIDDEN if ex.status_code == 403 else Status.NOT_FOUND
    return Status.AVAILABLE

def get_file_status(self, key):
    """Return the availability Status of a single blob in the container.

    HTTP 403 maps to FORBIDDEN; any other HttpResponseError maps to NOT_FOUND.
    """
    try:
        self._head_file(key)
    except HttpResponseError as ex:
        return Status.FORBIDDEN if ex.status_code == 403 else Status.NOT_FOUND
    return Status.AVAILABLE

def upload_file(self, file_obj, file_name):
self._container_client.upload_blob(name=file_name, data=file_obj)
Expand Down Expand Up @@ -269,6 +340,20 @@ def download_fileobj(self, key):
class GOOGLE_DRIVE(_CloudStorage):
pass

def _define_gcs_status(func):
def wrapper(self, key=None):
try:
if not key:
func(self)
else:
func(self, key)
return Status.AVAILABLE
except GoogleCloudNotFound:
return Status.NOT_FOUND
except GoogleCloudForbidden:
return Status.FORBIDDEN
return wrapper

class GoogleCloudStorage(_CloudStorage):

def __init__(self, bucket_name, prefix=None, service_account_json=None, project=None, location=None):
Expand All @@ -294,8 +379,20 @@ def bucket(self):
def name(self):
return self._bucket.name

def exists(self):
return self._storage_client.lookup_bucket(self.name) is not None
def _head(self):
    # Fetch the bucket resource; raises google.cloud NotFound/Forbidden
    # when the bucket is missing or access is denied.
    return self._storage_client.get_bucket(bucket_or_name=self.name)

def _head_file(self, key):
    # Probe a single blob's resource.
    # NOTE(review): `_get_resource` is a private google-cloud-storage client
    # API — fragile across library upgrades; confirm no public equivalent fits.
    blob = self.bucket.blob(key)
    return self._storage_client._get_resource(blob.path)

@_define_gcs_status
def get_status(self):
    # decorator maps success / NotFound / Forbidden to a Status value
    self._head()

@_define_gcs_status
def get_file_status(self, key):
    # decorator maps success / NotFound / Forbidden to a Status value
    self._head_file(key)

def initialize_content(self):
self._files = [
Expand All @@ -314,9 +411,6 @@ def download_fileobj(self, key):
buf.seek(0)
return buf

def is_object_exist(self, key):
return self.bucket.blob(key).exists()

def upload_file(self, file_obj, file_name):
self.bucket.blob(file_name).upload_from_file(file_obj)

Expand All @@ -342,7 +436,6 @@ def get_file_last_modified(self, key):
blob.reload()
return blob.updated


class Credentials:
__slots__ = ('key', 'secret_key', 'session_token', 'account_name', 'key_file_path', 'credentials_type')

Expand All @@ -356,33 +449,58 @@ def __init__(self, **credentials):

def convert_to_db(self):
converted_credentials = {
CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET : \
" ".join([self.key, self.secret_key, self.session_token]),
CredentialsTypeChoice.KEY_SECRET_KEY_PAIR : \
" ".join([self.key, self.secret_key]),
CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR : " ".join([self.account_name, self.session_token]),
CredentialsTypeChoice.KEY_FILE_PATH: self.key_file_path,
CredentialsTypeChoice.ANONYMOUS_ACCESS: "",
CredentialsTypeChoice.ANONYMOUS_ACCESS: "" if not self.account_name else self.account_name,
}
return converted_credentials[self.credentials_type]

def convert_from_db(self, credentials):
self.credentials_type = credentials.get('type')
if self.credentials_type == CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET:
self.key, self.secret_key, self.session_token = credentials.get('value').split()
if self.credentials_type == CredentialsTypeChoice.KEY_SECRET_KEY_PAIR:
self.key, self.secret_key = credentials.get('value').split()
elif self.credentials_type == CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR:
self.account_name, self.session_token = credentials.get('value').split()
elif self.credentials_type == CredentialsTypeChoice.ANONYMOUS_ACCESS:
self.session_token, self.key, self.secret_key = ('', '', '')
# account_name will be in [some_value, '']
self.account_name = credentials.get('value')
elif self.credentials_type == CredentialsTypeChoice.KEY_FILE_PATH:
self.key_file_path = credentials.get('value')
else:
self.account_name, self.session_token, self.key, self.secret_key = ('', '', '', '')
self.credentials_type = None
raise NotImplementedError('Found {} not supported credentials type'.format(self.credentials_type))

def mapping_with_new_values(self, credentials):
self.credentials_type = credentials.get('credentials_type', self.credentials_type)
self.key = credentials.get('key', self.key)
self.secret_key = credentials.get('secret_key', self.secret_key)
self.session_token = credentials.get('session_token', self.session_token)
self.account_name = credentials.get('account_name', self.account_name)
self.key_file_path = credentials.get('key_file_path', self.key_file_path)
if self.credentials_type == CredentialsTypeChoice.ANONYMOUS_ACCESS:
self.key = ''
self.secret_key = ''
self.session_token = ''
self.key_file_path = ''
self.account_name = credentials.get('account_name', self.account_name)
elif self.credentials_type == CredentialsTypeChoice.KEY_SECRET_KEY_PAIR:
self.key = credentials.get('key', self.key)
self.secret_key = credentials.get('secret_key', self.secret_key)
self.session_token = ''
self.account_name = ''
self.key_file_path = ''
elif self.credentials_type == CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR:
self.session_token = credentials.get('session_token', self.session_token)
self.account_name = credentials.get('account_name', self.account_name)
self.key = ''
self.secret_key = ''
self.key_file_path = ''
elif self.credentials_type == CredentialsTypeChoice.KEY_FILE_PATH:
self.key = ''
self.secret_key = ''
self.session_token = ''
self.account_name = ''
self.key_file_path = credentials.get('key_file_path', self.key_file_path)
else:
raise NotImplementedError('Mapping credentials: unsupported credentials type')


def values(self):
    """Return all credential fields as a list, in a fixed order:
    key, secret_key, session_token, account_name, key_file_path."""
    ordered_fields = ('key', 'secret_key', 'session_token', 'account_name', 'key_file_path')
    return [getattr(self, field) for field in ordered_fields]
Loading