Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tus for task annotations import #4327

Merged
merged 16 commits into from
Mar 11, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## \[2.1.0] - Unreleased
### Added
- Task annotations importing via chunk uploads (<https://github.com/openvinotoolkit/cvat/pull/4327>)
- Advanced filtration and sorting for a list of tasks/projects/cloudstorages (<https://github.com/openvinotoolkit/cvat/pull/4403>)

### Changed
Expand Down
161 changes: 98 additions & 63 deletions cvat-core/src/server-proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,49 @@
});
}

async function chunkUpload(file, uploadConfig) {
const params = enableOrganization();
const {
endpoint, chunkSize, totalSize, onUpdate,
} = uploadConfig;
let { totalSentSize } = uploadConfig;
return new Promise((resolve, reject) => {
const upload = new tus.Upload(file, {
endpoint,
metadata: {
filename: file.name,
filetype: file.type,
},
headers: {
Authorization: Axios.defaults.headers.common.Authorization,
},
chunkSize,
retryDelays: null,
onError(error) {
reject(error);
},
onBeforeRequest(req) {
const xhr = req.getUnderlyingObject();
const { org } = params;
req.setHeader('X-Organization', org);
xhr.withCredentials = true;
},
onProgress(bytesUploaded) {
if (onUpdate && Number.isInteger(totalSentSize) && Number.isInteger(totalSize)) {
const currentUploadedSize = totalSentSize + bytesUploaded;
const percentage = currentUploadedSize / totalSize;
onUpdate(percentage);
}
},
onSuccess() {
if (totalSentSize) totalSentSize += file.size;
resolve(totalSentSize);
},
});
upload.start();
});
}

function generateError(errorData) {
if (errorData.response) {
const message = `${errorData.message}. ${JSON.stringify(errorData.response.data) || ''}.`;
Expand Down Expand Up @@ -816,42 +859,6 @@

onUpdate('The data are being uploaded to the server..', null);

async function chunkUpload(taskId, file) {
return new Promise((resolve, reject) => {
const upload = new tus.Upload(file, {
endpoint: `${origin}${backendAPI}/tasks/${taskId}/data/`,
metadata: {
filename: file.name,
filetype: file.type,
},
headers: {
Authorization: `Token ${store.get('token')}`,
},
chunkSize,
retryDelays: null,
onError(error) {
reject(error);
},
onBeforeRequest(req) {
const xhr = req.getUnderlyingObject();
const { org } = params;
req.setHeader('X-Organization', org);
xhr.withCredentials = true;
},
onProgress(bytesUploaded) {
const currentUploadedSize = totalSentSize + bytesUploaded;
const percentage = currentUploadedSize / totalSize;
onUpdate('The data are being uploaded to the server', percentage);
},
onSuccess() {
totalSentSize += file.size;
resolve();
},
});
upload.start();
});
}

async function bulkUpload(taskId, files) {
const fileBulks = files.reduce((fileGroups, file) => {
const lastBulk = fileGroups[fileGroups.length - 1];
Expand Down Expand Up @@ -891,8 +898,17 @@
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
const uploadConfig = {
endpoint: `${origin}${backendAPI}/tasks/${response.data.id}/data/`,
onUpdate: (percentage) => {
onUpdate('The data are being uploaded to the server', percentage);
},
chunkSize,
totalSize,
totalSentSize,
};
for (const file of chunkFiles) {
await chunkUpload(response.data.id, file);
uploadConfig.totalSentSize += await chunkUpload(file, uploadConfig);
}
if (bulkFiles.length > 0) {
await bulkUpload(response.data.id, bulkFiles);
Expand Down Expand Up @@ -1215,38 +1231,57 @@

// Session is 'task' or 'job'
async function uploadAnnotations(session, id, file, format) {
const { backendAPI } = config;
const { backendAPI, origin } = config;
const params = {
...enableOrganization(),
format,
filename: file.name,
};
let annotationData = new FormData();
annotationData.append('annotation_file', file);

return new Promise((resolve, reject) => {
async function request() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
annotationData,
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
annotationData = new FormData();
setTimeout(request, 3000);
} else {
resolve();
const chunkSize = config.uploadChunkSize * 1024 * 1024;
const uploadConfig = {
chunkSize,
endpoint: `${origin}${backendAPI}/${session}s/${id}/annotations/`,
};
try {
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(`${backendAPI}/${session}s/${id}/annotations`,
new FormData(), {
params,
proxy: config.proxy,
headers: { 'Upload-Finish': true },
});
return new Promise((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.put(
`${backendAPI}/${session}s/${id}/annotations`,
new FormData(),
{
params,
proxy: config.proxy,
},
);
if (response.status === 202) {
setTimeout(requestStatus, 3000);
} else {
resolve();
}
} catch (errorData) {
reject(generateError(errorData));
}
} catch (errorData) {
reject(generateError(errorData));
}
}

setTimeout(request);
});
setTimeout(requestStatus);
});
} catch (errorData) {
generateError(errorData);
return null;
}
}

// Session is 'task' or 'job'
Expand Down
32 changes: 10 additions & 22 deletions cvat/apps/engine/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from django.conf import settings
from django.core.cache import cache
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response

from cvat.apps.engine.serializers import DataSerializer
Expand All @@ -26,6 +25,7 @@ def __init__(self, file_id, upload_dir):
self.offset = cache.get("tus-uploads/{}/offset".format(file_id))

def init_file(self):
os.makedirs(self.upload_dir, exist_ok=True)
file_path = os.path.join(self.upload_dir, self.file_id)
with open(file_path, 'wb') as file:
file.seek(self.file_size - 1)
Expand Down Expand Up @@ -100,7 +100,7 @@ class UploadMixin(object):
'Access-Control-Allow-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset,content-type",
'Cache-Control': 'no-store'
}
_file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'
file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'

def _tus_response(self, status, data=None, extra_headers=None):
response = Response(data, status)
Expand Down Expand Up @@ -147,9 +147,6 @@ def init_tus_upload(self, request):
if request.method == 'OPTIONS':
return self._tus_response(status=status.HTTP_204)
else:
if not self.can_upload():
return self._tus_response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
metadata = self._get_metadata(request)
filename = metadata.get('filename', '')
if not self.validate_filename(filename):
Expand All @@ -173,13 +170,14 @@ def init_tus_upload(self, request):

tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir())

location = request.build_absolute_uri()
if 'HTTP_X_FORWARDED_HOST' not in request.META:
location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')
return self._tus_response(
status=status.HTTP_201_CREATED,
extra_headers={'Location': '{}{}'.format(request.build_absolute_uri(), tus_file.file_id)})
extra_headers={'Location': '{}{}'.format(location, tus_file.file_id)})

@action(detail=True, methods=['HEAD', 'PATCH'], url_path=r'data/'+_file_id_regex)
def append_tus_chunk(self, request, pk, file_id):
self.get_object() # call check_object_permissions as well
def append_tus_chunk(self, request, file_id):
if request.method == 'HEAD':
tus_file = TusFile.get_tusfile(str(file_id), self.get_upload_dir())
if tus_file:
Expand Down Expand Up @@ -211,26 +209,16 @@ def validate_filename(self, filename):
file_path = os.path.join(upload_dir, filename)
return os.path.commonprefix((os.path.realpath(file_path), upload_dir)) == upload_dir

def can_upload(self):
db_model = self.get_object()
model_data = db_model.data
return model_data.size == 0

def get_upload_dir(self):
db_model = self.get_object()
return db_model.data.get_upload_dirname()
return self._object.data.get_upload_dirname()

def get_request_client_files(self, request):
db_model = self.get_object()
serializer = DataSerializer(db_model, data=request.data)
serializer = DataSerializer(self._object, data=request.data)
serializer.is_valid(raise_exception=True)
data = {k: v for k, v in serializer.validated_data.items()}
return data.get('client_files', None);
return data.get('client_files', None)

def append(self, request):
if not self.can_upload():
return Response(data='Adding more data is not allowed',
status=status.HTTP_400_BAD_REQUEST)
client_files = self.get_request_client_files(request)
if client_files:
upload_dir = self.get_upload_dir()
Expand Down
3 changes: 3 additions & 0 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ def get_log_path(self):
    def get_task_artifacts_dirname(self):
        # Per-task 'artifacts' subdirectory, nested under the task directory.
        return os.path.join(self.get_task_dirname(), 'artifacts')

    def get_tmp_dirname(self):
        # Per-task temporary ("tmp") subdirectory, nested under the task directory.
        return os.path.join(self.get_task_dirname(), "tmp")

    def __str__(self):
        # The instance's name is its human-readable representation.
        return self.name

Expand Down
Loading