diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index 19cf587cff2f..7426e9017090 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -5,6 +5,7 @@ import math from io import BytesIO from enum import Enum +import itertools import numpy as np from PIL import Image @@ -80,7 +81,7 @@ def _get_frame(self, frame_number, chunk_path_getter, extracted_chunk, chunk_rea extracted_chunk = chunk_number chunk_reader = reader_class([chunk_path]) - frame, frame_name = chunk_reader[frame_offset] + frame, frame_name, _ = next(itertools.islice(chunk_reader, frame_offset, None)) if reader_class is VideoReader: return (self._av_frame_to_png_bytes(frame), 'image/png') diff --git a/cvat/apps/engine/media_extractors.py b/cvat/apps/engine/media_extractors.py index a775365d15e1..778aec9a589f 100644 --- a/cvat/apps/engine/media_extractors.py +++ b/cvat/apps/engine/media_extractors.py @@ -6,8 +6,7 @@ import tempfile import shutil import zipfile -from io import BytesIO -import itertools +import io from abc import ABC, abstractmethod import av @@ -25,6 +24,13 @@ def get_mime(name): return 'unknown' +def create_tmp_dir(): + return tempfile.mkdtemp(prefix='cvat-', suffix='.data') + +def delete_tmp_dir(tmp_dir): + if tmp_dir: + shutil.rmtree(tmp_dir) + class IMediaReader(ABC): def __init__(self, source_path, step, start, stop): self._source_path = sorted(source_path) @@ -32,78 +38,74 @@ def __init__(self, source_path, step, start, stop): self._start = start self._stop = stop - @staticmethod - def create_tmp_dir(): - return tempfile.mkdtemp(prefix='cvat-', suffix='.data') - - @staticmethod - def delete_tmp_dir(tmp_dir): - if tmp_dir: - shutil.rmtree(tmp_dir) - @abstractmethod def __iter__(self): pass @abstractmethod - def __getitem__(self, k): + def get_preview(self): pass @abstractmethod - def save_preview(self, preview_path): + def get_progress(self, pos): pass - def slice_by_size(self, size): - # stopFrame should be included - it = itertools.islice(self, self._start, self._stop + 1 if self._stop else None) - frames = list(itertools.islice(it, 0, size * self._step, self._step)) - while frames: - yield frames - frames = list(itertools.islice(it, 0, size * self._step, self._step)) - @property - @abstractmethod - def image_names(self): - pass + @staticmethod + def _get_preview(obj): + if isinstance(obj, io.IOBase): + preview = Image.open(obj) + else: + preview = obj + preview.thumbnail((128, 128)) + + return preview.convert('RGB') @abstractmethod def get_image_size(self): pass -#Note step, start, stop have no affect class ImageListReader(IMediaReader): - def __init__(self, source_path, step=1, start=0, stop=0): + def __init__(self, source_path, step=1, start=0, stop=None): if not source_path: raise Exception('No image found') + + if stop is None: + stop = len(source_path) + else: + stop = min(len(source_path), stop + 1) + step = max(step, 1) + assert stop > start + super().__init__( source_path=source_path, - step=1, - start=0, - stop=0, + step=step, + start=start, + stop=stop, ) def __iter__(self): - return zip(self._source_path, self.image_names) + for i in range(self._start, self._stop, self._step): + yield (self.get_image(i), self.get_path(i), i) - def __getitem__(self, k): - return (self._source_path[k], self.image_names[k]) + def get_path(self, i): + return self._source_path[i] - def __len__(self): - return len(self._source_path) + def get_image(self, i): + return self._source_path[i] - def save_preview(self, preview_path): - shutil.copyfile(self._source_path[0], preview_path) + def get_progress(self, pos): + return (pos - self._start + 1) / (self._stop - self._start) - @property - def image_names(self): - return self._source_path + def get_preview(self): + fp = open(self._source_path[0], "rb") + return self._get_preview(fp) def get_image_size(self): img = Image.open(self._source_path[0]) return img.width, img.height -#Note step, start, stop have no affect class DirectoryReader(ImageListReader): - def __init__(self, source_path, step=1, start=0, stop=0): + def __init__(self, source_path, step=1, start=0, stop=None): image_paths = [] for source in source_path: for root, _, files in os.walk(source): @@ -112,41 +114,38 @@ def __init__(self, source_path, step=1, start=0, stop=0): image_paths.extend(paths) super().__init__( source_path=image_paths, - step=1, - start=0, - stop=0, + step=step, + start=start, + stop=stop, ) -#Note step, start, stop have no affect class ArchiveReader(DirectoryReader): - def __init__(self, source_path, step=1, start=0, stop=0): - self._tmp_dir = self.create_tmp_dir() + def __init__(self, source_path, step=1, start=0, stop=None): + self._tmp_dir = create_tmp_dir() self._archive_source = source_path[0] Archive(self._archive_source).extractall(self._tmp_dir) super().__init__( source_path=[self._tmp_dir], - step=1, - start=0, - stop=0, + step=step, + start=start, + stop=stop, ) def __del__(self): - if (self._tmp_dir): - self.delete_tmp_dir(self._tmp_dir) + delete_tmp_dir(self._tmp_dir) - @property - def image_names(self): - return [os.path.join(os.path.dirname(self._archive_source), os.path.relpath(p, self._tmp_dir)) for p in super().image_names] + def get_path(self, i): + base_dir = os.path.dirname(self._archive_source) + return os.path.join(base_dir, os.path.relpath(self._source_path[i], self._tmp_dir)) -#Note step, start, stop have no affect class PdfReader(DirectoryReader): - def __init__(self, source_path, step=1, start=0, stop=0): + def __init__(self, source_path, step=1, start=0, stop=None): if not source_path: raise Exception('No PDF found') from pdf2image import convert_from_path self._pdf_source = source_path[0] - self._tmp_dir = self.create_tmp_dir() + self._tmp_dir = create_tmp_dir() file_ = convert_from_path(self._pdf_source) basename = os.path.splitext(os.path.basename(self._pdf_source))[0] for page_num, page in enumerate(file_): @@ -155,96 +154,88 @@ def __init__(self, source_path, step=1, start=0, stop=0): super().__init__( source_path=[self._tmp_dir], - step=1, - start=0, - stop=0, + step=step, + start=start, + stop=stop, ) def __del__(self): - if (self._tmp_dir): - self.delete_tmp_dir(self._tmp_dir) + delete_tmp_dir(self._tmp_dir) - @property - def image_names(self): - return [os.path.join(os.path.dirname(self._pdf_source), os.path.relpath(p, self._tmp_dir)) for p in super().image_names] + def get_path(self, i): + base_dir = os.path.dirname(self._pdf_source) + return os.path.join(base_dir, os.path.relpath(self._source_path[i], self._tmp_dir)) -class ZipReader(IMediaReader): - def __init__(self, source_path, step=1, start=0, stop=0): +class ZipReader(ImageListReader): + def __init__(self, source_path, step=1, start=0, stop=None): self._zip_source = zipfile.ZipFile(source_path[0], mode='r') file_list = [f for f in self._zip_source.namelist() if get_mime(f) == 'image'] super().__init__(file_list, step, start, stop) - def __iter__(self): - for f in zip(self._source_path, self.image_names): - yield (BytesIO(self._zip_source.read(f[0])), f[1]) - - def __len__(self): - return len(self._source_path) - - def __getitem__(self, k): - return (BytesIO(self._zip_source.read(self._source_path[k])), self.image_names[k]) - def __del__(self): self._zip_source.close() - def save_preview(self, preview_path): - with open(preview_path, 'wb') as f: - f.write(self._zip_source.read(self._source_path[0])) + def get_preview(self): + io_image = io.BytesIO(self._zip_source.read(self._source_path[0])) + return self._get_preview(io_image) def get_image_size(self): - img = Image.open(BytesIO(self._zip_source.read(self._source_path[0]))) + img = Image.open(io.BytesIO(self._zip_source.read(self._source_path[0]))) return img.width, img.height - @property - def image_names(self): - return [os.path.join(os.path.dirname(self._zip_source.filename), p) for p in self._source_path] + def get_image(self, i): + return io.BytesIO(self._zip_source.read(self._source_path[i])) -class VideoReader(IMediaReader): - def __init__(self, source_path, step=1, start=0, stop=0): - self._output_fps = 25 + def get_path(self, i): + return os.path.join(os.path.dirname(self._zip_source.filename), self._source_path[i]) +class VideoReader(IMediaReader): + def __init__(self, source_path, step=1, start=0, stop=None): super().__init__( source_path=source_path, step=step, start=start, - stop=stop, + stop=stop + 1 if stop is not None else stop, ) - def __iter__(self): - def decode_frames(container): - for packet in container.demux(): - if packet.stream.type == 'video': - for frame in packet.decode(): - yield frame + def _has_frame(self, i): + if i >= self._start: + if (i - self._start) % self._step == 0: + if self._stop is None or i < self._stop: + return True + + return False + + def _decode(self, container): + frame_num = 0 + for packet in container.demux(): + if packet.stream.type == 'video': + for image in packet.decode(): + frame_num += 1 + if self._has_frame(frame_num - 1): + yield (image, self._source_path[0], image.pts) + def __iter__(self): container = self._get_av_container() source_video_stream = container.streams.video[0] source_video_stream.thread_type = 'AUTO' - image_names = self.image_names - return itertools.zip_longest(decode_frames(container), image_names, fillvalue=image_names[0]) + return self._decode(container) - def __len__(self): + def get_progress(self, pos): container = self._get_av_container() # Not for all containers return real value - length = container.streams.video[0].frames - return length - - def __getitem__(self, k): - return next(itertools.islice(self, k, k + 1)) + stream = container.streams.video[0] + return pos / stream.duration if stream.duration else None def _get_av_container(self): return av.open(av.datasets.curated(self._source_path[0])) - def save_preview(self, preview_path): + def get_preview(self): container = self._get_av_container() stream = container.streams.video[0] preview = next(container.decode(stream)) - preview.to_image().save(preview_path) - - @property - def image_names(self): - return self._source_path + return self._get_preview(preview.to_image()) def get_image_size(self): image = (next(iter(self)))[0] @@ -266,7 +257,7 @@ def _compress_image(image_path, quality): image = Image.fromarray(im_data.astype(np.int32)) converted_image = image.convert('RGB') image.close() - buf = BytesIO() + buf = io.BytesIO() converted_image.save(buf, format='JPEG', quality=quality, optimize=True) buf.seek(0) width, height = converted_image.size @@ -280,9 +271,9 @@ def save_as_chunk(self, images, chunk_path): class ZipChunkWriter(IChunkWriter): def save_as_chunk(self, images, chunk_path): with zipfile.ZipFile(chunk_path, 'x') as zip_chunk: - for idx, (image, image_name) in enumerate(images): - arcname = '{:06d}{}'.format(idx, os.path.splitext(image_name)[1]) - if isinstance(image, BytesIO): + for idx, (image, path, _) in enumerate(images): + arcname = '{:06d}{}'.format(idx, os.path.splitext(path)[1]) + if isinstance(image, io.BytesIO): zip_chunk.writestr(arcname, image.getvalue()) else: zip_chunk.write(filename=image, arcname=arcname) @@ -294,7 +285,7 @@ class ZipCompressedChunkWriter(IChunkWriter): def save_as_chunk(self, images, chunk_path): image_sizes = [] with zipfile.ZipFile(chunk_path, 'x') as zip_chunk: - for idx, (image, _) in enumerate(images): + for idx, (image, _ , _) in enumerate(images): w, h, image_buf = self._compress_image(image, self._image_quality) image_sizes.append((w, h)) arcname = '{:06d}.jpeg'.format(idx) @@ -344,7 +335,7 @@ def save_as_chunk(self, images, chunk_path): @staticmethod def _encode_images(images, container, stream): - for frame, _ in images: + for frame, _, _ in images: # let libav set the correct pts and time_base frame.pts = None frame.time_base = None diff --git a/cvat/apps/engine/migrations/0024_auto_20191023_1025.py b/cvat/apps/engine/migrations/0024_auto_20191023_1025.py index 7ec65bcf44a1..ef5adc27f4dd 100644 --- a/cvat/apps/engine/migrations/0024_auto_20191023_1025.py +++ b/cvat/apps/engine/migrations/0024_auto_20191023_1025.py @@ -68,14 +68,18 @@ def migrate_task_data(db_task_id, db_data_id, original_video, original_images, s original_chunk_writer = Mpeg4ChunkWriter(100) compressed_chunk_writer = ZipCompressedChunkWriter(image_quality) - for chunk_idx, chunk_images in enumerate(reader.slice_by_size(chunk_size)): + counter = itertools.count() + generator = itertools.groupby(reader, lambda x: next(counter) // chunk_size) + for chunk_idx, chunk_images in generator: + chunk_images = list(chunk_images) original_chunk_path = os.path.join(original_cache_dir, '{}.mp4'.format(chunk_idx)) original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path) compressed_chunk_path = os.path.join(compressed_cache_dir, '{}.zip'.format(chunk_idx)) compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path) - reader.save_preview(os.path.join(db_data_dir, 'preview.jpeg')) + preview = reader.get_preview() + preview.save(os.path.join(db_data_dir, 'preview.jpeg')) else: original_chunk_writer = ZipChunkWriter(100) for chunk_idx, chunk_image_ids in enumerate(slice_by_size(range(size), chunk_size)): @@ -131,14 +135,18 @@ def migrate_task_data(db_task_id, db_data_id, original_video, original_images, s original_chunk_writer = ZipChunkWriter(100) compressed_chunk_writer = ZipCompressedChunkWriter(image_quality) - for chunk_idx, chunk_images in enumerate(reader.slice_by_size(chunk_size)): + counter = itertools.count() + generator = itertools.groupby(reader, lambda x: next(counter) // chunk_size) + for chunk_idx, chunk_images in generator: + chunk_images = list(chunk_images) compressed_chunk_path = os.path.join(compressed_cache_dir, '{}.zip'.format(chunk_idx)) compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path) original_chunk_path = os.path.join(original_cache_dir, '{}.zip'.format(chunk_idx)) original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path) - reader.save_preview(os.path.join(db_data_dir, 'preview.jpeg')) + preview = reader.get_preview() + preview.save(os.path.join(db_data_dir, 'preview.jpeg')) shutil.rmtree(old_db_task_dir) return_dict[db_task_id] = (True, '') except Exception as e: diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py index aba87f5c5631..baf9f81e4a99 100644 --- a/cvat/apps/engine/task.py +++ b/cvat/apps/engine/task.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: MIT +import itertools import os import sys import rq @@ -237,15 +238,25 @@ def _create_thread(tid, data): source_path=[os.path.join(upload_dir, f) for f in media_files], step=db_data.get_frame_step(), start=db_data.start_frame, - stop=db_data.stop_frame, + stop=data['stop_frame'], ) db_task.mode = task_mode db_data.compressed_chunk_type = models.DataChoice.VIDEO if task_mode == 'interpolation' and not data['use_zip_chunks'] else models.DataChoice.IMAGESET db_data.original_chunk_type = models.DataChoice.VIDEO if task_mode == 'interpolation' else models.DataChoice.IMAGESET def update_progress(progress): - job.meta['status'] = 'Images are being compressed... {}%'.format(round(progress * 100)) + progress_animation = '|/-\\' + if not hasattr(update_progress, 'call_counter'): + update_progress.call_counter = 0 + + status_template = 'Images are being compressed {}' + if progress: + current_progress = '{}%'.format(round(progress * 100)) + else: + current_progress = '{}'.format(progress_animation[update_progress.call_counter]) + job.meta['status'] = status_template.format(current_progress) job.save_meta() + update_progress.call_counter = (update_progress.call_counter + 1) % len(progress_animation) compressed_chunk_writer_class = Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == DataChoice.VIDEO else ZipCompressedChunkWriter original_chunk_writer_class = Mpeg4ChunkWriter if db_data.original_chunk_type == DataChoice.VIDEO else ZipChunkWriter @@ -262,45 +273,52 @@ def update_progress(progress): else: db_data.chunk_size = 36 - frame_counter = 0 - total_len = len(extractor) or 100 - image_names = [] - image_sizes = [] - for chunk_idx, chunk_images in enumerate(extractor.slice_by_size(db_data.chunk_size)): - for img in chunk_images: - image_names.append(img[1]) + video_path = "" + video_size = (0, 0) + counter = itertools.count() + generator = itertools.groupby(extractor, lambda x: next(counter) // db_data.chunk_size) + for chunk_idx, chunk_data in generator: + chunk_data = list(chunk_data) original_chunk_path = db_data.get_original_chunk_path(chunk_idx) - original_chunk_writer.save_as_chunk(chunk_images, original_chunk_path) + original_chunk_writer.save_as_chunk(chunk_data, original_chunk_path) compressed_chunk_path = db_data.get_compressed_chunk_path(chunk_idx) - img_sizes = compressed_chunk_writer.save_as_chunk(chunk_images, compressed_chunk_path) - - image_sizes.extend(img_sizes) + img_sizes = compressed_chunk_writer.save_as_chunk(chunk_data, compressed_chunk_path) + + if db_task.mode == 'annotation': + db_images.extend([ + models.Image( + data=db_data, + path=os.path.relpath(data[1], upload_dir), + frame=data[2], + width=size[0], + height=size[1]) + + for data, size in zip(chunk_data, img_sizes) + ]) + else: + video_size = img_sizes[0] + video_path = chunk_data[0][1] - db_data.size += len(chunk_images) - update_progress(db_data.size / total_len) + db_data.size += len(chunk_data) + progress = extractor.get_progress(chunk_data[-1][2]) + update_progress(progress) if db_task.mode == 'annotation': - for image_name, image_size in zip(image_names, image_sizes): - db_images.append(models.Image( - data=db_data, - path=os.path.relpath(image_name, upload_dir), - frame=frame_counter, - width=image_size[0], - height=image_size[1], - )) - frame_counter += 1 models.Image.objects.bulk_create(db_images) + db_images = [] else: models.Video.objects.create( data=db_data, - path=os.path.relpath(image_names[0], upload_dir), - width=image_sizes[0][0], height=image_sizes[0][1]) - if db_data.stop_frame == 0: - db_data.stop_frame = db_data.start_frame + (db_data.size - 1) * db_data.get_frame_step() + path=os.path.relpath(video_path, upload_dir), + width=video_size[0], height=video_size[1]) + + if db_data.stop_frame == 0: + db_data.stop_frame = db_data.start_frame + (db_data.size - 1) * db_data.get_frame_step() - extractor.save_preview(db_data.get_preview_path()) + preview = extractor.get_preview() + preview.save(db_data.get_preview_path()) slogger.glob.info("Founded frames {} for Data #{}".format(db_data.size, db_data.id)) _save_task_to_db(db_task) diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index 1c2f7a87173d..eb327080f5f7 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -1641,7 +1641,7 @@ def _test_api_v1_tasks_id_data_spec(self, user, spec, data, expected_compressed_ self.assertEqual(response.status_code, expected_status_code) if expected_status_code == status.HTTP_200_OK: preview = Image.open(io.BytesIO(b"".join(response.streaming_content))) - self.assertEqual(preview.size, image_sizes[0]) + self.assertLessEqual(preview.size, image_sizes[0]) # check compressed chunk response = self._get_compressed_chunk(task_id, user, 0) diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index fb590b69382b..f1ac8e404c4a 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -415,6 +415,10 @@ def data(self, request, pk): db_task.save() data = {k:v for k, v in serializer.data.items()} data['use_zip_chunks'] = serializer.validated_data['use_zip_chunks'] + # if the value of stop_frame is 0, then inside the function we cannot know + # the value specified by the user or it's default value from the database + if 'stop_frame' not in serializer.validated_data: + data['stop_frame'] = None task.create(db_task.id, data) return Response(serializer.data, status=status.HTTP_202_ACCEPTED) else: