diff --git a/datumaro/components/errors.py b/datumaro/components/errors.py
index 20d821fe12..e5c381f394 100644
--- a/datumaro/components/errors.py
+++ b/datumaro/components/errors.py
@@ -2,9 +2,11 @@
 #
 # SPDX-License-Identifier: MIT
 
-from typing import Any, Tuple
+from typing import Any, Optional, Tuple
 
-from attrs import define, field
+from attrs import define, field, validators
+
+from datumaro.util.attrs_util import has_length, not_empty
 
 
 class ImmutableObjectError(Exception):
@@ -197,21 +199,75 @@ class DatasetImportError(DatumaroError):
     pass
 
 
+class InvalidAnnotationError(DatasetImportError):
+    """
+    A basic dataset parsing error. Should include the problem description in
+    the message.
+    """
+
+
+@define(auto_exc=False)
+class InvalidFieldError(InvalidAnnotationError):
+    name: str = field(validator=[validators.instance_of(str), not_empty])
+    """Field name"""
+
+    def __str__(self) -> str:
+        return f"Invalid annotation field '{self.name}' value"
+
+
+@define(auto_exc=False)
+class InvalidFieldTypeError(InvalidFieldError):
+    actual: str = field(validator=[validators.instance_of(str), not_empty])
+    """Actual type of the field"""
+
+    expected: Tuple[str, ...] = field(validator=[validators.instance_of(tuple), not_empty])
+    """The list of expected types of the field"""
+
+    def __str__(self) -> str:
+        if len(self.expected) == 1:
+            expected = self.expected[0]
+        else:
+            expected = "one of " + ", ".join(self.expected)
+        return f"Invalid annotation field '{self.name}' type '{self.actual}'. Expected '{expected}'"
+
+
+@define(auto_exc=False)
+class MissingFieldError(InvalidFieldError):
+    def __str__(self) -> str:
+        return f"Missing annotation field '{self.name}'"
+
+
+@define(auto_exc=False)
+class UndeclaredLabelError(InvalidAnnotationError):
+    id: str = field(validator=validators.instance_of(str))
+    """Index or name"""
+
+    def __str__(self) -> str:
+        return f"Undeclared label '{self.id}'"
+
+
 @define(auto_exc=False)
 class ItemImportError(DatasetImportError):
     """
-    Represents additional item error info. The error itself is supposed to be
-    in the `__cause__` member.
+    Wraps a dataset parsing error and provides additional error context info.
+    The error itself is supposed to be in the `__cause__` member.
     """
 
-    item_id: Tuple[str, str]
+    item_id: Tuple[Optional[str], Optional[str]] = field(
+        validator=[validators.instance_of(tuple), has_length(2)]
+    )
+    """
+    (id, subset) of the item with the problem.
+    If the id or subset cannot be reported, the corresponding field is set to None.
+ """ def __str__(self): - return "Failed to import item %s" % (self.item_id,) + return f"Failed to import item {self.item_id}" class AnnotationImportError(ItemImportError): - pass + def __str__(self): + return f"Failed to import item {self.item_id} annotation" @define(auto_exc=False) diff --git a/datumaro/plugins/coco_format/extractor.py b/datumaro/plugins/coco_format/extractor.py index 26270ad7a7..40e552e0f9 100644 --- a/datumaro/plugins/coco_format/extractor.py +++ b/datumaro/plugins/coco_format/extractor.py @@ -2,9 +2,9 @@ # # SPDX-License-Identifier: MIT -import logging as log import os.path as osp -from typing import Any +from inspect import isclass +from typing import Any, Dict, Tuple, Type, TypeVar, Union, overload import pycocotools.mask as mask_utils from attrs import define @@ -22,15 +22,24 @@ Polygon, RleMask, ) +from datumaro.components.errors import ( + DatasetImportError, + InvalidAnnotationError, + InvalidFieldTypeError, + MissingFieldError, + UndeclaredLabelError, +) from datumaro.components.extractor import DEFAULT_SUBSET_NAME, DatasetItem, SourceExtractor from datumaro.components.media import Image -from datumaro.util import parse_json_file, take_by +from datumaro.util import NOTSET, parse_json_file, take_by from datumaro.util.image import lazy_image, load_image from datumaro.util.mask_tools import bgr2index from datumaro.util.meta_file_util import has_meta_file, parse_meta_file from .format import CocoPath, CocoTask +T = TypeVar("T") + class _CocoExtractor(SourceExtractor): """ @@ -48,7 +57,9 @@ def __init__( keep_original_category_ids=False, **kwargs, ): - assert osp.isfile(path), path + if not osp.isfile(path): + raise DatasetImportError(f"Can't find JSON file at '{path}'") + self._path = path if not subset: parts = osp.splitext(osp.basename(path))[0].split(task.name + "_", maxsplit=1) @@ -101,19 +112,31 @@ def _load_categories(self, json_data, *, keep_original_ids): CocoTask.panoptic, ]: self._load_label_categories( - json_data["categories"], + self._parse_field(json_data, "categories", list), keep_original_ids=keep_original_ids, ) if self._task == CocoTask.person_keypoints: - self._load_person_kp_categories(json_data["categories"]) + self._load_person_kp_categories(self._parse_field(json_data, "categories", list)) def _load_label_categories(self, json_cat, *, keep_original_ids): categories = LabelCategories() label_map = {} + cats = sorted( + ( + { + "id": self._parse_field(c, "id", int), + "name": self._parse_field(c, "name", str), + "supercategory": c.get("supercategory"), + } + for c in json_cat + ), + key=lambda cat: cat["id"], + ) + if keep_original_ids: - for cat in sorted(json_cat, key=lambda cat: cat["id"]): + for cat in cats: label_map[cat["id"]] = cat["id"] while len(categories) < cat["id"]: @@ -121,7 +144,7 @@ def _load_label_categories(self, json_cat, *, keep_original_ids): categories.add(cat["name"], parent=cat.get("supercategory")) else: - for idx, cat in enumerate(sorted(json_cat, key=lambda cat: cat["id"])): + for idx, cat in enumerate(cats): label_map[cat["id"]] = idx categories.add(cat["name"], parent=cat.get("supercategory")) @@ -131,8 +154,12 @@ def _load_label_categories(self, json_cat, *, keep_original_ids): def _load_person_kp_categories(self, json_cat): categories = PointsCategories() for cat in json_cat: - label_id = self._label_map[cat["id"]] - categories.add(label_id, labels=cat["keypoints"], joints=cat["skeleton"]) + label_id = self._label_map[self._parse_field(cat, "id", int)] + categories.add( + label_id, + labels=self._parse_field(cat, 
"keypoints", list), + joints=self._parse_field(cat, "skeleton", list), + ) self._categories[AnnotationType.points] = categories @@ -140,25 +167,28 @@ def _load_items(self, json_data): pbars = self._ctx.progress_reporter.split(2) items = {} img_infos = {} - for img_info in pbars[0].iter(json_data["images"], desc="Parsing image info"): + for img_info in pbars[0].iter( + self._parse_field(json_data, "images", list), + desc=f"Parsing image info in '{osp.basename(self._path)}'", + ): + img_id = None try: - img_id = img_info.get("id") - if not isinstance(img_id, int): - raise ValueError("Invalid image id value '%s'" % img_id) - + img_id = self._parse_field(img_info, "id", int) img_infos[img_id] = img_info if img_info.get("height") and img_info.get("width"): - image_size = (img_info["height"], img_info["width"]) + image_size = ( + self._parse_field(img_info, "height", int), + self._parse_field(img_info, "width", int), + ) else: image_size = None + file_name = self._parse_field(img_info, "file_name", str) items[img_id] = DatasetItem( - id=osp.splitext(img_info["file_name"])[0], + id=osp.splitext(file_name)[0], subset=self._subset, - media=Image( - path=osp.join(self._images_dir, img_info["file_name"]), size=image_size - ), + media=Image(path=osp.join(self._images_dir, file_name), size=image_size), annotations=[], attributes={"id": img_id}, ) @@ -166,11 +196,15 @@ def _load_items(self, json_data): self._ctx.error_policy.report_item_error(e, item_id=(img_id, self._subset)) if self._task is not CocoTask.panoptic: - for ann in pbars[1].iter(json_data["annotations"], desc="Parsing annotations"): + for ann in pbars[1].iter( + self._parse_field(json_data, "annotations", list), + desc=f"Parsing annotations in '{osp.basename(self._path)}'", + ): + img_id = None try: - img_id = ann.get("image_id") - if not isinstance(img_id, int): - raise ValueError("Invalid image id value '%s'" % img_id) + img_id = self._parse_field(ann, "image_id", int) + if img_id not in img_infos: + raise InvalidAnnotationError(f"Unknown image id '{img_id}'") self._load_annotations( ann, img_infos[img_id], parsed_annotations=items[img_id].annotations @@ -180,11 +214,15 @@ def _load_items(self, json_data): e, item_id=(img_id, self._subset) ) else: - for ann in pbars[1].iter(json_data["annotations"], desc="Parsing annotations"): + for ann in pbars[1].iter( + self._parse_field(json_data, "annotations", list), + desc=f"Parsing annotations in '{osp.basename(self._path)}'", + ): + img_id = None try: - img_id = ann.get("image_id") - if not isinstance(img_id, int): - raise ValueError("Invalid image id value '%s'" % img_id) + img_id = self._parse_field(ann, "image_id", int) + if img_id not in img_infos: + raise InvalidAnnotationError(f"Unknown image id '{img_id}'") self._load_panoptic_ann(ann, items[img_id].annotations) except Exception as e: @@ -200,13 +238,13 @@ def _load_panoptic_ann(self, ann, parsed_annotations=None): # For the panoptic task, each annotation struct is a per-image # annotation rather than a per-object annotation. 
-        mask_path = osp.join(self._mask_dir, ann["file_name"])
+        mask_path = osp.join(self._mask_dir, self._parse_field(ann, "file_name", str))
         mask = lazy_image(mask_path, loader=self._load_pan_mask)
         mask = CompiledMask(instance_mask=mask)
 
-        for segm_info in ann["segments_info"]:
+        for segm_info in self._parse_field(ann, "segments_info", list):
             cat_id = self._get_label_id(segm_info)
-            segm_id = segm_info["id"]
-            attributes = {"is_crowd": bool(segm_info["iscrowd"])}
+            segm_id = self._parse_field(segm_info, "id", int)
+            attributes = {"is_crowd": bool(self._parse_field(segm_info, "iscrowd", int))}
             parsed_annotations.append(
                 Mask(
                     image=mask.lazy_extract(segm_id),
@@ -236,19 +274,45 @@ def __call__(self):
             return mask_utils.merge(rles)
 
     def _get_label_id(self, ann):
-        if not ann["category_id"]:
+        cat_id = self._parse_field(ann, "category_id", int)
+        if not cat_id:
             return None
-        return self._label_map[ann["category_id"]]
+
+        label_id = self._label_map.get(cat_id)
+        if label_id is None:
+            raise UndeclaredLabelError(str(cat_id))
+        return label_id
+
+    @overload
+    def _parse_field(self, ann: Dict[str, Any], key: str, cls: Type[T]) -> T:
+        ...
+
+    @overload
+    def _parse_field(self, ann: Dict[str, Any], key: str, cls: Tuple[Type, ...]) -> Any:
+        ...
+
+    def _parse_field(
+        self, ann: Dict[str, Any], key: str, cls: Union[Type[T], Tuple[Type, ...]]
+    ) -> Any:
+        value = ann.get(key, NOTSET)
+        if value is NOTSET:
+            raise MissingFieldError(key)
+        elif not isinstance(value, cls):
+            cls = (cls,) if isclass(cls) else cls
+            raise InvalidFieldTypeError(
+                key, actual=str(type(value)), expected=tuple(str(t) for t in cls)
+            )
+        return value
 
     def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
         if parsed_annotations is None:
             parsed_annotations = []
 
-        ann_id = ann["id"]
+        ann_id = self._parse_field(ann, "id", int)
 
         attributes = ann.get("attributes", {})
         if "score" in ann:
-            attributes["score"] = ann["score"]
+            attributes["score"] = self._parse_field(ann, "score", (int, float))
 
         group = ann_id  # make sure all tasks' annotations are merged
 
@@ -259,10 +323,16 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
         ):
             label_id = self._get_label_id(ann)
 
-            attributes["is_crowd"] = bool(ann["iscrowd"])
+            attributes["is_crowd"] = bool(self._parse_field(ann, "iscrowd", int))
 
             if self._task is CocoTask.person_keypoints:
-                keypoints = ann["keypoints"]
+                keypoints = self._parse_field(ann, "keypoints", list)
+                if len(keypoints) % 3 != 0:
+                    raise InvalidAnnotationError(
+                        f"Keypoints have invalid value count {len(keypoints)}, "
+                        "which is not divisible by 3. Expected (x, y, visibility) triplets."
+                    )
+
                 points = []
                 visibility = []
                 for x, y, v in take_by(keypoints, 3):
@@ -281,7 +351,7 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
                     )
                 )
 
-            segmentation = ann["segmentation"]
+            segmentation = self._parse_field(ann, "segmentation", (list, dict))
             if segmentation and segmentation != [[]]:
                 rle = None
 
@@ -289,6 +359,17 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
                 if not self._merge_instance_polygons:
                     # polygon - a single object can consist of multiple parts
                     for polygon_points in segmentation:
+                        if len(polygon_points) % 2 != 0:
+                            raise InvalidAnnotationError(
+                                f"Polygon has invalid value count {len(polygon_points)}, "
+                                "which is not divisible by 2."
+                            )
+                        elif len(polygon_points) < 6:
+                            raise InvalidAnnotationError(
+                                f"Polygon has invalid value count {len(polygon_points)}. "
+                                "Expected at least 3 (x, y) pairs."
+                            )
+
                         parsed_annotations.append(
                             Polygon(
                                 points=polygon_points,
@@ -300,26 +381,27 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
                         )
                 else:
                     # merge all parts into a single mask RLE
-                    rle = self._lazy_merged_mask(
-                        segmentation, image_info["height"], image_info["width"]
-                    )
+                    img_h = self._parse_field(image_info, "height", int)
+                    img_w = self._parse_field(image_info, "width", int)
+                    rle = self._lazy_merged_mask(segmentation, img_h, img_w)
             elif isinstance(segmentation["counts"], list):
                 # uncompressed RLE
-                img_h = image_info["height"]
-                img_w = image_info["width"]
-                mask_h, mask_w = segmentation["size"]
-                if img_h == mask_h and img_w == mask_w:
-                    rle = self._lazy_merged_mask([segmentation], mask_h, mask_w)
-                else:
-                    log.warning(
-                        "item #%s: mask #%s "
-                        "does not match image size: %s vs. %s. "
-                        "Skipping this annotation.",
-                        image_info["id"],
-                        ann_id,
-                        (mask_h, mask_w),
-                        (img_h, img_w),
+                img_h = self._parse_field(image_info, "height", int)
+                img_w = self._parse_field(image_info, "width", int)
+
+                mask_size = self._parse_field(segmentation, "size", list)
+                if len(mask_size) != 2:
+                    raise InvalidAnnotationError(
+                        f"Mask size has wrong value count {len(mask_size)}. Expected 2 values."
+                    )
+                mask_h, mask_w = mask_size
+
+                if not ((img_h == mask_h) and (img_w == mask_w)):
+                    raise InvalidAnnotationError(
+                        "Mask #%s does not match image size: %s vs. %s"
+                        % (ann_id, (mask_h, mask_w), (img_h, img_w))
                     )
+                rle = self._lazy_merged_mask([segmentation], mask_h, mask_w)
             else:
                 # compressed RLE
                 rle = segmentation
@@ -331,7 +413,13 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
                     )
                 )
             else:
-                x, y, w, h = ann["bbox"]
+                bbox = self._parse_field(ann, "bbox", list)
+                if len(bbox) != 4:
+                    raise InvalidAnnotationError(
+                        f"Bbox has wrong value count {len(bbox)}. Expected 4 values."
+                    )
+
+                x, y, w, h = bbox
                 parsed_annotations.append(
                     Bbox(x, y, w, h, label=label_id, id=ann_id, attributes=attributes, group=group)
                 )
@@ -341,7 +429,7 @@ def _load_annotations(self, ann, image_info=None, parsed_annotations=None):
                 Label(label=label_id, id=ann_id, attributes=attributes, group=group)
             )
         elif self._task is CocoTask.captions:
-            caption = ann["caption"]
+            caption = self._parse_field(ann, "caption", str)
             parsed_annotations.append(
                 Caption(caption, id=ann_id, attributes=attributes, group=group)
             )
diff --git a/datumaro/plugins/kitti_raw_format/extractor.py b/datumaro/plugins/kitti_raw_format/extractor.py
index a62be48a24..fb4169191b 100644
--- a/datumaro/plugins/kitti_raw_format/extractor.py
+++ b/datumaro/plugins/kitti_raw_format/extractor.py
@@ -132,7 +132,7 @@ def _parse(cls, path):
                 attr = None
 
         if track is not None or shape is not None or attr is not None:
-            raise Exception("Failed to parse anotations from '%s'" % path)
+            raise Exception("Failed to parse annotations from '%s'" % path)
 
         special_attrs = KittiRawPath.SPECIAL_ATTRS
         common_attrs = ["occluded"]
diff --git a/datumaro/util/attrs_util.py b/datumaro/util/attrs_util.py
index 7f73c7e219..3415d99365 100644
--- a/datumaro/util/attrs_util.py
+++ b/datumaro/util/attrs_util.py
@@ -11,8 +11,15 @@ def not_empty(inst, attribute, x):
     assert len(x) != 0, x
 
 
+def has_length(n):
+    def _validator(inst, attribute, x):
+        assert len(x) == n, x
+
+    return _validator
+
+
 def default_if_none(conv):
-    def validator(inst, attribute, value):
+    def _validator(inst, attribute, value):
         default = attribute.default
         if value is None:
             if callable(default):
@@ -32,14 +39,14 @@ def validator(inst, attribute, value):
             value = conv(value)
         setattr(inst, attribute.name, value)
 
-    return validator
+    return _validator
 
 
 def ensure_cls(c):
-    def converter(arg):
+    def _converter(arg):
         if isinstance(arg, c):
             return arg
         else:
             return c(**arg)
 
-    return converter
+    return _converter
diff --git a/tests/test_coco_format.py b/tests/test_coco_format.py
index 03fedab892..8f60b6cb67 100644
--- a/tests/test_coco_format.py
+++ b/tests/test_coco_format.py
@@ -1,6 +1,7 @@
 import os
 import os.path as osp
 import pickle  # nosec - disable B403:import_pickle check
+from copy import deepcopy
 from functools import partial
 from itertools import product
 from unittest import TestCase
@@ -20,6 +21,15 @@
 )
 from datumaro.components.dataset import Dataset
 from datumaro.components.environment import Environment
+from datumaro.components.errors import (
+    AnnotationImportError,
+    DatasetImportError,
+    InvalidAnnotationError,
+    InvalidFieldTypeError,
+    ItemImportError,
+    MissingFieldError,
+    UndeclaredLabelError,
+)
 from datumaro.components.extractor import DatasetItem
 from datumaro.components.media import Image
 from datumaro.plugins.coco_format.converter import (
@@ -32,7 +42,9 @@
     CocoPersonKeypointsConverter,
     CocoStuffConverter,
 )
+from datumaro.plugins.coco_format.extractor import CocoInstancesExtractor
 from datumaro.plugins.coco_format.importer import CocoImporter
+from datumaro.util import dump_json_file
 from datumaro.util.test_utils import (
     TestDir,
     check_save_and_load,
@@ -836,6 +848,205 @@ def test_can_pickle(self):
         compare_datasets_strict(self, source, parsed)
 
 
+class CocoExtractorTests(TestCase):
+    ANNOTATION_JSON_TEMPLATE = {
+        "images": [
+            {
+                "id": 5,
+                "width": 10,
+                "height": 5,
+                "file_name": "a.jpg",
+            }
+        ],
+        "annotations": [
+            {
+                "id": 1,
+                "image_id": 5,
+                "category_id": 1,
+                "segmentation": [],
+                "area": 3.0,
+                "bbox": [2, 2, 3, 1],
+                "iscrowd": 0,
+            }
+        ],
+        "categories": [
+            {
+                "id": 1,
+                "name": "test",
+            }
+        ],
+    }
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_unexpected_file(self):
+        with TestDir() as test_dir:
+            with self.assertRaisesRegex(DatasetImportError, "JSON file"):
+                CocoInstancesExtractor(test_dir)
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_missing_item_field(self):
+        for field in ["id", "file_name"]:
+            with self.subTest(field=field):
+                with TestDir() as test_dir:
+                    ann_path = osp.join(test_dir, "ann.json")
+                    anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+                    anns["images"][0].pop(field)
+                    dump_json_file(ann_path, anns)
+
+                    with self.assertRaises(ItemImportError) as capture:
+                        Dataset.import_from(ann_path, "coco_instances")
+                    self.assertIsInstance(capture.exception.__cause__, MissingFieldError)
+                    self.assertEqual(capture.exception.__cause__.name, field)
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_missing_ann_field(self):
+        for field in ["id", "image_id", "segmentation", "iscrowd", "category_id", "bbox"]:
+            with self.subTest(field=field):
+                with TestDir() as test_dir:
+                    ann_path = osp.join(test_dir, "ann.json")
+                    anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+                    anns["annotations"][0].pop(field)
+                    dump_json_file(ann_path, anns)
+
+                    with self.assertRaises(AnnotationImportError) as capture:
+                        Dataset.import_from(ann_path, "coco_instances")
+                    self.assertIsInstance(capture.exception.__cause__, MissingFieldError)
+                    self.assertEqual(capture.exception.__cause__.name, field)
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_missing_global_field(self):
+        for field in ["images", "annotations", "categories"]:
+            with self.subTest(field=field):
+                with TestDir() as test_dir:
+                    ann_path = osp.join(test_dir, "ann.json")
+                    anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+                    anns.pop(field)
+                    dump_json_file(ann_path, anns)
+
+                    with self.assertRaises(MissingFieldError) as capture:
+                        Dataset.import_from(ann_path, "coco_instances")
+                    self.assertEqual(capture.exception.name, field)
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_missing_category_field(self):
+        for field in ["id", "name"]:
+            with self.subTest(field=field):
+                with TestDir() as test_dir:
+                    ann_path = osp.join(test_dir, "ann.json")
+                    anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+                    anns["categories"][0].pop(field)
+                    dump_json_file(ann_path, anns)
+
+                    with self.assertRaises(MissingFieldError) as capture:
+                        Dataset.import_from(ann_path, "coco_instances")
+                    self.assertEqual(capture.exception.name, field)
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_undeclared_label(self):
+        with TestDir() as test_dir:
+            ann_path = osp.join(test_dir, "ann.json")
+            anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+            anns["annotations"][0]["category_id"] = 2
+            dump_json_file(ann_path, anns)
+
+            with self.assertRaises(AnnotationImportError) as capture:
+                Dataset.import_from(ann_path, "coco_instances")
+            self.assertIsInstance(capture.exception.__cause__, UndeclaredLabelError)
+            self.assertEqual(capture.exception.__cause__.id, "2")
+
+    @mark_requirement(Requirements.DATUM_ERROR_REPORTING)
+    def test_can_report_invalid_bbox(self):
+        with TestDir() as test_dir:
+            ann_path = osp.join(test_dir, "ann.json")
+            anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE)
+            anns["annotations"][0]["bbox"] = [1, 2, 3, 4, 5]
+            dump_json_file(ann_path, anns)
+
+            with self.assertRaises(AnnotationImportError) as capture:
+                Dataset.import_from(ann_path, "coco_instances")
"coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidAnnotationError) + self.assertIn("Bbox has wrong value count", str(capture.exception.__cause__)) + + @mark_requirement(Requirements.DATUM_ERROR_REPORTING) + def test_can_report_invalid_polygon_odd_points(self): + with TestDir() as test_dir: + ann_path = osp.join(test_dir, "ann.json") + anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE) + anns["annotations"][0]["segmentation"] = [[1, 2, 3]] + dump_json_file(ann_path, anns) + + with self.assertRaises(AnnotationImportError) as capture: + Dataset.import_from(ann_path, "coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidAnnotationError) + self.assertIn("not divisible by 2", str(capture.exception.__cause__)) + + @mark_requirement(Requirements.DATUM_ERROR_REPORTING) + def test_can_report_invalid_polygon_less_than_3_points(self): + with TestDir() as test_dir: + ann_path = osp.join(test_dir, "ann.json") + anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE) + anns["annotations"][0]["segmentation"] = [[1, 2, 3, 4]] + dump_json_file(ann_path, anns) + + with self.assertRaises(AnnotationImportError) as capture: + Dataset.import_from(ann_path, "coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidAnnotationError) + self.assertIn("at least 3 (x, y) pairs", str(capture.exception.__cause__)) + + @mark_requirement(Requirements.DATUM_ERROR_REPORTING) + def test_can_report_invalid_image_id(self): + with TestDir() as test_dir: + ann_path = osp.join(test_dir, "ann.json") + anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE) + anns["annotations"][0]["image_id"] = 10 + dump_json_file(ann_path, anns) + + with self.assertRaises(AnnotationImportError) as capture: + Dataset.import_from(ann_path, "coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidAnnotationError) + self.assertIn("Unknown image id", str(capture.exception.__cause__)) + + @mark_requirement(Requirements.DATUM_ERROR_REPORTING) + def test_can_report_invalid_item_field_type(self): + with TestDir() as test_dir: + for field, value in [("id", "q"), ("width", "q"), ("height", "q"), ("file_name", 0)]: + with self.subTest(field=field, value=value): + ann_path = osp.join(test_dir, "ann.json") + anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE) + anns["images"][0][field] = value + dump_json_file(ann_path, anns) + + with self.assertRaises(ItemImportError) as capture: + Dataset.import_from(ann_path, "coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidFieldTypeError) + self.assertEqual(capture.exception.__cause__.name, field) + self.assertEqual(capture.exception.__cause__.actual, str(type(value))) + + @mark_requirement(Requirements.DATUM_ERROR_REPORTING) + def test_can_report_invalid_ann_field_type(self): + with TestDir() as test_dir: + for field, value in [ + ("id", "a"), + ("image_id", "a"), + ("segmentation", "a"), + ("iscrowd", "a"), + ("category_id", "a"), + ("bbox", "a"), + ("score", "a"), + ]: + with self.subTest(field=field): + ann_path = osp.join(test_dir, "ann.json") + anns = deepcopy(self.ANNOTATION_JSON_TEMPLATE) + anns["annotations"][0][field] = value + dump_json_file(ann_path, anns) + + with self.assertRaises(AnnotationImportError) as capture: + Dataset.import_from(ann_path, "coco_instances") + self.assertIsInstance(capture.exception.__cause__, InvalidFieldTypeError) + self.assertEqual(capture.exception.__cause__.name, field) + self.assertEqual(capture.exception.__cause__.actual, str(type(value))) + + class CocoConverterTest(TestCase): def 
     def _test_save_and_load(
         self, source_dataset, converter, test_dir, target_dataset=None, importer_args=None, **kwargs
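Usage note, not part of the patch: downstream code can now separate the error context (which item failed) from the error cause (why parsing failed). A minimal sketch mirroring the tests above; the `ann.json` path and the printed messages are illustrative:

```python
from datumaro.components.dataset import Dataset
from datumaro.components.errors import AnnotationImportError, ItemImportError

# Importing a malformed COCO instances file fails with a wrapper error that
# names the offending (id, subset) pair, while the concrete parsing problem
# (MissingFieldError, InvalidFieldTypeError, UndeclaredLabelError, ...) is
# chained in __cause__.
try:
    dataset = Dataset.import_from("ann.json", "coco_instances")  # hypothetical file
except AnnotationImportError as e:  # subclass of ItemImportError, so match it first
    print(e)            # e.g. "Failed to import item (5, 'default') annotation"
    print(e.__cause__)  # e.g. "Missing annotation field 'bbox'"
except ItemImportError as e:
    print(e, e.__cause__)
```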