Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

dataset_manager: remove imports that were deprecated in Python 3.9 #8745

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cvat/apps/dataset_manager/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from copy import copy, deepcopy

import math
from typing import Container, Optional, Sequence
from collections.abc import Container, Sequence
from typing import Optional
import numpy as np
from itertools import chain
from scipy.optimize import linear_sum_assignment
Expand Down
81 changes: 41 additions & 40 deletions cvat/apps/dataset_manager/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import os.path as osp
import re
import sys
from collections import OrderedDict, defaultdict
from collections.abc import Iterable, Iterator, Mapping, Sequence
from functools import reduce
from operator import add
from pathlib import Path
from types import SimpleNamespace
from typing import (Any, Callable, DefaultDict, Dict, Iterable, Iterator, List, Literal, Mapping,
NamedTuple, Optional, OrderedDict, Sequence, Set, Tuple, Union)
from typing import Any, Callable, Literal, NamedTuple, Optional, Union

from attrs.converters import to_bool
import datumaro as dm
Expand Down Expand Up @@ -277,12 +278,12 @@ def __init__(self,
self._create_callback = create_callback
self._MAX_ANNO_SIZE = 30000
self._frame_info = {}
self._frame_mapping: Dict[str, int] = {}
self._frame_mapping: dict[str, int] = {}
self._frame_step = db_task.data.get_frame_step()
self._db_data: models.Data = db_task.data
self._use_server_track_ids = use_server_track_ids
self._required_frames = included_frames
self._initialized_included_frames: Optional[Set[int]] = None
self._initialized_included_frames: Optional[set[int]] = None
self._db_subset = db_task.subset

super().__init__(db_task)
Expand Down Expand Up @@ -960,50 +961,50 @@ class LabeledShape:
type: str = attrib()
frame: int = attrib()
label: str = attrib()
points: List[float] = attrib()
points: list[float] = attrib()
occluded: bool = attrib()
attributes: List[InstanceLabelData.Attribute] = attrib()
attributes: list[InstanceLabelData.Attribute] = attrib()
source: str = attrib(default='manual')
group: int = attrib(default=0)
rotation: int = attrib(default=0)
z_order: int = attrib(default=0)
task_id: int = attrib(default=None)
subset: str = attrib(default=None)
outside: bool = attrib(default=False)
elements: List['ProjectData.LabeledShape'] = attrib(default=[])
elements: list['ProjectData.LabeledShape'] = attrib(default=[])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid Mutable Default Arguments in attrib

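Using a mutable default value (`default=[]`) in `attrib` can lead to unexpected behavior, because every instance shares the same list object. Use `factory=list` instead so that each instance gets its own new list. The same `default=[]` pattern also appears in the `elements` fields of `TrackedShape` and `Track` in this diff.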

Apply this diff to fix the issue:

- elements: list['ProjectData.LabeledShape'] = attrib(default=[])
+ elements: list['ProjectData.LabeledShape'] = attrib(factory=list)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
elements: list['ProjectData.LabeledShape'] = attrib(default=[])
elements: list['ProjectData.LabeledShape'] = attrib(factory=list)


@attrs
class TrackedShape:
type: str = attrib()
frame: int = attrib()
points: List[float] = attrib()
points: list[float] = attrib()
occluded: bool = attrib()
outside: bool = attrib()
keyframe: bool = attrib()
attributes: List[InstanceLabelData.Attribute] = attrib()
attributes: list[InstanceLabelData.Attribute] = attrib()
rotation: int = attrib(default=0)
source: str = attrib(default='manual')
group: int = attrib(default=0)
z_order: int = attrib(default=0)
label: str = attrib(default=None)
track_id: int = attrib(default=0)
elements: List['ProjectData.TrackedShape'] = attrib(default=[])
elements: list['ProjectData.TrackedShape'] = attrib(default=[])

@attrs
class Track:
label: str = attrib()
shapes: List['ProjectData.TrackedShape'] = attrib()
shapes: list['ProjectData.TrackedShape'] = attrib()
source: str = attrib(default='manual')
group: int = attrib(default=0)
task_id: int = attrib(default=None)
subset: str = attrib(default=None)
elements: List['ProjectData.Track'] = attrib(default=[])
elements: list['ProjectData.Track'] = attrib(default=[])

@attrs
class Tag:
frame: int = attrib()
label: str = attrib()
attributes: List[InstanceLabelData.Attribute] = attrib()
attributes: list[InstanceLabelData.Attribute] = attrib()
source: str = attrib(default='manual')
group: int = attrib(default=0)
task_id: int = attrib(default=None)
Expand All @@ -1017,8 +1018,8 @@ class Frame:
name: str = attrib()
width: int = attrib()
height: int = attrib()
labeled_shapes: List[Union['ProjectData.LabeledShape', 'ProjectData.TrackedShape']] = attrib()
tags: List['ProjectData.Tag'] = attrib()
labeled_shapes: list[Union['ProjectData.LabeledShape', 'ProjectData.TrackedShape']] = attrib()
tags: list['ProjectData.Tag'] = attrib()
task_id: int = attrib(default=None)
subset: str = attrib(default=None)

Expand All @@ -1037,12 +1038,12 @@ def __init__(self,
self._host = host
self._soft_attribute_import = False
self._project_annotation = project_annotation
self._tasks_data: Dict[int, TaskData] = {}
self._frame_info: Dict[Tuple[int, int], Literal["path", "width", "height", "subset"]] = dict()
self._tasks_data: dict[int, TaskData] = {}
self._frame_info: dict[tuple[int, int], Literal["path", "width", "height", "subset"]] = dict()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct Type Annotation for _frame_info Dictionary

The type annotation `dict[tuple[int, int], Literal["path", "width", "height", "subset"]]` seems incorrect. Using `Literal` here implies that each value is a single string, but the actual values are likely dictionaries keyed by these names. Consider changing the value type to `dict[str, Any]` to accurately reflect the structure.

Apply this diff to fix the type annotation:

- self._frame_info: dict[tuple[int, int], Literal["path", "width", "height", "subset"]] = dict()
+ self._frame_info: dict[tuple[int, int], dict[str, Any]] = dict()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self._frame_info: dict[tuple[int, int], Literal["path", "width", "height", "subset"]] = dict()
self._frame_info: dict[tuple[int, int], dict[str, Any]] = dict()

# (subset, path): (task id, frame number)
self._frame_mapping: Dict[Tuple[str, str], Tuple[int, int]] = dict()
self._frame_steps: Dict[int, int] = {}
self.new_tasks: Set[int] = set()
self._frame_mapping: dict[tuple[str, str], tuple[int, int]] = dict()
self._frame_steps: dict[int, int] = {}
self.new_tasks: set[int] = set()
self._use_server_track_ids = use_server_track_ids

InstanceLabelData.__init__(self, db_project)
Expand Down Expand Up @@ -1080,12 +1081,12 @@ def _init_tasks(self):
subsets = set()
for task in self._db_tasks.values():
subsets.add(task.subset)
self._subsets: List[str] = list(subsets)
self._subsets: list[str] = list(subsets)

self._frame_steps: Dict[int, int] = {task.id: task.data.get_frame_step() for task in self._db_tasks.values()}
self._frame_steps: dict[int, int] = {task.id: task.data.get_frame_step() for task in self._db_tasks.values()}

def _init_task_frame_offsets(self):
self._task_frame_offsets: Dict[int, int] = dict()
self._task_frame_offsets: dict[int, int] = dict()
s = 0
subset = None

Expand All @@ -1100,7 +1101,7 @@ def _init_task_frame_offsets(self):
def _init_frame_info(self):
self._frame_info = dict()
self._deleted_frames = { (task.id, frame): True for task in self._db_tasks.values() for frame in task.data.deleted_frames }
original_names = DefaultDict[Tuple[str, str], int](int)
original_names = defaultdict[tuple[str, str], int](int)
for task in self._db_tasks.values():
defaulted_subset = get_defaulted_subset(task.subset, self._subsets)
if hasattr(task.data, 'video'):
Expand Down Expand Up @@ -1254,7 +1255,7 @@ def _export_track(self, track: dict, task_id: int, task_size: int, idx: int):
)

def group_by_frame(self, include_empty: bool = False):
frames: Dict[Tuple[str, int], ProjectData.Frame] = {}
frames: dict[tuple[str, int], ProjectData.Frame] = {}
def get_frame(task_id: int, idx: int) -> ProjectData.Frame:
frame_info = self._frame_info[(task_id, idx)]
abs_frame = self.abs_frame_id(task_id, idx)
Expand Down Expand Up @@ -1365,7 +1366,7 @@ def db_project(self):
return self._db_project

@property
def subsets(self) -> List[str]:
def subsets(self) -> list[str]:
return self._subsets

@property
Expand Down Expand Up @@ -1447,7 +1448,7 @@ def split_dataset(self, dataset: dm.Dataset):
subset_dataset: dm.Dataset = dataset.subsets()[task_data.db_instance.subset].as_dataset()
yield subset_dataset, task_data

def add_labels(self, labels: List[dict]):
def add_labels(self, labels: list[dict]):
attributes = []
_labels = []
for label in labels:
Expand All @@ -1468,14 +1469,14 @@ def is_video(self) -> bool:
return self.db_task.mode == 'interpolation'

class MediaProvider:
def __init__(self, sources: Dict[int, MediaSource]) -> None:
def __init__(self, sources: dict[int, MediaSource]) -> None:
self._sources = sources

def unload(self) -> None:
pass

class MediaProvider2D(MediaProvider):
def __init__(self, sources: Dict[int, MediaSource]) -> None:
def __init__(self, sources: dict[int, MediaSource]) -> None:
super().__init__(sources)
self._current_source_id = None
self._frame_provider = None
Expand Down Expand Up @@ -1526,7 +1527,7 @@ def _unload_source(self) -> None:
self._current_source_id = None

class MediaProvider3D(MediaProvider):
def __init__(self, sources: Dict[int, MediaSource]) -> None:
def __init__(self, sources: dict[int, MediaSource]) -> None:
super().__init__(sources)
self._images_per_source = {
source_id: {
Expand Down Expand Up @@ -1554,7 +1555,7 @@ def get_media_for_frame(self, source_id: int, frame_id: int, **image_kwargs) ->

return dm.PointCloud(point_cloud_path, extra_images=related_images)

MEDIA_PROVIDERS_BY_DIMENSION: Dict[DimensionType, MediaProvider] = {
MEDIA_PROVIDERS_BY_DIMENSION: dict[DimensionType, MediaProvider] = {
DimensionType.DIM_3D: MediaProvider3D,
DimensionType.DIM_2D: MediaProvider2D,
}
Expand All @@ -1579,7 +1580,7 @@ def categories(self) -> dict:

@staticmethod
def _load_categories(labels: list):
categories: Dict[dm.AnnotationType,
categories: dict[dm.AnnotationType,
dm.Categories] = {}

label_categories = dm.LabelCategories(attributes=['occluded'])
Expand Down Expand Up @@ -1666,7 +1667,7 @@ def __init__(
{0: MediaSource(db_task)}
)

dm_items: List[dm.DatasetItem] = []
dm_items: list[dm.DatasetItem] = []
for frame_data in instance_data.group_by_frame(include_empty=True):
dm_media_args = { 'path': frame_data.name + ext }
if dimension == DimensionType.DIM_3D:
Expand Down Expand Up @@ -1763,13 +1764,13 @@ def __init__(
}
)

ext_per_task: Dict[int, str] = {
ext_per_task: dict[int, str] = {
task.id: TaskFrameProvider.VIDEO_FRAME_EXT if is_video else ''
for task in project_data.tasks
for is_video in [task.mode == 'interpolation']
}

dm_items: List[dm.DatasetItem] = []
dm_items: list[dm.DatasetItem] = []
for frame_data in project_data.group_by_frame(include_empty=True):
dm_media_args = { 'path': frame_data.name + ext_per_task[frame_data.task_id] }
if self._dimension == DimensionType.DIM_3D:
Expand Down Expand Up @@ -1881,7 +1882,7 @@ def _clean_display_message(self) -> str:
message = "Dataset must contain a file:" + message
return re.sub(r' +', " ", message)

def mangle_image_name(name: str, subset: str, names: DefaultDict[Tuple[str, str], int]) -> str:
def mangle_image_name(name: str, subset: str, names: defaultdict[tuple[str, str], int]) -> str:
name, ext = name.rsplit(osp.extsep, maxsplit=1)

if not names[(subset, name)]:
Expand All @@ -1902,7 +1903,7 @@ def mangle_image_name(name: str, subset: str, names: DefaultDict[Tuple[str, str]
i += 1
raise Exception('Cannot mangle image name')

def get_defaulted_subset(subset: str, subsets: List[str]) -> str:
def get_defaulted_subset(subset: str, subsets: list[str]) -> str:
if subset:
return subset
else:
Expand Down Expand Up @@ -2064,7 +2065,7 @@ def _convert_shape(self,

return results

def _convert_shapes(self, shapes: List[CommonData.LabeledShape]) -> Iterable[dm.Annotation]:
def _convert_shapes(self, shapes: list[CommonData.LabeledShape]) -> Iterable[dm.Annotation]:
dm_anno = []

self.num_of_tracks = reduce(
Expand All @@ -2078,7 +2079,7 @@ def _convert_shapes(self, shapes: List[CommonData.LabeledShape]) -> Iterable[dm.

return dm_anno

def convert(self) -> List[dm.Annotation]:
def convert(self) -> list[dm.Annotation]:
dm_anno = []
dm_anno.extend(self._convert_tags(self.cvat_frame_anno.tags))
dm_anno.extend(self._convert_shapes(self.cvat_frame_anno.labeled_shapes))
Expand All @@ -2091,7 +2092,7 @@ def convert_cvat_anno_to_dm(
map_label,
format_name=None,
dimension=DimensionType.DIM_2D
) -> List[dm.Annotation]:
) -> list[dm.Annotation]:
converter = CvatToDmAnnotationConverter(
cvat_frame_anno=cvat_frame_anno,
label_attrs=label_attrs,
Expand Down
5 changes: 3 additions & 2 deletions cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
# SPDX-License-Identifier: MIT

import os
from collections.abc import Mapping
from tempfile import TemporaryDirectory
import rq
from typing import Any, Callable, List, Mapping, Tuple
from typing import Any, Callable
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError

from django.db import transaction
Expand Down Expand Up @@ -109,7 +110,7 @@ def split_name(file):
project_data.new_tasks.add(db_task.id)
project_data.init()

def add_labels(self, labels: List[models.Label], attributes: List[Tuple[str, models.AttributeSpec]] = None):
def add_labels(self, labels: list[models.Label], attributes: list[tuple[str, models.AttributeSpec]] = None):
for label in labels:
label.project = self.db_project
# We need label_id here, so we can't use bulk_create here
Expand Down
4 changes: 2 additions & 2 deletions cvat/apps/dataset_manager/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tempfile
import unittest
from types import TracebackType
from typing import Optional, Type
from typing import Optional

from datumaro.util.os_util import rmfile, rmtree

Expand All @@ -23,7 +23,7 @@ def __enter__(self) -> str:

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
Expand Down
3 changes: 2 additions & 1 deletion cvat/apps/dataset_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
import os.path as osp
import re
import zipfile
from collections.abc import Generator, Sequence
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta
from threading import Lock
from typing import Any, Generator, Optional, Sequence
from typing import Any, Optional

import attrs
import django_rq
Expand Down
Loading