Skip to content

Commit

Permalink
Tus for task annotations import (#4327)
Browse files Browse the repository at this point in the history
* add data_type to tus mixin

* added tus for task annotations import

* added tus for jobs annotations import

* applied comments

* fix test

* fix incorrect upload endpoint

* add location creation based on origin

* remove unused import

* remove data_type

* remove unused comment

* update changelog to new release

Co-authored-by: Nikita Manovich <[email protected]>
  • Loading branch information
klakhov and Nikita Manovich authored Mar 11, 2022
1 parent 1225fbb commit 1fa2676
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 151 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## \[2.1.0] - Unreleased
### Added
- Task annotations importing via chunk uploads (<https://github.com/openvinotoolkit/cvat/pull/4327>)
- Advanced filtration and sorting for a list of tasks/projects/cloudstorages (<https://github.com/openvinotoolkit/cvat/pull/4403>)

### Changed
Expand Down
161 changes: 98 additions & 63 deletions cvat-core/src/server-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,49 @@
});
}

/**
 * Uploads a single file to a TUS endpoint in chunks.
 *
 * @param {File} file - the file to upload.
 * @param {Object} uploadConfig - { endpoint, chunkSize, totalSize, onUpdate, totalSentSize }.
 *   totalSize/totalSentSize (bytes) are optional and only used for progress reporting.
 * @returns {Promise<number>} resolves with the size of THIS file (bytes), so a caller
 *   can accumulate progress via `uploadConfig.totalSentSize += await chunkUpload(...)`.
 *   FIX: the original resolved the running total (double-counted by the caller's `+=`)
 *   and skipped the increment entirely when `totalSentSize` was 0 (falsy-zero bug).
 * @throws rejects with the tus error on upload failure.
 */
async function chunkUpload(file, uploadConfig) {
    const params = enableOrganization();
    const {
        endpoint, chunkSize, totalSize, onUpdate, totalSentSize,
    } = uploadConfig;
    return new Promise((resolve, reject) => {
        const upload = new tus.Upload(file, {
            endpoint,
            metadata: {
                filename: file.name,
                filetype: file.type,
            },
            headers: {
                Authorization: Axios.defaults.headers.common.Authorization,
            },
            chunkSize,
            retryDelays: null,
            onError(error) {
                reject(error);
            },
            onBeforeRequest(req) {
                // Propagate the current organization and send cookies with
                // the raw XHR that tus uses under the hood.
                const xhr = req.getUnderlyingObject();
                const { org } = params;
                req.setHeader('X-Organization', org);
                xhr.withCredentials = true;
            },
            onProgress(bytesUploaded) {
                // Report overall progress only when the caller supplied the
                // byte counters needed to compute it.
                if (onUpdate && Number.isInteger(totalSentSize) && Number.isInteger(totalSize)) {
                    const currentUploadedSize = totalSentSize + bytesUploaded;
                    onUpdate(currentUploadedSize / totalSize);
                }
            },
            onSuccess() {
                // Resolve with this file's size (the delta), not a running
                // total — the caller owns the accumulator.
                resolve(file.size);
            },
        });
        upload.start();
    });
}

function generateError(errorData) {
if (errorData.response) {
const message = `${errorData.message}. ${JSON.stringify(errorData.response.data) || ''}.`;
Expand Down Expand Up @@ -816,42 +859,6 @@

onUpdate('The data are being uploaded to the server..', null);

// Uploads one data file to the task's TUS endpoint in chunks.
// NOTE(review): this inner helper relies on closure variables of the
// enclosing task-creation routine (origin, backendAPI, store, params,
// chunkSize, totalSentSize, totalSize, onUpdate) that are not fully
// visible here — confirm their definitions before reusing this code.
async function chunkUpload(taskId, file) {
    return new Promise((resolve, reject) => {
        const upload = new tus.Upload(file, {
            endpoint: `${origin}${backendAPI}/tasks/${taskId}/data/`,
            metadata: {
                filename: file.name,
                filetype: file.type,
            },
            headers: {
                Authorization: `Token ${store.get('token')}`,
            },
            chunkSize,
            retryDelays: null, // disable tus automatic retries
            onError(error) {
                reject(error);
            },
            onBeforeRequest(req) {
                // Attach the organization header and send cookies with the
                // underlying XHR request.
                const xhr = req.getUnderlyingObject();
                const { org } = params;
                req.setHeader('X-Organization', org);
                xhr.withCredentials = true;
            },
            onProgress(bytesUploaded) {
                // Progress is computed over ALL files of the task upload,
                // not just the current one.
                const currentUploadedSize = totalSentSize + bytesUploaded;
                const percentage = currentUploadedSize / totalSize;
                onUpdate('The data are being uploaded to the server', percentage);
            },
            onSuccess() {
                // Mutates the enclosing accumulator so the next file's
                // progress starts from the right offset.
                totalSentSize += file.size;
                resolve();
            },
        });
        upload.start();
    });
}

async function bulkUpload(taskId, files) {
const fileBulks = files.reduce((fileGroups, file) => {
const lastBulk = fileGroups[fileGroups.length - 1];
Expand Down Expand Up @@ -891,8 +898,17 @@
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
const uploadConfig = {
endpoint: `${origin}${backendAPI}/tasks/${response.data.id}/data/`,
onUpdate: (percentage) => {
onUpdate('The data are being uploaded to the server', percentage);
},
chunkSize,
totalSize,
totalSentSize,
};
for (const file of chunkFiles) {
await chunkUpload(response.data.id, file);
uploadConfig.totalSentSize += await chunkUpload(file, uploadConfig);
}
if (bulkFiles.length > 0) {
await bulkUpload(response.data.id, bulkFiles);
Expand Down Expand Up @@ -1215,38 +1231,57 @@

// Session is 'task' or 'job'
async function uploadAnnotations(session, id, file, format) {
const { backendAPI } = config;
const { backendAPI, origin } = config;
const params = {
...enableOrganization(),
format,
filename: file.name,
};
let annotationData = new FormData();
annotationData.append('annotation_file', file);

return new Promise((resolve, reject) => {
async function request() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
annotationData,
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
annotationData = new FormData();
setTimeout(request, 3000);
} else {
resolve();
const chunkSize = config.uploadChunkSize * 1024 * 1024;
const uploadConfig = {
chunkSize,
endpoint: `${origin}${backendAPI}/${session}s/${id}/annotations/`,
};
try {
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Finish': true },
});
return new Promise((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
new FormData(),
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
setTimeout(requestStatus, 3000);
} else {
resolve();
}
} catch (errorData) {
reject(generateError(errorData));
}
} catch (errorData) {
reject(generateError(errorData));
}
}

setTimeout(request);
});
setTimeout(requestStatus);
});
} catch (errorData) {
generateError(errorData);
return null;
}
}

// Session is 'task' or 'job'
Expand Down
32 changes: 10 additions & 22 deletions cvat/apps/engine/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from django.conf import settings
from django.core.cache import cache
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response

from cvat.apps.engine.serializers import DataSerializer
Expand All @@ -26,6 +25,7 @@ def __init__(self, file_id, upload_dir):
self.offset = cache.get("tus-uploads/{}/offset".format(file_id))

def init_file(self):
os.makedirs(self.upload_dir, exist_ok=True)
file_path = os.path.join(self.upload_dir, self.file_id)
with open(file_path, 'wb') as file:
file.seek(self.file_size - 1)
Expand Down Expand Up @@ -100,7 +100,7 @@ class UploadMixin(object):
'Access-Control-Allow-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset,content-type",
'Cache-Control': 'no-store'
}
_file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'
file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'

def _tus_response(self, status, data=None, extra_headers=None):
response = Response(data, status)
Expand Down Expand Up @@ -147,9 +147,6 @@ def init_tus_upload(self, request):
if request.method == 'OPTIONS':
return self._tus_response(status=status.HTTP_204)
else:
if not self.can_upload():
return self._tus_response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
metadata = self._get_metadata(request)
filename = metadata.get('filename', '')
if not self.validate_filename(filename):
Expand All @@ -173,13 +170,14 @@ def init_tus_upload(self, request):

tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir())

location = request.build_absolute_uri()
if 'HTTP_X_FORWARDED_HOST' not in request.META:
location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')
return self._tus_response(
status=status.HTTP_201_CREATED,
extra_headers={'Location': '{}{}'.format(request.build_absolute_uri(), tus_file.file_id)})
extra_headers={'Location': '{}{}'.format(location, tus_file.file_id)})

@action(detail=True, methods=['HEAD', 'PATCH'], url_path=r'data/'+_file_id_regex)
def append_tus_chunk(self, request, pk, file_id):
self.get_object() # call check_object_permissions as well
def append_tus_chunk(self, request, file_id):
if request.method == 'HEAD':
tus_file = TusFile.get_tusfile(str(file_id), self.get_upload_dir())
if tus_file:
Expand Down Expand Up @@ -211,26 +209,16 @@ def validate_filename(self, filename):
file_path = os.path.join(upload_dir, filename)
return os.path.commonprefix((os.path.realpath(file_path), upload_dir)) == upload_dir

def can_upload(self):
    """Return True while the object has no data attached yet (uploads allowed)."""
    db_instance = self.get_object()
    return db_instance.data.size == 0

def get_upload_dir(self):
    # Returns the upload directory for the underlying data object.
    # NOTE(review): the two returns below look like interleaved old/new diff
    # lines — the second return is unreachable dead code. Confirm which
    # variant (self.get_object() vs the cached self._object) is current.
    db_model = self.get_object()
    return db_model.data.get_upload_dirname()
    return self._object.data.get_upload_dirname()

def get_request_client_files(self, request):
    # Validates the request payload with DataSerializer and returns the
    # 'client_files' entry of the validated data (or None when absent).
    # NOTE(review): the duplicated serializer assignment and duplicated,
    # unreachable return below look like interleaved old/new diff lines —
    # confirm which variant (self.get_object() vs self._object) is current.
    db_model = self.get_object()
    serializer = DataSerializer(db_model, data=request.data)
    serializer = DataSerializer(self._object, data=request.data)
    serializer.is_valid(raise_exception=True)
    data = {k: v for k, v in serializer.validated_data.items()}
    return data.get('client_files', None);
    return data.get('client_files', None)

def append(self, request):
if not self.can_upload():
return Response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
client_files = self.get_request_client_files(request)
if client_files:
upload_dir = self.get_upload_dir()
Expand Down
3 changes: 3 additions & 0 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ def get_log_path(self):
def get_task_artifacts_dirname(self):
    """Path of the directory holding this task's artifacts."""
    task_dir = self.get_task_dirname()
    return os.path.join(task_dir, 'artifacts')

def get_tmp_dirname(self):
    """Path of the directory for this task's temporary files."""
    task_dir = self.get_task_dirname()
    return os.path.join(task_dir, "tmp")

def __str__(self):
return self.name

Expand Down
Loading

0 comments on commit 1fa2676

Please sign in to comment.