Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extension of cloud storage server part #3386

Merged
merged 27 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8fa9caf
Add preview && some fixes
Marishka17 Jul 3, 2021
46587a9
Fix case with sub dirs on cloud storage
Marishka17 Jul 5, 2021
93eea37
Move server part from ui_support_cloud_storage && fix missing id field
Marishka17 Jul 6, 2021
0aebb6e
Add support_key_secret_key_pair
Marishka17 Jul 9, 2021
41aa91d
Fix several moments
Marishka17 Jul 18, 2021
7e56fa9
Add index resetting
Marishka17 Jul 18, 2021
f94ef7c
Fix pylint errors
Marishka17 Jul 18, 2021
1f2915b
Remove excess migration
Marishka17 Aug 2, 2021
2f5a6ef
Merge branch 'develop' into mk/expansion_server_cloud_storage
Marishka17 Aug 2, 2021
9a8faf4
tmp
Marishka17 Aug 10, 2021
8fb8207
Some fixes
Marishka17 Aug 12, 2021
070dbcf
Fixes
Marishka17 Aug 13, 2021
b3252b1
fix
Marishka17 Aug 13, 2021
deab61b
[server] Add cloud storage status && fixes
Marishka17 Aug 26, 2021
eac737a
Merge develop && resolve conflict
Marishka17 Aug 26, 2021
ea6a0d9
Remove unused import
Marishka17 Aug 26, 2021
3d01a28
Add manifest set_index method
Marishka17 Aug 26, 2021
32761b6
Implement status support for Azure blob container
Marishka17 Aug 26, 2021
f574fee
Move specific attributes parsing into utils
Marishka17 Aug 26, 2021
3ce5bcd
Fix missing in migration
Marishka17 Aug 26, 2021
d354b18
Fix error display
Marishka17 Aug 26, 2021
c1f68a7
some fix
Marishka17 Aug 27, 2021
174e0b4
Merge branch 'develop' into mk/expansion_server_cloud_storage
Marishka17 Aug 27, 2021
b0f42af
Update migration dependency
Marishka17 Aug 30, 2021
5f94b32
Update google cloud storage status
Marishka17 Aug 30, 2021
1cff091
Update migrtaions
Marishka17 Aug 30, 2021
39881bb
Update CHANGELOG
Marishka17 Aug 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from diskcache import Cache
from django.conf import settings
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile, gettempdir

from cvat.apps.engine.log import slogger
from cvat.apps.engine.media_extractors import (Mpeg4ChunkWriter,
Expand Down Expand Up @@ -71,6 +71,7 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number):
step=db_data.get_frame_step())
if db_data.storage == StorageChoice.CLOUD_STORAGE:
db_cloud_storage = db_data.cloud_storage
assert db_cloud_storage, 'Cloud storage instance was deleted'
credentials = Credentials()
credentials.convert_from_db({
'type': db_cloud_storage.credentials_type,
Expand All @@ -81,32 +82,48 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number):
'credentials': credentials,
'specific_attributes': db_cloud_storage.get_specific_attributes()
}
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
cloud_storage_instance.initialize_content()
for item in reader:
name = f"{item['name']}{item['extension']}"
if name not in cloud_storage_instance:
raise Exception('{} file was not found on a {} storage'.format(name, cloud_storage_instance.name))
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=name, delete=False) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(name)
temp_file.write(buf.getvalue())
checksum = item.get('checksum', None)
if not checksum:
slogger.glob.warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.glob.warning('Hash sums of files {} do not match'.format(name))
images.append((source_path, source_path, None))
try:
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
cloud_storage_instance.initialize_content()
for item in reader:
# full_name may be 'sub_dir/image.jpeg'
full_name = f"{item['name']}{item['extension']}"
if full_name not in cloud_storage_instance:
raise Exception('{} file was not found on a {} storage'.format(full_name, cloud_storage_instance.name))
head, file_name = os.path.split(full_name)
abs_head = os.path.join(gettempdir(), head)
os.makedirs(abs_head, exist_ok=True)
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name, delete=False, dir=abs_head) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(full_name)
temp_file.write(buf.getvalue())
checksum = item.get('checksum', None)
if not checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(full_name))
images.append((source_path, source_path, None))
except Exception as ex:
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved
if not cloud_storage_instance.exists():
msg = 'The resource {} is no longer available. It may have been deleted'.format(cloud_storage_instance.name)
else:
msg = str(ex)
raise Exception(msg)
else:
for item in reader:
source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
images.append((source_path, source_path, None))
writer.save_as_chunk(images, buff)
buff.seek(0)
if db_data.storage == StorageChoice.CLOUD_STORAGE:
tmp = gettempdir()
created_dirs = set(filter(lambda x: x if x.lstrip(tmp) else None, [os.path.dirname(i[0]) for i in images]))
images = [image[0] for image in images if os.path.exists(image[0])]
for image_path in images:
os.remove(image_path)
for created_dir in created_dirs:
if not os.listdir(created_dir):
os.rmdir(created_dir)
return buff, mime_type

def save_chunk(self, db_data_id, chunk_number, quality, buff, mime_type):
Expand Down
93 changes: 79 additions & 14 deletions cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ def name(self):
def create(self):
pass

@abstractmethod
def _head_file(self, key):
    """Return the provider's metadata record for the object stored under ``key``.

    Concrete storages implement this with their own metadata request;
    ``get_file_last_modified`` builds on the value returned here.
    """

@abstractmethod
def get_file_last_modified(self, key):
    """Return the last-modified value of the object stored under ``key``."""

@abstractmethod
def exists(self):
    """Report whether the underlying storage resource (bucket/container) is available."""
Expand Down Expand Up @@ -81,10 +89,39 @@ def get_cloud_storage_instance(cloud_provider, resource, credentials, specific_a
raise NotImplementedError()
return instance

def check_cloud_storage_existing(provider_type,
                                 credentials_type,
                                 session_token,
                                 account_name,
                                 key,
                                 secret_key,
                                 resource,
                                 specific_attributes):
    """Verify that the described cloud storage resource is available.

    Builds ``Credentials`` from the given values, creates the provider
    wrapper through ``get_cloud_storage_instance`` and asks it whether the
    resource exists.

    Args:
        provider_type: provider identifier passed as ``cloud_provider``.
        credentials_type, session_token, account_name, key, secret_key:
            values forwarded unchanged to ``Credentials``.
        resource: name of the bucket/container to check.
        specific_attributes: string of ``name=value`` pairs joined with
            ``&``; an empty string or ``None`` means "no extra attributes".

    Raises:
        Exception: if the storage reports that the resource does not exist.
    """
    credentials = Credentials(
        key=key,
        secret_key=secret_key,
        session_token=session_token,
        account_name=account_name,
        credentials_type=credentials_type)

    parsed_attributes = {}
    for item in (specific_attributes or '').split('&'):
        if not item.strip():
            # Tolerate empty segments such as a trailing or doubled '&'.
            continue
        # Split on the first '=' only, so a value may itself contain '='
        # and a bare name maps to an empty value instead of raising.
        name, _, value = item.partition('=')
        parsed_attributes[name.strip()] = value.strip()

    storage = get_cloud_storage_instance(
        cloud_provider=provider_type,
        resource=resource,
        credentials=credentials,
        specific_attributes=parsed_attributes,
    )
    if not storage.exists():
        message = 'The resource {} unavailable'.format(resource)
        slogger.glob.error(message)
        raise Exception(message)

class AWS_S3(_CloudStorage):
waiter_config = {
'Delay': 5, # The amount of time in seconds to wait between attempts. Default: 5
'MaxAttempts': 3, # The maximum number of attempts to be made. Default: 20
'Delay': 1, # The amount of time in seconds to wait between attempts. Default: 5
'MaxAttempts': 2, # The maximum number of attempts to be made. Default: 20
}
transfer_config = {
'max_io_queue': 10,
Expand All @@ -104,6 +141,13 @@ def __init__(self,
aws_session_token=session_token,
region_name=region
)
elif access_key_id and secret_key:
self._s3 = boto3.resource(
's3',
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_key,
region_name=region
)
elif any([access_key_id, secret_key, session_token]):
raise Exception('Insufficient data for authorization')
# anonymous access
Expand All @@ -129,19 +173,30 @@ def exists(self):
Bucket=self.name,
WaiterConfig=self.waiter_config
)
return True
except WaiterError:
raise Exception('A resource {} unavailable'.format(self.name))
return False

def is_object_exist(self, key_object):
waiter = self._client_s3.get_waiter('object_exists')
try:
waiter.wait(
Bucket=self._bucket,
Bucket=self.name,
Key=key_object,
WaiterConfig=self.waiter_config
WaiterConfig=self.waiter_config,
)
return True
except WaiterError:
raise Exception('A file {} unavailable'.format(key_object))
return False

def _head_file(self, key):
    """Fetch the HEAD metadata of object ``key`` in this bucket.

    Returns whatever the S3 client's ``head_object`` call returns for
    ``Bucket=self.name`` and ``Key=key``.
    """
    s3_client = self._client_s3
    return s3_client.head_object(Bucket=self.name, Key=key)

def get_file_last_modified(self, key):
    """Return the ``LastModified`` entry of the object's HEAD metadata.

    ``None`` is returned when the metadata mapping has no such entry.
    """
    metadata = self._head_file(key)
    return metadata.get('LastModified')

def upload_file(self, file_obj, file_name):
self._bucket.upload_fileobj(
Expand Down Expand Up @@ -222,12 +277,19 @@ def create(self):
raise Exception(msg)

def exists(self):
return self._container_client.exists(timeout=5)
return self._container_client.exists(timeout=2)

def is_object_exist(self, file_name):
    """Return the result of ``exists()`` on the blob client for ``file_name``."""
    return self._container_client.get_blob_client(file_name).exists()

def _head_file(self, key):
    """Return the properties of blob ``key`` as reported by its blob client."""
    # NOTE(review): neighbouring methods go through ``self._container_client``;
    # confirm that ``self.container`` is an equivalent accessor.
    return self.container.get_blob_client(key).get_blob_properties()

def get_file_last_modified(self, key):
    """Return the ``last_modified`` attribute of the blob's properties."""
    properties = self._head_file(key)
    return properties.last_modified

def upload_file(self, file_obj, file_name):
    """Upload ``file_obj`` to the container as a blob named ``file_name``."""
    self._container_client.upload_blob(
        name=file_name,
        data=file_obj,
    )

Expand Down Expand Up @@ -268,22 +330,25 @@ def __init__(self, **credentials):

def convert_to_db(self):
converted_credentials = {
CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET : \
" ".join([self.key, self.secret_key, self.session_token]),
CredentialsTypeChoice.KEY_SECRET_KEY_PAIR : \
" ".join([self.key, self.secret_key]),
CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR : " ".join([self.account_name, self.session_token]),
CredentialsTypeChoice.ANONYMOUS_ACCESS: "",
CredentialsTypeChoice.ANONYMOUS_ACCESS: "" if not self.account_name else self.account_name,
}
return converted_credentials[self.credentials_type]

def convert_from_db(self, credentials):
self.credentials_type = credentials.get('type')
if self.credentials_type == CredentialsTypeChoice.TEMP_KEY_SECRET_KEY_TOKEN_SET:
self.key, self.secret_key, self.session_token = credentials.get('value').split()
if self.credentials_type == CredentialsTypeChoice.KEY_SECRET_KEY_PAIR:
self.key, self.secret_key = credentials.get('value').split()
elif self.credentials_type == CredentialsTypeChoice.ACCOUNT_NAME_TOKEN_PAIR:
self.account_name, self.session_token = credentials.get('value').split()
elif self.credentials_type == CredentialsTypeChoice.ANONYMOUS_ACCESS:
self.session_token, self.key, self.secret_key = ('', '', '')
# account_name will be in [some_value, '']
self.account_name = credentials.get('value')
else:
self.account_name, self.session_token, self.key, self.secret_key = ('', '', '', '')
self.credentials_type = None
raise NotImplementedError('Found {} not supported credentials type'.format(self.credentials_type))

def mapping_with_new_values(self, credentials):
self.credentials_type = credentials.get('credentials_type', self.credentials_type)
Expand Down
27 changes: 27 additions & 0 deletions cvat/apps/engine/migrations/0041_auto_20210813_0853.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 3.1.13 on 2021-08-13 08:53

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    """Update ``CloudStorage.credentials_type`` choices and add the ``Manifest`` model."""

    dependencies = [
        ('engine', '0040_cloud_storage'),
    ]

    operations = [
        migrations.AlterField(
            model_name='cloudstorage',
            name='credentials_type',
            field=models.CharField(
                choices=[
                    ('KEY_SECRET_KEY_PAIR', 'KEY_SECRET_KEY_PAIR'),
                    ('ACCOUNT_NAME_TOKEN_PAIR', 'ACCOUNT_NAME_TOKEN_PAIR'),
                    ('ANONYMOUS_ACCESS', 'ANONYMOUS_ACCESS'),
                ],
                max_length=29,
            ),
        ),
        migrations.CreateModel(
            name='Manifest',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('filename', models.CharField(default='manifest.jsonl', max_length=1024)),
                # null=True keeps this column in sync with the nullable
                # ``cloud_storage`` ForeignKey declared on the Manifest model.
                ('cloud_storage', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='manifests', to='engine.cloudstorage')),
            ],
        ),
    ]
14 changes: 12 additions & 2 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def __str__(self):

class CredentialsTypeChoice(str, Enum):
# ignore bandit issues because they are false positives
TEMP_KEY_SECRET_KEY_TOKEN_SET = 'TEMP_KEY_SECRET_KEY_TOKEN_SET' # nosec
KEY_SECRET_KEY_PAIR = 'KEY_SECRET_KEY_PAIR' # nosec
ACCOUNT_NAME_TOKEN_PAIR = 'ACCOUNT_NAME_TOKEN_PAIR' # nosec
ANONYMOUS_ACCESS = 'ANONYMOUS_ACCESS'

Expand All @@ -571,6 +571,13 @@ def list(cls):
def __str__(self):
return self.value

class Manifest(models.Model):
    """Manifest file attached to a cloud storage."""

    # Manifest file name; defaults to 'manifest.jsonl'.
    filename = models.CharField(default='manifest.jsonl', max_length=1024)
    # Owning storage (nullable). Deleting the storage cascades to its
    # manifests; the reverse accessor on the storage is ``manifests``.
    cloud_storage = models.ForeignKey(
        'CloudStorage',
        null=True,
        related_name='manifests',
        on_delete=models.CASCADE,
    )

    def __str__(self):
        """Return the manifest file name."""
        return str(self.filename)

class CloudStorage(models.Model):
# restrictions:
# AWS bucket name, Azure container name - 63
Expand Down Expand Up @@ -606,7 +613,10 @@ def get_storage_logs_dirname(self):
return os.path.join(self.get_storage_dirname(), 'logs')

def get_log_path(self):
return os.path.join(self.get_storage_dirname(), "storage.log")
return os.path.join(self.get_storage_logs_dirname(), "storage.log")

def get_preview_path(self):
    """Return the path of ``preview.jpeg`` inside this storage's directory."""
    storage_dir = self.get_storage_dirname()
    return os.path.join(storage_dir, 'preview.jpeg')

def get_specific_attributes(self):
specific_attributes = self.specific_attributes
Expand Down
42 changes: 40 additions & 2 deletions cvat/apps/engine/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from cvat.apps.dataset_manager.formats.utils import get_label_color
from cvat.apps.engine import models
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, check_cloud_storage_existing, Credentials
from cvat.apps.engine.log import slogger

class BasicUserSerializer(serializers.ModelSerializer):
Expand Down Expand Up @@ -771,8 +771,22 @@ def create(self, validated_data):

return db_review

class ManifestSerializer(serializers.ModelSerializer):
    """Serialize a ``Manifest`` as a bare filename instead of a nested object."""

    class Meta:
        model = models.Manifest
        fields = ('filename', )

    # pylint: disable=no-self-use
    def to_internal_value(self, data):
        """Wrap the incoming value in a ``{'filename': ...}`` mapping."""
        return {'filename': data}

    # pylint: disable=no-self-use
    def to_representation(self, instance):
        """Return the manifest's filename; a falsy ``instance`` is passed through unchanged."""
        if not instance:
            return instance
        return instance.filename

class BaseCloudStorageSerializer(serializers.ModelSerializer):
owner = BasicUserSerializer(required=False)
manifests = ManifestSerializer(many=True, default=[])
class Meta:
model = models.CloudStorage
exclude = ['credentials']
Expand All @@ -784,13 +798,14 @@ class CloudStorageSerializer(serializers.ModelSerializer):
key = serializers.CharField(max_length=20, allow_blank=True, required=False)
secret_key = serializers.CharField(max_length=40, allow_blank=True, required=False)
account_name = serializers.CharField(max_length=24, allow_blank=True, required=False)
manifests = ManifestSerializer(many=True, default=[])

class Meta:
model = models.CloudStorage
fields = (
'provider_type', 'resource', 'display_name', 'owner', 'credentials_type',
'created_date', 'updated_date', 'session_token', 'account_name', 'key',
'secret_key', 'specific_attributes', 'description'
'secret_key', 'specific_attributes', 'description', 'id', 'manifests',
)
read_only_fields = ('created_date', 'updated_date', 'owner')

Expand Down Expand Up @@ -836,11 +851,22 @@ def create(self, validated_data):
slogger.glob.warning("Failed with creating storage\n{}".format(str(ex)))
raise

manifests = validated_data.pop('manifests')

db_storage = models.CloudStorage.objects.create(
credentials=credentials.convert_to_db(),
**validated_data
)
db_storage.save()

manifest_file_instances = [models.Manifest(**manifest, cloud_storage=db_storage) for manifest in manifests]
models.Manifest.objects.bulk_create(manifest_file_instances)

cloud_storage_path = db_storage.get_storage_dirname()
if os.path.isdir(cloud_storage_path):
shutil.rmtree(cloud_storage_path)

os.makedirs(db_storage.get_storage_logs_dirname(), exist_ok=True)
return db_storage

# pylint: disable=no-self-use
Expand All @@ -857,6 +883,18 @@ def update(self, instance, validated_data):
instance.resource = validated_data.get('resource', instance.resource)
instance.display_name = validated_data.get('display_name', instance.display_name)

check_cloud_storage_existing(instance.provider_type, instance.credentials_type, credentials.session_token,
credentials.account_name, credentials.key, credentials.secret_key, instance.resource, instance.specific_attributes)

new_manifest_names = set(i.get('filename') for i in validated_data.get('manifests', []))
previos_manifest_names = set(i.filename for i in instance.manifests.all())
delta_to_delete = tuple(previos_manifest_names - new_manifest_names)
delta_to_create = tuple(new_manifest_names - previos_manifest_names)
if delta_to_delete:
instance.manifests.filter(filename__in=delta_to_delete).delete()
if delta_to_create:
manifest_instances = [models.Manifest(filename=f, cloud_storage=instance) for f in delta_to_create]
models.Manifest.objects.bulk_create(manifest_instances)
instance.save()
return instance

Expand Down
Loading