diff --git a/datumaro/plugins/icdar_format/converter.py b/datumaro/plugins/icdar_format/converter.py index c7aeee19ef..186d48d296 100644 --- a/datumaro/plugins/icdar_format/converter.py +++ b/datumaro/plugins/icdar_format/converter.py @@ -10,185 +10,118 @@ from datumaro.util.image import save_image from datumaro.util.mask_tools import paint_mask -from .format import IcdarPath, IcdarTask +from .format import IcdarPath -class _WordRecognitionConverter: - def __init__(self): - self.annotations = '' - - def save_annotations(self, item, path): - self.annotations += '%s, ' % (item.id + IcdarPath.IMAGE_EXT) - for ann in item.annotations: - if ann.type != AnnotationType.caption: - continue - self.annotations += '\"%s\"' % ann.caption - self.annotations += '\n' - - def write(self, path): - file = osp.join(path, 'gt.txt') - os.makedirs(osp.dirname(file), exist_ok=True) - with open(file, 'w') as f: - f.write(self.annotations) - - def is_empty(self): - return len(self.annotations) == 0 - -class _TextLocalizationConverter: - def __init__(self): - self.annotations = {} - - def save_annotations(self, item, path): - annotation = '' - for ann in item.annotations: - if ann.type == AnnotationType.bbox: - annotation += ' '.join(str(p) for p in ann.points) - if ann.attributes and 'text' in ann.attributes: - annotation += ' \"%s\"' % ann.attributes['text'] - elif ann.type == AnnotationType.polygon: - annotation += ','.join(str(p) for p in ann.points) - if ann.attributes and 'text' in ann.attributes: - annotation += ',\"%s\"' % ann.attributes['text'] - annotation += '\n' - self.annotations[item.id] = annotation - - def write(self, path): - os.makedirs(path, exist_ok=True) - for item in self.annotations: - file = osp.join(path, 'gt_' + item + '.txt') - with open(file, 'w') as f: - f.write(self.annotations[item]) - - def is_empty(self): - return len(self.annotations) == 0 - -class _TextSegmentationConverter: - def __init__(self): - self.annotations = {} +class IcdarWordRecognitionConverter(Converter): + DEFAULT_IMAGE_EXT = IcdarPath.IMAGE_EXT - def save_annotations(self, item, path): - annotation = '' - colormap = [(255, 255, 255)] - anns = [a for a in item.annotations - if a.type == AnnotationType.mask] - if anns: - is_not_index = len([p for p in anns if 'index' not in p.attributes]) - if is_not_index: - raise Exception("Item %s: a mask must have" - "'index' attribute" % item.id) - anns = sorted(anns, key=lambda a: a.attributes['index']) - group = anns[0].group - for ann in anns: - if ann.group != group or (not ann.group and anns[0].group != 0): - annotation += '\n' - text = '' - if ann.attributes: - if 'text' in ann.attributes: - text = ann.attributes['text'] - if text == ' ': - annotation += '#' - if 'color' in ann.attributes and \ - len(ann.attributes['color'].split()) == 3: - color = ann.attributes['color'].split() - colormap.append( - (int(color[0]), int(color[1]), int(color[2]))) - annotation += ' '.join(p for p in color) - else: - raise Exception("Item %s: a mask must have " - "an RGB color attribute, e. g. '10 7 50'" % item.id) - if 'center' in ann.attributes: - annotation += ' %s' % ann.attributes['center'] - else: - annotation += ' - -' - bbox = ann.get_bbox() - annotation += ' %s %s %s %s' % (bbox[0], bbox[1], - bbox[0] + bbox[2], bbox[1] + bbox[3]) - annotation += ' \"%s\"' % text + def apply(self): + for subset_name, subset in self._extractor.subsets().items(): + annotation = '' + for item in subset: + if item.has_image and self._save_images: + self._save_image(item, osp.join(self._save_dir, subset_name, + IcdarPath.IMAGES_DIR, item.id + IcdarPath.IMAGE_EXT)) + + annotation += '%s, ' % (item.id + IcdarPath.IMAGE_EXT) + for ann in item.annotations: + if ann.type != AnnotationType.caption: + continue + annotation += '\"%s\"' % ann.caption annotation += '\n' - group = ann.group - - mask = CompiledMask.from_instance_masks(anns, - instance_labels=[m.attributes['index'] + 1 for m in anns]) - mask = paint_mask(mask.class_mask, - { i: colormap[i] for i in range(len(colormap)) }) - save_image(osp.join(path, item.id + '_GT' + IcdarPath.GT_EXT), - mask, create_dir=True) - self.annotations[item.id] = annotation - - def write(self, path): - os.makedirs(path, exist_ok=True) - for item in self.annotations: - file = osp.join(path, item + '_GT' + '.txt') - with open(file, 'w') as f: - f.write(self.annotations[item]) + if len(annotation): + anno_file = osp.join(self._save_dir, subset_name, 'gt.txt') + os.makedirs(osp.dirname(anno_file), exist_ok=True) + with open(anno_file, 'w', encoding='utf-8') as f: + f.write(annotation) - def is_empty(self): - return len(self.annotations) == 0 - - -class IcdarConverter(Converter): +class IcdarTextLocalizationConverter(Converter): DEFAULT_IMAGE_EXT = IcdarPath.IMAGE_EXT - _TASK_CONVERTER = { - IcdarTask.word_recognition: _WordRecognitionConverter, - IcdarTask.text_localization: _TextLocalizationConverter, - IcdarTask.text_segmentation: _TextSegmentationConverter, - } - - def __init__(self, extractor, save_dir, tasks=None, **kwargs): - super().__init__(extractor, save_dir, **kwargs) - - assert tasks is None or isinstance(tasks, (IcdarTask, list, str)) - if isinstance(tasks, IcdarTask): - tasks = [tasks] - elif isinstance(tasks, str): - tasks = [IcdarTask[tasks]] - elif tasks: - for i, t in enumerate(tasks): - if isinstance(t, str): - tasks[i] = IcdarTask[t] - else: - assert t in IcdarTask, t - self._tasks = tasks - - def _make_task_converter(self, task): - if task not in self._TASK_CONVERTER: - raise NotImplementedError() - return self._TASK_CONVERTER[task]() - - def _make_task_converters(self): - return { task: self._make_task_converter(task) - for task in (self._tasks or self._TASK_CONVERTER) } - def apply(self): for subset_name, subset in self._extractor.subsets().items(): - task_converters = self._make_task_converters() for item in subset: - for task, task_conv in task_converters.items(): - if item.has_image and self._save_images: - self._save_image(item, osp.join( - self._save_dir, subset_name, IcdarPath.IMAGES_DIR, - item.id + IcdarPath.IMAGE_EXT)) - task_conv.save_annotations(item, osp.join(self._save_dir, - IcdarPath.TASK_DIR[task], subset_name)) - - for task, task_conv in task_converters.items(): - if task_conv.is_empty() and not self._tasks: - continue - task_conv.write(osp.join(self._save_dir, - IcdarPath.TASK_DIR[task], subset_name)) - -class IcdarWordRecognitionConverter(IcdarConverter): - def __init__(self, *args, **kwargs): - kwargs['tasks'] = IcdarTask.word_recognition - super().__init__(*args, **kwargs) + if item.has_image and self._save_images: + self._save_image(item, osp.join(self._save_dir, subset_name, + IcdarPath.IMAGES_DIR, item.id + IcdarPath.IMAGE_EXT)) + + annotation = '' + for ann in item.annotations: + if ann.type == AnnotationType.bbox: + annotation += ' '.join(str(p) for p in ann.points) + if ann.attributes and 'text' in ann.attributes: + annotation += ' \"%s\"' % ann.attributes['text'] + elif ann.type == AnnotationType.polygon: + annotation += ','.join(str(p) for p in ann.points) + if ann.attributes and 'text' in ann.attributes: + annotation += ',\"%s\"' % ann.attributes['text'] + annotation += '\n' + anno_file = osp.join(self._save_dir, subset_name, osp.dirname(item.id), + 'gt_' + osp.basename(item.id) + '.txt') + os.makedirs(osp.dirname(anno_file), exist_ok=True) + with open(anno_file, 'w', encoding='utf-8') as f: + f.write(annotation) -class IcdarTextLocalizationConverter(IcdarConverter): - def __init__(self, *args, **kwargs): - kwargs['tasks'] = IcdarTask.text_localization - super().__init__(*args, **kwargs) +class IcdarTextSegmentationConverter(Converter): + DEFAULT_IMAGE_EXT = IcdarPath.IMAGE_EXT -class IcdarTextSegmentationConverter(IcdarConverter): - def __init__(self, *args, **kwargs): - kwargs['tasks'] = IcdarTask.text_segmentation - super().__init__(*args, **kwargs) + def apply(self): + for subset_name, subset in self._extractor.subsets().items(): + for item in subset: + if item.has_image and self._save_images: + self._save_image(item, osp.join(self._save_dir, subset_name, + IcdarPath.IMAGES_DIR, item.id + IcdarPath.IMAGE_EXT)) + + annotation = '' + colormap = [(255, 255, 255)] + anns = [a for a in item.annotations + if a.type == AnnotationType.mask] + if anns: + is_not_index = len([p for p in anns if 'index' not in p.attributes]) + if is_not_index: + raise Exception("Item %s: a mask must have" + "'index' attribute" % item.id) + anns = sorted(anns, key=lambda a: a.attributes['index']) + group = anns[0].group + for ann in anns: + if ann.group != group or (not ann.group and anns[0].group != 0): + annotation += '\n' + text = '' + if ann.attributes: + if 'text' in ann.attributes: + text = ann.attributes['text'] + if text == ' ': + annotation += '#' + if 'color' in ann.attributes and \ + len(ann.attributes['color'].split()) == 3: + color = ann.attributes['color'].split() + colormap.append( + (int(color[0]), int(color[1]), int(color[2]))) + annotation += ' '.join(p for p in color) + else: + raise Exception("Item %s: a mask must have " + "an RGB color attribute, e. g. '10 7 50'" % item.id) + if 'center' in ann.attributes: + annotation += ' %s' % ann.attributes['center'] + else: + annotation += ' - -' + bbox = ann.get_bbox() + annotation += ' %s %s %s %s' % (bbox[0], bbox[1], + bbox[0] + bbox[2], bbox[1] + bbox[3]) + annotation += ' \"%s\"' % text + annotation += '\n' + group = ann.group + + mask = CompiledMask.from_instance_masks(anns, + instance_labels=[m.attributes['index'] + 1 for m in anns]) + mask = paint_mask(mask.class_mask, + { i: colormap[i] for i in range(len(colormap)) }) + save_image(osp.join(self._save_dir, subset_name, + item.id + '_GT' + IcdarPath.GT_EXT), mask, create_dir=True) + + anno_file = osp.join(self._save_dir, subset_name, + item.id + '_GT' + '.txt') + os.makedirs(osp.dirname(anno_file), exist_ok=True) + with open(anno_file, 'w', encoding='utf-8') as f: + f.write(annotation) \ No newline at end of file diff --git a/datumaro/plugins/icdar_format/extractor.py b/datumaro/plugins/icdar_format/extractor.py index 41a4854f58..3b304a9670 100644 --- a/datumaro/plugins/icdar_format/extractor.py +++ b/datumaro/plugins/icdar_format/extractor.py @@ -38,7 +38,6 @@ def __init__(self, path, task): else: self._items = list(self._load_segmentation_items().values()) - def _load_recognition_items(self): items = {} with open(self._path, encoding='utf-8') as f: @@ -69,10 +68,11 @@ def _load_recognition_items(self): def _load_localization_items(self): items = {} - for path in glob(osp.join(self._path, '*.txt')): - item_id = osp.splitext(osp.basename(path))[0] - if item_id.startswith('gt_'): - item_id = item_id[3:] + for path in glob(osp.join(self._path, '**', '*.txt'), recursive=True): + item_id = osp.splitext(osp.relpath(path, self._path))[0] + if osp.basename(item_id).startswith('gt_'): + item_id = osp.join(osp.dirname(item_id), osp.basename(item_id)[3:]) + item_id = item_id.replace('\\', '/') image_path = osp.join(self._path, IcdarPath.IMAGES_DIR, item_id + IcdarPath.IMAGE_EXT) if item_id not in items: @@ -115,8 +115,9 @@ def _load_localization_items(self): def _load_segmentation_items(self): items = {} - for path in glob(osp.join(self._path, '*.txt')): - item_id = osp.splitext(osp.basename(path))[0] + for path in glob(osp.join(self._path, '**', '*.txt'), recursive=True): + item_id = osp.splitext(osp.relpath(path, self._path))[0] + item_id = item_id.replace('\\', '/') if item_id.endswith('_GT'): item_id = item_id[:-3] image_path = osp.join(self._path, IcdarPath.IMAGES_DIR, @@ -203,30 +204,18 @@ def __init__(self, path, **kwargs): kwargs['task'] = IcdarTask.text_segmentation super().__init__(path, **kwargs) -class IcdarImporter(Importer): - _TASKS = [ - (IcdarTask.word_recognition, 'icdar_word_recognition', 'word_recognition'), - (IcdarTask.text_localization, 'icdar_text_localization', 'text_localization'), - (IcdarTask.text_segmentation, 'icdar_text_segmentation', 'text_segmentation'), - ] +class IcdarWordRecognitionImporter(Importer): + @classmethod + def find_sources(cls, path): + return cls._find_sources_recursive(path, '.txt', 'icdar_word_recognition') + +class IcdarTextLocalizationImporter(Importer): + @classmethod + def find_sources(cls, path): + return cls._find_sources_recursive(path, '', 'icdar_text_localization') + +class IcdarTextSegmentationImporter(Importer): @classmethod def find_sources(cls, path): - sources = [] - paths = [path] - if osp.basename(path) not in IcdarPath.TASK_DIR.values(): - paths = [p for p in glob(osp.join(path, '**')) - if osp.basename(p) in IcdarPath.TASK_DIR.values()] - for path in paths: - for task, extractor_type, task_dir in cls._TASKS: - if not osp.isdir(path) or osp.basename(path) != task_dir: - continue - if task is IcdarTask.word_recognition: - ext = '.txt' - elif task is IcdarTask.text_localization or \ - task is IcdarTask.text_segmentation: - ext = '' - sources += cls._find_sources_recursive(path, ext, - extractor_type, file_filter=lambda p: - osp.basename(p) != IcdarPath.VOCABULARY_FILE) - return sources + return cls._find_sources_recursive(path, '', 'icdar_text_segmentation') diff --git a/datumaro/plugins/icdar_format/format.py b/datumaro/plugins/icdar_format/format.py index 00f9493691..fb52a83eaf 100644 --- a/datumaro/plugins/icdar_format/format.py +++ b/datumaro/plugins/icdar_format/format.py @@ -15,10 +15,3 @@ class IcdarPath: IMAGE_EXT = '.png' GT_EXT = '.bmp' IMAGES_DIR = 'images' - VOCABULARY_FILE = 'vocabulary.txt' - - TASK_DIR = { - IcdarTask.word_recognition: 'word_recognition', - IcdarTask.text_localization: 'text_localization', - IcdarTask.text_segmentation: 'text_segmentation', - } diff --git a/tests/test_icdar_format.py b/tests/test_icdar_format.py index 69a4c89109..fa7150c2f5 100644 --- a/tests/test_icdar_format.py +++ b/tests/test_icdar_format.py @@ -1,4 +1,5 @@ import os.path as osp +from functools import partial from unittest import TestCase import numpy as np @@ -8,7 +9,7 @@ from datumaro.plugins.icdar_format.converter import ( IcdarTextLocalizationConverter, IcdarTextSegmentationConverter, IcdarWordRecognitionConverter) -from datumaro.plugins.icdar_format.extractor import IcdarImporter +from datumaro.plugins.icdar_format.extractor import IcdarWordRecognitionImporter from datumaro.util.test_utils import (TestDir, compare_datasets, test_save_and_load) @@ -17,7 +18,7 @@ class IcdarImporterTest(TestCase): def test_can_detect(self): - self.assertTrue(IcdarImporter.detect( + self.assertTrue(IcdarWordRecognitionImporter.detect( osp.join(DUMMY_DATASET_DIR, 'word_recognition'))) def test_can_import_captions(self): @@ -37,7 +38,8 @@ def test_can_import_captions(self): ]) dataset = Dataset.import_from( - osp.join(DUMMY_DATASET_DIR, 'word_recognition'), 'icdar') + osp.join(DUMMY_DATASET_DIR, 'word_recognition'), + 'icdar_word_recognition') compare_datasets(self, expected_dataset, dataset) @@ -60,7 +62,8 @@ def test_can_import_bboxes(self): ]) dataset = Dataset.import_from( - osp.join(DUMMY_DATASET_DIR, 'text_localization'), 'icdar') + osp.join(DUMMY_DATASET_DIR, 'text_localization'), + 'icdar_text_localization') compare_datasets(self, expected_dataset, dataset) @@ -89,48 +92,50 @@ def test_can_import_masks(self): ]) dataset = Dataset.import_from( - osp.join(DUMMY_DATASET_DIR, 'text_segmentation'), 'icdar') + osp.join(DUMMY_DATASET_DIR, 'text_segmentation'), + 'icdar_text_segmentation') compare_datasets(self, expected_dataset, dataset) class IcdarConverterTest(TestCase): def _test_save_and_load(self, source_dataset, converter, test_dir, - target_dataset=None, importer_args=None): + importer, target_dataset=None, importer_args=None, **kwargs): return test_save_and_load(self, source_dataset, converter, test_dir, - importer='icdar', - target_dataset=target_dataset, importer_args=importer_args) + importer, target_dataset=target_dataset, importer_args=importer_args, + **kwargs) def test_can_save_and_load_captions(self): expected_dataset = Dataset.from_iterable([ - DatasetItem(id=1, subset='train', - annotations=[ + DatasetItem(id='a/b/1', subset='train', + image=np.ones((10, 15, 3)), annotations=[ Caption('caption_0'), ]), DatasetItem(id=2, subset='train', - annotations=[ + image=np.ones((10, 15, 3)), annotations=[ Caption('caption_1'), ]), ]) with TestDir() as test_dir: self._test_save_and_load(expected_dataset, - IcdarWordRecognitionConverter.convert, test_dir) + partial(IcdarWordRecognitionConverter.convert, save_images=True), + test_dir, 'icdar_word_recognition') def test_can_save_and_load_bboxes(self): expected_dataset = Dataset.from_iterable([ - DatasetItem(id=1, subset='train', - annotations=[ + DatasetItem(id='a/b/1', subset='train', + image=np.ones((10, 15, 3)), annotations=[ Bbox(1, 3, 6, 10), Bbox(0, 1, 3, 5, attributes={'text': 'word_0'}), ]), DatasetItem(id=2, subset='train', - annotations=[ + image=np.ones((10, 15, 3)), annotations=[ Polygon([0, 0, 3, 0, 4, 7, 1, 8], attributes={'text': 'word_1'}), Polygon([1, 2, 5, 3, 6, 8, 0, 7]), ]), DatasetItem(id=3, subset='train', - annotations=[ + image=np.ones((10, 15, 3)), annotations=[ Polygon([2, 2, 8, 3, 7, 10, 2, 9], attributes={'text': 'word_2'}), Bbox(0, 2, 5, 9, attributes={'text': 'word_3'}), @@ -139,12 +144,13 @@ def test_can_save_and_load_bboxes(self): with TestDir() as test_dir: self._test_save_and_load(expected_dataset, - IcdarTextLocalizationConverter.convert, test_dir) + partial(IcdarTextLocalizationConverter.convert, save_images=True), + test_dir, 'icdar_text_localization') def test_can_save_and_load_masks(self): expected_dataset = Dataset.from_iterable([ - DatasetItem(id=1, subset='train', - annotations=[ + DatasetItem(id='a/b/1', subset='train', + image=np.ones((10, 15, 3)), annotations=[ Mask(image=np.array([[0, 0, 0, 1, 1]]), group=1, attributes={ 'index': 1, 'color': '82 174 214', 'text': 'j', 'center': '0 3' }), @@ -153,7 +159,7 @@ def test_can_save_and_load_masks(self): 'center': '0 1' }), ]), DatasetItem(id=2, subset='train', - annotations=[ + image=np.ones((10, 15, 3)), annotations=[ Mask(image=np.array([[0, 0, 0, 0, 0, 1]]), group=0, attributes={ 'index': 3, 'color': '183 6 28', 'text': ' ', 'center': '0 5' }), @@ -171,7 +177,8 @@ def test_can_save_and_load_masks(self): with TestDir() as test_dir: self._test_save_and_load(expected_dataset, - IcdarTextSegmentationConverter.convert, test_dir) + partial(IcdarTextSegmentationConverter.convert, save_images=True), + test_dir, 'icdar_text_segmentation') def test_can_save_and_load_with_no_subsets(self): expected_dataset = Dataset.from_iterable([ @@ -183,16 +190,21 @@ def test_can_save_and_load_with_no_subsets(self): with TestDir() as test_dir: self._test_save_and_load(expected_dataset, - IcdarTextLocalizationConverter.convert, test_dir) + IcdarTextLocalizationConverter.convert, test_dir, + 'icdar_text_localization') def test_can_save_dataset_with_cyrillic_and_spaces_in_filename(self): expected_dataset = Dataset.from_iterable([ - DatasetItem(id='кириллица с пробелом', image=np.ones((8, 8, 3)), - annotations=[ - Bbox(0, 1, 3, 5), - ]), + DatasetItem(id='кириллица с пробелом', + image=np.ones((8, 8, 3))), ]) - with TestDir() as test_dir: - self._test_save_and_load(expected_dataset, - IcdarTextLocalizationConverter.convert, test_dir) + for importer, converter in [ + ('icdar_word_recognition', IcdarWordRecognitionConverter), + ('icdar_text_localization', IcdarTextLocalizationConverter), + ('icdar_text_segmentation', IcdarTextSegmentationConverter), + ]: + with self.subTest(subformat=converter), TestDir() as test_dir: + self._test_save_and_load(expected_dataset, + partial(converter.convert, save_images=True), + test_dir, importer, require_images=True)