From 8c63d5ee3ac2c384bc6e5d46077320254315261e Mon Sep 17 00:00:00 2001 From: Roman Donchenko Date: Fri, 20 Dec 2024 18:18:35 +0200 Subject: [PATCH] engine: remove deprecated imports This is a continuation of #8626. --- cvat/apps/engine/background.py | 6 ++-- cvat/apps/engine/backup.py | 5 +-- cvat/apps/engine/cache.py | 27 +++++----------- cvat/apps/engine/cloud_provider.py | 31 ++++++++++--------- cvat/apps/engine/field_validation.py | 3 +- cvat/apps/engine/filters.py | 13 ++++---- cvat/apps/engine/frame_provider.py | 21 ++++++------- cvat/apps/engine/lazy_list.py | 3 +- cvat/apps/engine/location.py | 6 ++-- cvat/apps/engine/media_extractors.py | 22 ++++++------- .../migrations/0083_move_to_segment_chunks.py | 3 +- .../migrations/0084_honeypot_support.py | 2 +- cvat/apps/engine/mixins.py | 5 +-- cvat/apps/engine/models.py | 5 +-- cvat/apps/engine/permissions.py | 11 ++++--- cvat/apps/engine/schema.py | 3 +- cvat/apps/engine/serializers.py | 22 +++++++------ cvat/apps/engine/task.py | 27 ++++++++-------- cvat/apps/engine/task_validation.py | 3 +- cvat/apps/engine/tests/utils.py | 7 +++-- cvat/apps/engine/utils.py | 9 +++--- cvat/apps/engine/view_utils.py | 10 +++--- cvat/apps/engine/views.py | 11 ++++--- 23 files changed, 127 insertions(+), 128 deletions(-) diff --git a/cvat/apps/engine/background.py b/cvat/apps/engine/background.py index d9f9237e6d27..a3a2d34326b9 100644 --- a/cvat/apps/engine/background.py +++ b/cvat/apps/engine/background.py @@ -7,7 +7,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime -from typing import Any, Callable, Dict, Optional, Union +from typing import Any, Callable, Optional, Union import django_rq from attrs.converters import to_bool @@ -170,7 +170,7 @@ class ExportArgs: format: str filename: str save_images: bool - location_config: Dict[str, Any] + location_config: dict[str, Any] @property def location(self) -> Location: @@ -515,7 +515,7 @@ class BackupExportManager(_ResourceExportManager): @dataclass class ExportArgs: filename: str - location_config: Dict[str, Any] + location_config: dict[str, Any] @property def location(self) -> Location: diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 499700a3b4ef..3c8ba5678c24 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -10,10 +10,11 @@ import shutil import tempfile import uuid +from collections.abc import Collection, Iterable from enum import Enum from logging import Logger from tempfile import NamedTemporaryFile -from typing import Any, Collection, Dict, Iterable, Optional, Union +from typing import Any, Optional, Union from zipfile import ZipFile import django_rq @@ -650,7 +651,7 @@ def _calculate_segment_size(jobs): return segment_size, overlap @staticmethod - def _parse_segment_frames(*, jobs: Dict[str, Any]) -> JobFileMapping: + def _parse_segment_frames(*, jobs: dict[str, Any]) -> JobFileMapping: segments = [] for i, segment in enumerate(jobs): diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index f89ca741501e..43c2be7bc57e 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -13,22 +13,11 @@ import time import zipfile import zlib +from collections.abc import Collection, Generator, Iterator, Sequence from contextlib import ExitStack, closing from datetime import datetime, timezone from itertools import groupby, pairwise -from typing import ( - Any, - Callable, - Collection, - Generator, - Iterator, - Optional, - Sequence, - Tuple, - Type, - Union, - overload, -) +from typing import Any, Callable, Optional, Union, overload import attrs import av @@ -76,8 +65,8 @@ slogger = ServerLogManager(__name__) -DataWithMime = Tuple[io.BytesIO, str] -_CacheItem = Tuple[io.BytesIO, str, int, Union[datetime, None]] +DataWithMime = tuple[io.BytesIO, str] +_CacheItem = tuple[io.BytesIO, str, int, Union[datetime, None]] def enqueue_create_chunk_job( @@ -636,7 +625,7 @@ def _read_raw_images( @staticmethod def _read_raw_frames( db_task: Union[models.Task, int], frame_ids: Sequence[int] - ) -> Generator[Tuple[Union[av.VideoFrame, PIL.Image.Image], str, str], None, None]: + ) -> Generator[tuple[Union[av.VideoFrame, PIL.Image.Image], str, str], None, None]: if isinstance(db_task, int): db_task = models.Task.objects.get(pk=db_task) @@ -962,7 +951,7 @@ def prepare_preview_image(image: PIL.Image.Image) -> DataWithMime: def prepare_chunk( - task_chunk_frames: Iterator[Tuple[Any, str, int]], + task_chunk_frames: Iterator[tuple[Any, str, int]], *, quality: FrameQuality, db_task: models.Task, @@ -972,7 +961,7 @@ def prepare_chunk( db_data = db_task.data - writer_classes: dict[FrameQuality, Type[IChunkWriter]] = { + writer_classes: dict[FrameQuality, type[IChunkWriter]] = { FrameQuality.COMPRESSED: ( Mpeg4CompressedChunkWriter if db_data.compressed_chunk_type == models.DataChoice.VIDEO @@ -1005,7 +994,7 @@ def prepare_chunk( return buffer, get_chunk_mime_type_for_writer(writer_class) -def get_chunk_mime_type_for_writer(writer: Union[IChunkWriter, Type[IChunkWriter]]) -> str: +def get_chunk_mime_type_for_writer(writer: Union[IChunkWriter, type[IChunkWriter]]) -> str: if isinstance(writer, IChunkWriter): writer_class = type(writer) else: diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index f3fe3e6a28e1..b810304d73f9 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -7,11 +7,12 @@ import json import os import math -from abc import ABC, abstractmethod, abstractproperty +from abc import ABC, abstractmethod +from collections.abc import Iterator +from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION from enum import Enum from io import BytesIO -from typing import Dict, List, Optional, Any, Callable, TypeVar, Iterator -from concurrent.futures import ThreadPoolExecutor, wait, FIRST_EXCEPTION +from typing import Optional, Any, Callable, TypeVar import boto3 from azure.core.exceptions import HttpResponseError, ResourceExistsError @@ -135,7 +136,8 @@ class _CloudStorage(ABC): def __init__(self, prefix: Optional[str] = None): self.prefix = prefix - @abstractproperty + @property + @abstractmethod def name(self): pass @@ -232,7 +234,7 @@ def optimally_image_download(self, key: str, chunk_size: int = 65536) -> NamedBy def bulk_download_to_memory( self, - files: List[str], + files: list[str], *, threads_number: Optional[int] = None, _use_optimal_downloading: bool = True, @@ -246,7 +248,7 @@ def bulk_download_to_memory( def bulk_download_to_dir( self, - files: List[str], + files: list[str], upload_dir: str, *, threads_number: Optional[int] = None, @@ -274,7 +276,7 @@ def _list_raw_content_on_one_page( prefix: str = "", next_token: Optional[str] = None, page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, - ) -> Dict: + ) -> dict: pass def list_files_on_one_page( @@ -284,7 +286,7 @@ def list_files_on_one_page( page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, _use_flat_listing: bool = False, _use_sort: bool = False, - ) -> Dict: + ) -> dict: if self.prefix and prefix and not (self.prefix.startswith(prefix) or prefix.startswith(self.prefix)): return { @@ -337,7 +339,7 @@ def list_files( self, prefix: str = "", _use_flat_listing: bool = False, - ) -> List[str]: + ) -> list[str]: all_files = [] next_token = None while True: @@ -349,7 +351,8 @@ def list_files( return all_files - @abstractproperty + @property + @abstractmethod def supported_actions(self): pass @@ -365,7 +368,7 @@ def get_cloud_storage_instance( cloud_provider: CloudProviderChoice, resource: str, credentials: str, - specific_attributes: Optional[Dict[str, Any]] = None, + specific_attributes: Optional[dict[str, Any]] = None, ): instance = None if cloud_provider == CloudProviderChoice.AWS_S3: @@ -529,7 +532,7 @@ def _list_raw_content_on_one_page( prefix: str = "", next_token: Optional[str] = None, page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, - ) -> Dict: + ) -> dict: # The structure of response looks like this: # { # 'CommonPrefixes': [{'Prefix': 'sub/'}], @@ -736,7 +739,7 @@ def _list_raw_content_on_one_page( prefix: str = "", next_token: Optional[str] = None, page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, - ) -> Dict: + ) -> dict: page = self._client.walk_blobs( maxresults=page_size, results_per_page=page_size, delimiter='/', **({'name_starts_with': prefix} if prefix else {}) @@ -852,7 +855,7 @@ def _list_raw_content_on_one_page( prefix: str = "", next_token: Optional[str] = None, page_size: int = settings.BUCKET_CONTENT_MAX_PAGE_SIZE, - ) -> Dict: + ) -> dict: iterator = self._client.list_blobs( bucket_or_name=self.name, max_results=page_size, page_size=page_size, fields='items(name),nextPageToken,prefixes', # https://cloud.google.com/storage/docs/json_api/v1/parameters#fields diff --git a/cvat/apps/engine/field_validation.py b/cvat/apps/engine/field_validation.py index bbfa58b5f3ea..e411284b3cde 100644 --- a/cvat/apps/engine/field_validation.py +++ b/cvat/apps/engine/field_validation.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: MIT -from typing import Any, Sequence +from collections.abc import Sequence +from typing import Any from rest_framework import serializers diff --git a/cvat/apps/engine/filters.py b/cvat/apps/engine/filters.py index 663b6554e168..32355629d06d 100644 --- a/cvat/apps/engine/filters.py +++ b/cvat/apps/engine/filters.py @@ -3,8 +3,9 @@ # # SPDX-License-Identifier: MIT -from typing import Any, Dict, Tuple, List, Iterator, Optional, Iterable +from collections.abc import Iterator, Iterable from functools import reduce +from typing import Any, Optional import operator import json @@ -25,7 +26,7 @@ DEFAULT_FILTER_FIELDS_ATTR = 'filter_fields' DEFAULT_LOOKUP_MAP_ATTR = 'lookup_fields' -def get_lookup_fields(view, fields: Optional[Iterator[str]] = None) -> Dict[str, str]: +def get_lookup_fields(view, fields: Optional[Iterator[str]] = None) -> dict[str, str]: if fields is None: fields = getattr(view, DEFAULT_FILTER_FIELDS_ATTR, None) or [] @@ -134,7 +135,7 @@ def get_schema_operation_parameters(self, view): }] if ordering_fields else [] class JsonLogicFilter(filters.BaseFilterBackend): - Rules = Dict[str, Any] + Rules = dict[str, Any] filter_param = 'filter' filter_title = _('Filter') filter_description = _(dedent(""" @@ -191,7 +192,7 @@ def _parse_query(self, json_rules: str) -> Rules: return rules def apply_filter(self, - queryset: QuerySet, parsed_rules: Rules, *, lookup_fields: Dict[str, Any] + queryset: QuerySet, parsed_rules: Rules, *, lookup_fields: dict[str, Any] ) -> QuerySet: try: q_object = self._build_Q(parsed_rules, lookup_fields) @@ -362,7 +363,7 @@ class DotDict(dict): __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ - def __init__(self, dct: Dict): + def __init__(self, dct: dict): for key, value in dct.items(): if isinstance(value, dict): value = self.__class__(value) @@ -454,7 +455,7 @@ class NonModelOrderingFilter(OrderingFilter, _NestedAttributeHandler): ?sort=-field1,-field2 """ - def get_ordering(self, request, queryset, view) -> Tuple[List[str], bool]: + def get_ordering(self, request, queryset, view) -> tuple[list[str], bool]: ordering = super().get_ordering(request, queryset, view) result, reverse = [], False for field in ordering: diff --git a/cvat/apps/engine/frame_provider.py b/cvat/apps/engine/frame_provider.py index a004256320aa..6b756543c7f3 100644 --- a/cvat/apps/engine/frame_provider.py +++ b/cvat/apps/engine/frame_provider.py @@ -11,6 +11,7 @@ from abc import ABCMeta, abstractmethod from bisect import bisect from collections import OrderedDict +from collections.abc import Iterator, Sequence from dataclasses import dataclass from enum import Enum, auto from io import BytesIO @@ -18,11 +19,7 @@ Any, Callable, Generic, - Iterator, Optional, - Sequence, - Tuple, - Type, TypeVar, Union, overload, @@ -53,7 +50,7 @@ class _ChunkLoader(metaclass=ABCMeta): def __init__( self, - reader_class: Type[IMediaReader], + reader_class: type[IMediaReader], *, reader_params: Optional[dict] = None, ) -> None: @@ -62,7 +59,7 @@ def __init__( self.reader_class = reader_class self.reader_params = reader_params - def load(self, chunk_id: int) -> RandomAccessIterator[Tuple[Any, str, int]]: + def load(self, chunk_id: int) -> RandomAccessIterator[tuple[Any, str, int]]: if self.chunk_id != chunk_id: self.unload() @@ -88,7 +85,7 @@ def read_chunk(self, chunk_id: int) -> DataWithMime: ... class _FileChunkLoader(_ChunkLoader): def __init__( self, - reader_class: Type[IMediaReader], + reader_class: type[IMediaReader], get_chunk_path_callback: Callable[[int], str], *, reader_params: Optional[dict] = None, @@ -108,7 +105,7 @@ def read_chunk(self, chunk_id: int) -> DataWithMime: class _BufferChunkLoader(_ChunkLoader): def __init__( self, - reader_class: Type[IMediaReader], + reader_class: type[IMediaReader], get_chunk_callback: Callable[[int], DataWithMime], *, reader_params: Optional[dict] = None, @@ -154,7 +151,7 @@ def _av_frame_to_png_bytes(cls, av_frame: av.VideoFrame) -> BytesIO: return BytesIO(result.tobytes()) def _convert_frame( - self, frame: Any, reader_class: Type[IMediaReader], out_type: FrameOutputType + self, frame: Any, reader_class: type[IMediaReader], out_type: FrameOutputType ) -> AnyFrame: if out_type == FrameOutputType.BUFFER: return ( @@ -451,7 +448,7 @@ def __init__(self, db_segment: models.Segment) -> None: db_data = db_segment.task.data - reader_class: dict[models.DataChoice, Tuple[Type[IMediaReader], Optional[dict]]] = { + reader_class: dict[models.DataChoice, tuple[type[IMediaReader], Optional[dict]]] = { models.DataChoice.IMAGESET: (ZipReader, None), models.DataChoice.VIDEO: ( VideoReader, @@ -523,7 +520,7 @@ def get_frame_index(self, frame_number: int) -> Optional[int]: return frame_index - def validate_frame_number(self, frame_number: int) -> Tuple[int, int, int]: + def validate_frame_number(self, frame_number: int) -> tuple[int, int, int]: frame_index = self.get_frame_index(frame_number) if frame_index is None: raise ValidationError(f"Incorrect requested frame number: {frame_number}") @@ -576,7 +573,7 @@ def _get_raw_frame( frame_number: int, *, quality: FrameQuality = FrameQuality.ORIGINAL, - ) -> Tuple[Any, str, Type[IMediaReader]]: + ) -> tuple[Any, str, type[IMediaReader]]: _, chunk_number, frame_offset = self.validate_frame_number(frame_number) loader = self._loaders[quality] chunk_reader = loader.load(chunk_number) diff --git a/cvat/apps/engine/lazy_list.py b/cvat/apps/engine/lazy_list.py index 61d2c8956209..e8a36a09641f 100644 --- a/cvat/apps/engine/lazy_list.py +++ b/cvat/apps/engine/lazy_list.py @@ -2,9 +2,10 @@ # # SPDX-License-Identifier: MIT +from collections.abc import Iterator from functools import wraps from itertools import islice -from typing import Any, Callable, Iterator, TypeVar, overload +from typing import Any, Callable, TypeVar, overload import attrs from attr import field diff --git a/cvat/apps/engine/location.py b/cvat/apps/engine/location.py index ac6ab77dc073..c9e216e24627 100644 --- a/cvat/apps/engine/location.py +++ b/cvat/apps/engine/location.py @@ -3,7 +3,7 @@ # SPDX-License-Identifier: MIT from enum import Enum -from typing import Any, Dict, Union, Optional +from typing import Any, Union, Optional from cvat.apps.engine.models import Location, Project, Task, Job @@ -15,11 +15,11 @@ def __str__(self): return self.value def get_location_configuration( - query_params: Dict[str, Any], + query_params: dict[str, Any], field_name: str, *, db_instance: Optional[Union[Project, Task, Job]] = None, -) -> Dict[str, Any]: +) -> dict[str, Any]: location = query_params.get('location') # handle resource import diff --git a/cvat/apps/engine/media_extractors.py b/cvat/apps/engine/media_extractors.py index 3e7b8e17a31b..ae1c7b9f7da8 100644 --- a/cvat/apps/engine/media_extractors.py +++ b/cvat/apps/engine/media_extractors.py @@ -15,13 +15,11 @@ import struct from abc import ABC, abstractmethod from bisect import bisect -from contextlib import ExitStack, closing, contextmanager +from collections.abc import Generator, Iterable, Iterator, Sequence +from contextlib import AbstractContextManager, ExitStack, closing, contextmanager from dataclasses import dataclass from enum import IntEnum -from typing import ( - Any, Callable, ContextManager, Generator, Iterable, Iterator, Optional, Protocol, - Sequence, Tuple, TypeVar, Union -) +from typing import Any, Callable, Optional, Protocol, TypeVar, Union import av import av.codec @@ -612,7 +610,7 @@ def iterate_frames( *, frame_filter: Union[bool, Iterable[int]] = True, video_stream: Optional[av.video.stream.VideoStream] = None, - ) -> Iterator[Tuple[av.VideoFrame, str, int]]: + ) -> Iterator[tuple[av.VideoFrame, str, int]]: """ If provided, frame_filter must be an ordered sequence in the ascending order. 'True' means using the frames configured in the reader object. @@ -673,14 +671,14 @@ def iterate_frames( if next_frame_filter_frame is None: return - def __iter__(self) -> Iterator[Tuple[av.VideoFrame, str, int]]: + def __iter__(self) -> Iterator[tuple[av.VideoFrame, str, int]]: return self.iterate_frames() def get_progress(self, pos): duration = self._get_duration() return pos / duration if duration else None - def _read_av_container(self) -> ContextManager[av.container.InputContainer]: + def _read_av_container(self) -> AbstractContextManager[av.container.InputContainer]: return _AvVideoReading().read_av_container(self._source_path[0]) def _decode_stream( @@ -771,7 +769,7 @@ def __init__(self, manifest_path: str, source_path: str, *, allow_threading: boo self.allow_threading = allow_threading - def _read_av_container(self) -> ContextManager[av.container.InputContainer]: + def _read_av_container(self) -> AbstractContextManager[av.container.InputContainer]: return _AvVideoReading().read_av_container(self.source_path) def _decode_stream( @@ -1032,11 +1030,11 @@ def _add_video_stream(self, container: av.container.OutputContainer, w, h, rate, return video_stream - FrameDescriptor = Tuple[av.VideoFrame, Any, Any] + FrameDescriptor = tuple[av.VideoFrame, Any, Any] def _peek_first_frame( self, frame_iter: Iterator[FrameDescriptor] - ) -> Tuple[Optional[FrameDescriptor], Iterator[FrameDescriptor]]: + ) -> tuple[Optional[FrameDescriptor], Iterator[FrameDescriptor]]: "Gets the first frame and returns the same full iterator" if not hasattr(frame_iter, '__next__'): @@ -1047,7 +1045,7 @@ def _peek_first_frame( def save_as_chunk( self, images: Iterator[FrameDescriptor], chunk_path: str - ) -> Sequence[Tuple[int, int]]: + ) -> Sequence[tuple[int, int]]: first_frame, images = self._peek_first_frame(images) if not first_frame: raise Exception('no images to save') diff --git a/cvat/apps/engine/migrations/0083_move_to_segment_chunks.py b/cvat/apps/engine/migrations/0083_move_to_segment_chunks.py index 8ef887d4c54b..4138d9295c87 100644 --- a/cvat/apps/engine/migrations/0083_move_to_segment_chunks.py +++ b/cvat/apps/engine/migrations/0083_move_to_segment_chunks.py @@ -1,8 +1,9 @@ # Generated by Django 4.2.13 on 2024-08-12 09:49 import os +from collections.abc import Iterable from itertools import islice -from typing import Iterable, TypeVar +from typing import TypeVar from django.db import migrations diff --git a/cvat/apps/engine/migrations/0084_honeypot_support.py b/cvat/apps/engine/migrations/0084_honeypot_support.py index 721d400ec386..fb44839c50bd 100644 --- a/cvat/apps/engine/migrations/0084_honeypot_support.py +++ b/cvat/apps/engine/migrations/0084_honeypot_support.py @@ -1,7 +1,7 @@ # Generated by Django 4.2.15 on 2024-09-23 13:11 -from typing import Collection from collections import defaultdict +from collections.abc import Collection import django.db.models.deletion from django.db import migrations, models diff --git a/cvat/apps/engine/mixins.py b/cvat/apps/engine/mixins.py index 3e48bf85327e..39f50ed31db4 100644 --- a/cvat/apps/engine/mixins.py +++ b/cvat/apps/engine/mixins.py @@ -8,12 +8,13 @@ import os import os.path import uuid +from collections.abc import Mapping from dataclasses import asdict, dataclass from pathlib import Path from tempfile import NamedTemporaryFile from unittest import mock from textwrap import dedent -from typing import Optional, Callable, Dict, Any, Mapping +from typing import Optional, Callable, Any from urllib.parse import urljoin import django_rq @@ -424,7 +425,7 @@ def export_dataset_v1( request, save_images: bool, *, - get_data: Optional[Callable[[int], Dict[str, Any]]] = None, + get_data: Optional[Callable[[int], dict[str, Any]]] = None, ) -> Response: if request.query_params.get("format"): callback = self.get_export_callback(save_images) diff --git a/cvat/apps/engine/models.py b/cvat/apps/engine/models.py index 527741497531..c25c75404eaf 100644 --- a/cvat/apps/engine/models.py +++ b/cvat/apps/engine/models.py @@ -10,9 +10,10 @@ import re import shutil import uuid +from collections.abc import Collection, Sequence from enum import Enum from functools import cached_property -from typing import Any, ClassVar, Collection, Dict, Optional, Sequence +from typing import Any, ClassVar, Optional from django.conf import settings from django.contrib.auth.models import User @@ -824,7 +825,7 @@ def update_or_create(self, *args, **kwargs: Any): return super().update_or_create(*args, **kwargs) - def _validate_constraints(self, obj: Dict[str, Any]): + def _validate_constraints(self, obj: dict[str, Any]): if 'type' not in obj: return diff --git a/cvat/apps/engine/permissions.py b/cvat/apps/engine/permissions.py index d01036fc9004..c5ddd4799c4c 100644 --- a/cvat/apps/engine/permissions.py +++ b/cvat/apps/engine/permissions.py @@ -4,7 +4,8 @@ # SPDX-License-Identifier: MIT from collections import namedtuple -from typing import Any, Dict, List, Optional, Sequence, Union, cast +from collections.abc import Sequence +from typing import Any, Optional, Union, cast from django.shortcuts import get_object_or_404 from django.conf import settings @@ -21,7 +22,7 @@ from .models import AnnotationGuide, CloudStorage, Issue, Job, Label, Project, Task from cvat.apps.engine.utils import is_dataset_export -def _get_key(d: Dict[str, Any], key_path: Union[str, Sequence[str]]) -> Optional[Any]: +def _get_key(d: dict[str, Any], key_path: Union[str, Sequence[str]]) -> Optional[Any]: """ Like dict.get(), but supports nested fields. If the field is missing, returns None. """ @@ -466,7 +467,7 @@ def __init__(self, **kwargs): self.url = settings.IAM_OPA_DATA_URL + '/tasks/allow' @staticmethod - def get_scopes(request, view, obj) -> List[Scopes]: + def get_scopes(request, view, obj) -> list[Scopes]: Scopes = __class__.Scopes scope = { ('list', 'GET'): Scopes.LIST, @@ -1191,7 +1192,7 @@ class Scopes(StrEnum): CANCEL = 'cancel' @classmethod - def create(cls, request, view, obj: Optional[RQJob], iam_context: Dict): + def create(cls, request, view, obj: Optional[RQJob], iam_context: dict): permissions = [] if view.basename == 'request': for scope in cls.get_scopes(request, view, obj): @@ -1207,7 +1208,7 @@ def __init__(self, **kwargs): self.url = settings.IAM_OPA_DATA_URL + '/requests/allow' @staticmethod - def get_scopes(request, view, obj) -> List[Scopes]: + def get_scopes(request, view, obj) -> list[Scopes]: Scopes = __class__.Scopes return [{ ('list', 'GET'): Scopes.LIST, diff --git a/cvat/apps/engine/schema.py b/cvat/apps/engine/schema.py index 5931381b403d..f3914a03dddd 100644 --- a/cvat/apps/engine/schema.py +++ b/cvat/apps/engine/schema.py @@ -3,7 +3,6 @@ # SPDX-License-Identifier: MIT import textwrap -from typing import Type from drf_spectacular.extensions import OpenApiSerializerExtension from drf_spectacular.plumbing import build_basic_type, force_instance @@ -15,7 +14,7 @@ def _copy_serializer( instance: serializers.Serializer, *, - _new_type: Type[serializers.Serializer] = None, + _new_type: type[serializers.Serializer] = None, **kwargs ) -> serializers.Serializer: _new_type = _new_type or type(instance) diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index cf16d885163c..9f772cd24e6d 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -5,6 +5,8 @@ from __future__ import annotations +from collections import OrderedDict +from collections.abc import Iterable, Sequence from contextlib import closing import warnings from copy import copy @@ -17,7 +19,7 @@ import string from tempfile import NamedTemporaryFile import textwrap -from typing import Any, Dict, Iterable, Optional, OrderedDict, Sequence, Union +from typing import Any, Optional, Union import django_rq from django.conf import settings @@ -367,9 +369,9 @@ def check_attribute_names_unique(attrs): @transaction.atomic def update_label( cls, - validated_data: Dict[str, Any], + validated_data: dict[str, Any], svg: str, - sublabels: Iterable[Dict[str, Any]], + sublabels: Iterable[dict[str, Any]], *, parent_instance: Union[models.Project, models.Task], parent_label: Optional[models.Label] = None @@ -483,7 +485,7 @@ def update_label( @classmethod @transaction.atomic def create_labels(cls, - labels: Iterable[Dict[str, Any]], + labels: Iterable[dict[str, Any]], *, parent_instance: Union[models.Project, models.Task], parent_label: Optional[models.Label] = None @@ -534,7 +536,7 @@ def create_labels(cls, @classmethod @transaction.atomic def update_labels(cls, - labels: Iterable[Dict[str, Any]], + labels: Iterable[dict[str, Any]], *, parent_instance: Union[models.Project, models.Task], parent_label: Optional[models.Label] = None @@ -3270,7 +3272,7 @@ class Meta: def _update_related_storages( instance: Union[models.Project, models.Task], - validated_data: Dict[str, Any], + validated_data: dict[str, Any], ) -> None: for storage_type in ('source_storage', 'target_storage'): new_conf = validated_data.pop(storage_type, None) @@ -3325,7 +3327,7 @@ def _update_related_storages( storage_instance.cloud_storage_id = new_cloud_storage_id storage_instance.save() -def _configure_related_storages(validated_data: Dict[str, Any]) -> Dict[str, Optional[models.Storage]]: +def _configure_related_storages(validated_data: dict[str, Any]) -> dict[str, Optional[models.Storage]]: storages = { 'source_storage': None, 'target_storage': None, @@ -3418,7 +3420,7 @@ class RequestDataOperationSerializer(serializers.Serializer): format = serializers.CharField(required=False, allow_null=True) function_id = serializers.CharField(required=False, allow_null=True) - def to_representation(self, rq_job: RQJob) -> Dict[str, Any]: + def to_representation(self, rq_job: RQJob) -> dict[str, Any]: parsed_rq_id: RQId = rq_job.parsed_rq_id return { @@ -3459,7 +3461,7 @@ class RequestSerializer(serializers.Serializer): result_id = serializers.IntegerField(required=False, allow_null=True) @extend_schema_field(UserIdentifiersSerializer()) - def get_owner(self, rq_job: RQJob) -> Dict[str, Any]: + def get_owner(self, rq_job: RQJob) -> dict[str, Any]: return UserIdentifiersSerializer(rq_job.meta[RQJobMetaField.USER]).data @extend_schema_field( @@ -3499,7 +3501,7 @@ def get_message(self, rq_job: RQJob) -> str: return message - def to_representation(self, rq_job: RQJob) -> Dict[str, Any]: + def to_representation(self, rq_job: RQJob) -> dict[str, Any]: representation = super().to_representation(rq_job) # FUTURE-TODO: support such statuses on UI diff --git a/cvat/apps/engine/task.py b/cvat/apps/engine/task.py index 3fac8f03fe65..0f36674299fc 100644 --- a/cvat/apps/engine/task.py +++ b/cvat/apps/engine/task.py @@ -10,11 +10,12 @@ import re import rq import shutil +from collections.abc import Iterator, Sequence from copy import deepcopy from contextlib import closing from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, Iterator, List, NamedTuple, Optional, Sequence, Tuple, Union +from typing import Any, NamedTuple, Optional, Union from urllib import parse as urlparse from urllib import request as urlrequest @@ -77,7 +78,7 @@ def create( ############################# Internal implementation for server API -JobFileMapping = List[List[str]] +JobFileMapping = list[list[str]] class SegmentParams(NamedTuple): start_frame: int @@ -91,10 +92,10 @@ class SegmentsParams(NamedTuple): overlap: int def _copy_data_from_share_point( - server_files: List[str], + server_files: list[str], upload_dir: str, server_dir: Optional[str] = None, - server_files_exclude: Optional[List[str]] = None, + server_files_exclude: Optional[list[str]] = None, ): job = rq.get_current_job() job.meta['status'] = 'Data are being copied from source..' @@ -304,7 +305,7 @@ def _validate_data(counter, manifest_files=None): return counter, task_modes[0] def _validate_job_file_mapping( - db_task: models.Task, data: Dict[str, Any] + db_task: models.Task, data: dict[str, Any] ) -> Optional[JobFileMapping]: job_file_mapping = data.get('job_file_mapping', None) @@ -343,7 +344,7 @@ def _validate_job_file_mapping( return job_file_mapping def _validate_validation_params( - db_task: models.Task, data: Dict[str, Any], *, is_backup_restore: bool = False + db_task: models.Task, data: dict[str, Any], *, is_backup_restore: bool = False ) -> Optional[dict[str, Any]]: params = data.get('validation_params', {}) if not params: @@ -382,7 +383,7 @@ def _validate_validation_params( return params def _validate_manifest( - manifests: List[str], + manifests: list[str], root_dir: Optional[str], *, is_in_cloud: bool, @@ -455,7 +456,7 @@ def _download_data(urls, upload_dir): def _download_data_from_cloud_storage( db_storage: models.CloudStorage, - files: List[str], + files: list[str], upload_dir: str, ): cloud_storage_instance = db_storage_to_storage_instance(db_storage) @@ -479,7 +480,7 @@ def _read_dataset_manifest(path: str, *, create_index: bool = False) -> ImageMan def _restore_file_order_from_manifest( extractor: ImageListReader, manifest: ImageManifestManager, upload_dir: str -) -> List[str]: +) -> list[str]: """ Restores file ordering for the "predefined" file sorting method of the task creation. Checks for extra files in the input. @@ -511,7 +512,7 @@ def _restore_file_order_from_manifest( return [input_files[fn] for fn in manifest_files] def _create_task_manifest_based_on_cloud_storage_manifest( - sorted_media: List[str], + sorted_media: list[str], cloud_storage_manifest_prefix: str, cloud_storage_manifest: ImageManifestManager, manifest: ImageManifestManager, @@ -536,7 +537,7 @@ def _add_prefix(properties): def _create_task_manifest_from_cloud_data( db_storage: models.CloudStorage, - sorted_media: List[str], + sorted_media: list[str], manifest: ImageManifestManager, dimension: models.DimensionType = models.DimensionType.DIM_2D, *, @@ -557,7 +558,7 @@ def _create_task_manifest_from_cloud_data( @transaction.atomic def _create_thread( db_task: Union[int, models.Task], - data: Dict[str, Any], + data: dict[str, Any], *, is_backup_restore: bool = False, is_dataset_import: bool = False, @@ -1598,7 +1599,7 @@ def save_chunks( frame_map = {} # frame number -> extractor frame number if isinstance(media_extractor, MEDIA_TYPES['video']['extractor']): - def _get_frame_size(frame_tuple: Tuple[av.VideoFrame, Any, Any]) -> int: + def _get_frame_size(frame_tuple: tuple[av.VideoFrame, Any, Any]) -> int: # There is no need to be absolutely precise here, # just need to provide the reasonable upper boundary. # Return bytes needed for 1 frame diff --git a/cvat/apps/engine/task_validation.py b/cvat/apps/engine/task_validation.py index 3f15b7d79716..fe76b4e99408 100644 --- a/cvat/apps/engine/task_validation.py +++ b/cvat/apps/engine/task_validation.py @@ -2,7 +2,8 @@ # # SPDX-License-Identifier: MIT -from typing import Generic, Mapping, Sequence, TypeVar +from collections.abc import Mapping, Sequence +from typing import Generic, TypeVar import numpy as np diff --git a/cvat/apps/engine/tests/utils.py b/cvat/apps/engine/tests/utils.py index 3d2a533d1e97..910323cac1f7 100644 --- a/cvat/apps/engine/tests/utils.py +++ b/cvat/apps/engine/tests/utils.py @@ -2,9 +2,10 @@ # # SPDX-License-Identifier: MIT +from collections.abc import Iterator, Sequence from contextlib import contextmanager from io import BytesIO -from typing import Any, Callable, Dict, Iterator, Sequence, TypeVar +from typing import Any, Callable, TypeVar import itertools import logging import os @@ -178,6 +179,6 @@ def get_paginated_collection( def filter_dict( - d: Dict[str, Any], *, keep: Sequence[str] = None, drop: Sequence[str] = None -) -> Dict[str, Any]: + d: dict[str, Any], *, keep: Sequence[str] = None, drop: Sequence[str] = None +) -> dict[str, Any]: return {k: v for k, v in d.items() if (not keep or k in keep) and (not drop or k not in drop)} diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 13d1d354dd3d..dd4533538f5a 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -7,14 +7,13 @@ from itertools import islice import cv2 as cv from collections import namedtuple +from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence import hashlib import importlib import sys import traceback from contextlib import suppress, nullcontext -from typing import ( - Any, Callable, Dict, Generator, Iterable, Iterator, Optional, Mapping, Sequence, TypeVar, Union -) +from typing import Any, Callable, Optional, TypeVar, Union import subprocess import os import urllib.parse @@ -264,7 +263,7 @@ def get_rq_job_meta( return meta def reverse(viewname, *, args=None, kwargs=None, - query_params: Optional[Dict[str, str]] = None, + query_params: Optional[dict[str, str]] = None, request: Optional[HttpRequest] = None, ) -> str: """ @@ -283,7 +282,7 @@ def reverse(viewname, *, args=None, kwargs=None, def get_server_url(request: HttpRequest) -> str: return request.build_absolute_uri('/') -def build_field_filter_params(field: str, value: Any) -> Dict[str, str]: +def build_field_filter_params(field: str, value: Any) -> dict[str, str]: """ Builds a collection filter query params for a single field and value. """ diff --git a/cvat/apps/engine/view_utils.py b/cvat/apps/engine/view_utils.py index 2acb8bac780f..6f5dc298a7b6 100644 --- a/cvat/apps/engine/view_utils.py +++ b/cvat/apps/engine/view_utils.py @@ -4,7 +4,7 @@ # NOTE: importing in the utils.py header leads to circular importing -from typing import Optional, Type +from typing import Optional from django.db.models.query import QuerySet from django.http.request import HttpRequest @@ -23,9 +23,9 @@ def make_paginated_response( queryset: QuerySet, *, viewset: GenericViewSet, - response_type: Optional[Type[HttpResponse]] = None, - serializer_type: Optional[Type[Serializer]] = None, - request: Optional[Type[HttpRequest]] = None, + response_type: Optional[type[HttpResponse]] = None, + serializer_type: Optional[type[Serializer]] = None, + request: Optional[type[HttpRequest]] = None, **serializer_params ): # Adapted from the mixins.ListModelMixin.list() @@ -54,7 +54,7 @@ def make_paginated_response( return response_type(serializer.data) -def list_action(serializer_class: Type[Serializer], **kwargs): +def list_action(serializer_class: type[Serializer], **kwargs): params = dict( detail=True, methods=["GET"], diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 9692cfc2f750..eb39f6732c18 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -16,8 +16,9 @@ from contextlib import suppress from PIL import Image from types import SimpleNamespace -from typing import Optional, Any, Dict, List, Union, cast, Callable, Mapping, Iterable +from typing import Optional, Any, Union, cast, Callable from collections import namedtuple +from collections.abc import Mapping, Iterable from copy import copy from datetime import datetime from redis.exceptions import ConnectionError as RedisConnectionError @@ -1076,7 +1077,7 @@ def _maybe_append_upload_info_entry(self, filename: str): filename = self._prepare_upload_info_entry(filename) task_data.client_files.get_or_create(file=filename) - def _append_upload_info_entries(self, client_files: List[Dict[str, Any]]): + def _append_upload_info_entries(self, client_files: list[dict[str, Any]]): # batch version of _maybe_append_upload_info_entry() without optional insertion task_data = cast(Data, self._object.data) task_data.client_files.bulk_create([ @@ -1084,7 +1085,7 @@ def _append_upload_info_entries(self, client_files: List[Dict[str, Any]]): for cf in client_files ]) - def _sort_uploaded_files(self, uploaded_files: List[str], ordering: List[str]) -> List[str]: + def _sort_uploaded_files(self, uploaded_files: list[str], ordering: list[str]) -> list[str]: """ Applies file ordering for the "predefined" file sorting method of the task creation. @@ -3568,7 +3569,7 @@ def get_queryset(self): def queues(self) -> Iterable[DjangoRQ]: return (django_rq.get_queue(queue_name) for queue_name in self.SUPPORTED_QUEUES) - def _get_rq_jobs_from_queue(self, queue: DjangoRQ, user_id: int) -> List[RQJob]: + def _get_rq_jobs_from_queue(self, queue: DjangoRQ, user_id: int) -> list[RQJob]: job_ids = set(queue.get_job_ids() + queue.started_job_registry.get_job_ids() + queue.finished_job_registry.get_job_ids() + @@ -3588,7 +3589,7 @@ def _get_rq_jobs_from_queue(self, queue: DjangoRQ, user_id: int) -> List[RQJob]: return jobs - def _get_rq_jobs(self, user_id: int) -> List[RQJob]: + def _get_rq_jobs(self, user_id: int) -> list[RQJob]: """ Get all RQ jobs for a specific user and return them as a list of RQJob objects.