diff --git a/aiokafka/record/_crecords/default_records.pyi b/aiokafka/record/_crecords/default_records.pyi index 5e5b1826..1392aff0 100644 --- a/aiokafka/record/_crecords/default_records.pyi +++ b/aiokafka/record/_crecords/default_records.pyi @@ -1,8 +1,16 @@ -from typing import ClassVar +from typing import ClassVar, final from typing_extensions import Literal, Self -class DefaultRecord: +from aiokafka.record._protocols import ( + DefaultRecordBatchBuilderProtocol, + DefaultRecordBatchProtocol, + DefaultRecordMetadataProtocol, + DefaultRecordProtocol, +) + +@final +class DefaultRecord(DefaultRecordProtocol): offset: int key: bytes | None value: bytes | None @@ -23,7 +31,8 @@ class DefaultRecord: @property def timestamp_type(self) -> int | None: ... -class DefaultRecordBatch: +@final +class DefaultRecordBatch(DefaultRecordBatchProtocol): CODEC_NONE: ClassVar[int] CODEC_MASK: ClassVar[int] CODEC_GZIP: ClassVar[int] @@ -58,7 +67,8 @@ class DefaultRecordBatch: def __next__(self) -> DefaultRecord: ... def validate_crc(self) -> bool: ... -class DefaultRecordBatchBuilder: +@final +class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol): producer_id: int producer_epoch: int base_sequence: int @@ -108,7 +118,8 @@ class DefaultRecordBatchBuilder: headers: list[tuple[str, bytes | None]], ) -> int: ... -class DefaultRecordMetadata: +@final +class DefaultRecordMetadata(DefaultRecordMetadataProtocol): offset: int size: int timestamp: int diff --git a/aiokafka/record/_crecords/legacy_records.pyi b/aiokafka/record/_crecords/legacy_records.pyi index e58cc1d1..a78d071e 100644 --- a/aiokafka/record/_crecords/legacy_records.pyi +++ b/aiokafka/record/_crecords/legacy_records.pyi @@ -1,8 +1,16 @@ -from typing import Any, ClassVar, Generator +from typing import Any, ClassVar, Generator, final from typing_extensions import Buffer, Literal, Never -class LegacyRecord: +from aiokafka.record._protocols import ( + LegacyRecordBatchBuilderProtocol, + LegacyRecordBatchProtocol, + LegacyRecordMetadataProtocol, + LegacyRecordProtocol, +) + +@final +class LegacyRecord(LegacyRecordProtocol): offset: int attributes: int key: bytes | None @@ -26,7 +34,8 @@ class LegacyRecord: @property def checksum(self) -> int: ... -class LegacyRecordBatch: +@final +class LegacyRecordBatch(LegacyRecordBatchProtocol): RECORD_OVERHEAD_V0: ClassVar[int] RECORD_OVERHEAD_V1: ClassVar[int] CODEC_MASK: ClassVar[int] @@ -43,7 +52,8 @@ class LegacyRecordBatch: def validate_crc(self) -> bool: ... def __iter__(self) -> Generator[LegacyRecord, None, None]: ... -class LegacyRecordBatchBuilder: +@final +class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol): CODEC_MASK: ClassVar[int] CODEC_GZIP: ClassVar[int] CODEC_SNAPPY: ClassVar[int] @@ -66,7 +76,8 @@ class LegacyRecordBatchBuilder: def record_overhead(magic: int) -> int: ... def build(self) -> bytearray: ... -class LegacyRecordMetadata: +@final +class LegacyRecordMetadata(LegacyRecordMetadataProtocol): offset: int crc: int size: int diff --git a/aiokafka/record/_crecords/memory_records.pyi b/aiokafka/record/_crecords/memory_records.pyi index 5f02beee..d8dfa409 100644 --- a/aiokafka/record/_crecords/memory_records.pyi +++ b/aiokafka/record/_crecords/memory_records.pyi @@ -1,7 +1,12 @@ +from typing import final + +from aiokafka.record._protocols import MemoryRecordsProtocol + from .default_records import DefaultRecordBatch from .legacy_records import LegacyRecordBatch -class MemoryRecords: +@final +class MemoryRecords(MemoryRecordsProtocol): def __init__(self, bytes_data: bytes) -> None: ... def size_in_bytes(self) -> int: ... def has_next(self) -> bool: ... diff --git a/aiokafka/record/_protocols.py b/aiokafka/record/_protocols.py new file mode 100644 index 00000000..8dfc4d0e --- /dev/null +++ b/aiokafka/record/_protocols.py @@ -0,0 +1,262 @@ +from __future__ import annotations + +from typing import ( + Any, + ClassVar, + Iterable, + Iterator, + List, + Optional, + Protocol, + Tuple, + Union, + runtime_checkable, +) + +from typing_extensions import Literal, Never + + +class DefaultRecordBatchBuilderProtocol(Protocol): + def __init__( + self, + magic: int, + compression_type: int, + is_transactional: int, + producer_id: int, + producer_epoch: int, + base_sequence: int, + batch_size: int, + ): ... + def append( + self, + offset: int, + timestamp: Optional[int], + key: Optional[bytes], + value: Optional[bytes], + headers: List[Tuple[str, Optional[bytes]]], + ) -> Optional[DefaultRecordMetadataProtocol]: ... + def build(self) -> bytearray: ... + def size(self) -> int: ... + def size_in_bytes( + self, + offset: int, + timestamp: int, + key: Optional[bytes], + value: Optional[bytes], + headers: List[Tuple[str, Optional[bytes]]], + ) -> int: ... + @classmethod + def size_of( + cls, + key: Optional[bytes], + value: Optional[bytes], + headers: List[Tuple[str, Optional[bytes]]], + ) -> int: ... + @classmethod + def estimate_size_in_bytes( + cls, + key: Optional[bytes], + value: Optional[bytes], + headers: List[Tuple[str, Optional[bytes]]], + ) -> int: ... + def set_producer_state( + self, producer_id: int, producer_epoch: int, base_sequence: int + ) -> None: ... + @property + def producer_id(self) -> int: ... + @property + def producer_epoch(self) -> int: ... + @property + def base_sequence(self) -> int: ... + + +class DefaultRecordMetadataProtocol(Protocol): + def __init__(self, offset: int, size: int, timestamp: int) -> None: ... + @property + def offset(self) -> int: ... + @property + def crc(self) -> None: ... + @property + def size(self) -> int: ... + @property + def timestamp(self) -> int: ... + + +class DefaultRecordBatchProtocol(Iterator["DefaultRecordProtocol"], Protocol): + CODEC_MASK: ClassVar[int] + CODEC_NONE: ClassVar[int] + CODEC_GZIP: ClassVar[int] + CODEC_SNAPPY: ClassVar[int] + CODEC_LZ4: ClassVar[int] + CODEC_ZSTD: ClassVar[int] + + def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: ... + @property + def base_offset(self) -> int: ... + @property + def magic(self) -> int: ... + @property + def crc(self) -> int: ... + @property + def attributes(self) -> int: ... + @property + def compression_type(self) -> int: ... + @property + def timestamp_type(self) -> int: ... + @property + def is_transactional(self) -> bool: ... + @property + def is_control_batch(self) -> bool: ... + @property + def last_offset_delta(self) -> int: ... + @property + def first_timestamp(self) -> int: ... + @property + def max_timestamp(self) -> int: ... + @property + def producer_id(self) -> int: ... + @property + def producer_epoch(self) -> int: ... + @property + def base_sequence(self) -> int: ... + @property + def next_offset(self) -> int: ... + def validate_crc(self) -> bool: ... + + +@runtime_checkable +class DefaultRecordProtocol(Protocol): + def __init__( + self, + offset: int, + timestamp: int, + timestamp_type: int, + key: Optional[bytes], + value: Optional[bytes], + headers: List[Tuple[str, Optional[bytes]]], + ) -> None: ... + @property + def offset(self) -> int: ... + @property + def timestamp(self) -> int: + """Epoch milliseconds""" + + @property + def timestamp_type(self) -> int: + """CREATE_TIME(0) or APPEND_TIME(1)""" + + @property + def key(self) -> Optional[bytes]: + """Bytes key or None""" + + @property + def value(self) -> Optional[bytes]: + """Bytes value or None""" + + @property + def headers(self) -> List[Tuple[str, Optional[bytes]]]: ... + @property + def checksum(self) -> None: ... + + +class LegacyRecordBatchBuilderProtocol(Protocol): + def __init__( + self, magic: Literal[0, 1], compression_type: int, batch_size: int + ) -> None: ... + def append( + self, + offset: int, + timestamp: Optional[int], + key: Optional[bytes], + value: Optional[bytes], + headers: Any = None, + ) -> Optional[LegacyRecordMetadataProtocol]: ... + def build(self) -> bytearray: + """Compress batch to be ready for send""" + + def size(self) -> int: + """Return current size of data written to buffer""" + + def size_in_bytes( + self, + offset: int, + timestamp: int, + key: Optional[bytes], + value: Optional[bytes], + ) -> int: + """Actual size of message to add""" + + @classmethod + def record_overhead(cls, magic: int) -> int: ... + + +class LegacyRecordMetadataProtocol(Protocol): + def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ... + @property + def offset(self) -> int: ... + @property + def crc(self) -> int: ... + @property + def size(self) -> int: ... + @property + def timestamp(self) -> int: ... + + +class LegacyRecordBatchProtocol(Iterable["LegacyRecordProtocol"], Protocol): + CODEC_MASK: ClassVar[int] + CODEC_GZIP: ClassVar[int] + CODEC_SNAPPY: ClassVar[int] + CODEC_LZ4: ClassVar[int] + + is_control_batch: bool + is_transactional: bool + producer_id: Optional[int] + + def __init__(self, buffer: Union[bytes, bytearray, memoryview], magic: int): ... + @property + def next_offset(self) -> int: ... + def validate_crc(self) -> bool: ... + + +@runtime_checkable +class LegacyRecordProtocol(Protocol): + def __init__( + self, + offset: int, + timestamp: Optional[int], + timestamp_type: Optional[Literal[0, 1]], + key: Optional[bytes], + value: Optional[bytes], + crc: int, + ) -> None: ... + @property + def offset(self) -> int: ... + @property + def timestamp(self) -> Optional[int]: + """Epoch milliseconds""" + + @property + def timestamp_type(self) -> Optional[Literal[0, 1]]: + """CREATE_TIME(0) or APPEND_TIME(1)""" + + @property + def key(self) -> Optional[bytes]: + """Bytes key or None""" + + @property + def value(self) -> Optional[bytes]: + """Bytes value or None""" + + @property + def headers(self) -> List[Never]: ... + @property + def checksum(self) -> int: ... + + +class MemoryRecordsProtocol(Protocol): + def __init__(self, bytes_data: bytes) -> None: ... + def size_in_bytes(self) -> int: ... + def has_next(self) -> bool: ... + def next_batch( + self, + ) -> Optional[Union[DefaultRecordBatchProtocol, LegacyRecordBatchProtocol]]: ... diff --git a/aiokafka/record/default_records.py b/aiokafka/record/default_records.py index eb385d7d..a210eb2e 100644 --- a/aiokafka/record/default_records.py +++ b/aiokafka/record/default_records.py @@ -53,21 +53,10 @@ # * Transactional (4) # * Timestamp Type (3) # * Compression Type (0-2) -from __future__ import annotations import struct import time -from typing import ( - TYPE_CHECKING, - Any, - Callable, - List, - Optional, - Sized, - Tuple, - Type, - Union, -) +from typing import Any, Callable, List, Optional, Sized, Tuple, Type, Union, final from typing_extensions import Self @@ -85,20 +74,14 @@ from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from aiokafka.util import NO_EXTENSIONS +from ._protocols import ( + DefaultRecordBatchBuilderProtocol, + DefaultRecordBatchProtocol, + DefaultRecordMetadataProtocol, + DefaultRecordProtocol, +) from .util import calc_crc32c, decode_varint, encode_varint, size_of_varint -if TYPE_CHECKING: - DefaultRecordBatchBuilder: Union[ - Type[_DefaultRecordBatchBuilderPy], Type[_DefaultRecordBatchBuilderCython] - ] - DefaultRecordMetadata: Union[ - Type[_DefaultRecordMetadataPy], Type[_DefaultRecordMetadataCython] - ] - DefaultRecordBatch: Union[ - Type[_DefaultRecordBatchPy], Type[_DefaultRecordBatchCython] - ] - DefaultRecord: Union[Type[_DefaultRecordPy], Type[_DefaultRecordCython]] - class DefaultRecordBase: __slots__ = () @@ -157,7 +140,8 @@ def _assert_has_codec(self, compression_type: int) -> None: ) -class _DefaultRecordBatchPy(DefaultRecordBase): +@final +class _DefaultRecordBatchPy(DefaultRecordBase, DefaultRecordBatchProtocol): def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: self._buffer = bytearray(buffer) self._header_data: Tuple[ @@ -248,7 +232,7 @@ def _maybe_uncompress(self) -> None: def _read_msg( self, decode_varint: Callable[[bytearray, int], Tuple[int, int]] = decode_varint - ) -> _DefaultRecordPy: + ) -> "_DefaultRecordPy": # Record => # Length => Varint # Attributes => Int8 @@ -332,7 +316,7 @@ def __iter__(self) -> Self: self._maybe_uncompress() return self - def __next__(self) -> _DefaultRecordPy: + def __next__(self) -> "_DefaultRecordPy": if self._next_record_index >= self._num_records: if self._pos != len(self._buffer): raise CorruptRecordException( @@ -359,7 +343,8 @@ def validate_crc(self) -> bool: return crc == verify_crc -class _DefaultRecordPy: +@final +class _DefaultRecordPy(DefaultRecordProtocol): __slots__ = ( "_offset", "_timestamp", @@ -425,7 +410,10 @@ def __repr__(self) -> str: ) -class _DefaultRecordBatchBuilderPy(DefaultRecordBase): +@final +class _DefaultRecordBatchBuilderPy( + DefaultRecordBase, DefaultRecordBatchBuilderProtocol +): # excluding key, value and headers: # 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes MAX_RECORD_OVERHEAD = 21 @@ -488,7 +476,7 @@ def append( bytearray_type: Type[bytearray] = bytearray, len_func: Callable[[Sized], int] = len, zero_len_varint: int = 1, - ) -> Optional[_DefaultRecordMetadataPy]: + ) -> Optional["_DefaultRecordMetadataPy"]: """Write message to messageset buffer with MsgVersion 2""" # Check types if get_type(offset) != type_int: @@ -713,7 +701,8 @@ def base_sequence(self) -> int: return self._base_sequence -class _DefaultRecordMetadataPy: +@final +class _DefaultRecordMetadataPy(DefaultRecordMetadataProtocol): __slots__ = ("_size", "_timestamp", "_offset") def __init__(self, offset: int, size: int, timestamp: int) -> None: @@ -744,6 +733,11 @@ def __repr__(self) -> str: ) +DefaultRecordBatchBuilder: Type[DefaultRecordBatchBuilderProtocol] +DefaultRecordMetadata: Type[DefaultRecordMetadataProtocol] +DefaultRecordBatch: Type[DefaultRecordBatchProtocol] +DefaultRecord: Type[DefaultRecordProtocol] + if NO_EXTENSIONS: DefaultRecordBatchBuilder = _DefaultRecordBatchBuilderPy DefaultRecordMetadata = _DefaultRecordMetadataPy diff --git a/aiokafka/record/legacy_records.py b/aiokafka/record/legacy_records.py index 3336ffac..f7e1e804 100644 --- a/aiokafka/record/legacy_records.py +++ b/aiokafka/record/legacy_records.py @@ -3,17 +3,7 @@ import struct import time from binascii import crc32 -from typing import ( - TYPE_CHECKING, - Any, - Generator, - List, - Optional, - Tuple, - Type, - Union, - final, -) +from typing import Any, Generator, List, Optional, Tuple, Type, Union, final from typing_extensions import Literal, Never @@ -29,15 +19,12 @@ from aiokafka.errors import CorruptRecordException, UnsupportedCodecError from aiokafka.util import NO_EXTENSIONS -if TYPE_CHECKING: - LegacyRecordBatchBuilder: Union[ - Type[_LegacyRecordBatchBuilderPy], Type[_LegacyRecordBatchBuilderCython] - ] - LegacyRecordMetadata: Union[ - Type[_LegacyRecordMetadataPy], Type[_LegacyRecordMetadataCython] - ] - LegacyRecordBatch: Union[Type[_LegacyRecordBatchPy], Type[_LegacyRecordBatchCython]] - LegacyRecord: Union[Type[_LegacyRecordPy], Type[_LegacyRecordCython]] +from ._protocols import ( + LegacyRecordBatchBuilderProtocol, + LegacyRecordBatchProtocol, + LegacyRecordMetadataProtocol, + LegacyRecordProtocol, +) NoneType = type(None) @@ -118,7 +105,7 @@ def _assert_has_codec(self, compression_type: int) -> None: @final -class _LegacyRecordBatchPy(LegacyRecordBase): +class _LegacyRecordBatchPy(LegacyRecordBase, LegacyRecordBatchProtocol): is_control_batch: bool = False is_transactional: bool = False producer_id: Optional[int] = None @@ -290,7 +277,7 @@ def __iter__(self) -> Generator[_LegacyRecordPy, None, None]: @final -class _LegacyRecordPy: +class _LegacyRecordPy(LegacyRecordProtocol): __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value", "_crc") def __init__( @@ -350,7 +337,7 @@ def __repr__(self) -> str: @final -class _LegacyRecordBatchBuilderPy(LegacyRecordBase): +class _LegacyRecordBatchBuilderPy(LegacyRecordBase, LegacyRecordBatchBuilderProtocol): _buffer: Optional[bytearray] = None def __init__( @@ -564,7 +551,7 @@ def record_overhead(cls, magic: int) -> int: @final -class _LegacyRecordMetadataPy: +class _LegacyRecordMetadataPy(LegacyRecordMetadataProtocol): __slots__ = ("_crc", "_size", "_timestamp", "_offset") def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: @@ -597,6 +584,11 @@ def __repr__(self) -> str: ) +LegacyRecordBatchBuilder: Type[LegacyRecordBatchBuilderProtocol] +LegacyRecordMetadata: Type[LegacyRecordMetadataProtocol] +LegacyRecordBatch: Type[LegacyRecordBatchProtocol] +LegacyRecord: Type[LegacyRecordProtocol] + if NO_EXTENSIONS: LegacyRecordBatchBuilder = _LegacyRecordBatchBuilderPy LegacyRecordMetadata = _LegacyRecordMetadataPy diff --git a/aiokafka/record/memory_records.py b/aiokafka/record/memory_records.py index 5782728f..b618d4a8 100644 --- a/aiokafka/record/memory_records.py +++ b/aiokafka/record/memory_records.py @@ -18,22 +18,24 @@ # # So we can iterate over batches just by knowing offsets of Length. Magic is # used to construct the correct class for Batch itself. -from __future__ import annotations import struct -from typing import TYPE_CHECKING, Optional, Type, Union +from typing import Optional, Type, Union, final from aiokafka.errors import CorruptRecordException from aiokafka.util import NO_EXTENSIONS -from .default_records import _DefaultRecordBatchPy -from .legacy_records import _LegacyRecordBatchPy +from ._protocols import ( + DefaultRecordBatchProtocol, + LegacyRecordBatchProtocol, + MemoryRecordsProtocol, +) +from .default_records import DefaultRecordBatch +from .legacy_records import LegacyRecordBatch, _LegacyRecordBatchPy -if TYPE_CHECKING: - MemoryRecords: Union[Type[_MemoryRecordsPy], Type[_MemoryRecordsCython]] - -class _MemoryRecordsPy: +@final +class _MemoryRecordsPy(MemoryRecordsProtocol): LENGTH_OFFSET = struct.calcsize(">q") LOG_OVERHEAD = struct.calcsize(">qi") MAGIC_OFFSET = struct.calcsize(">qii") @@ -85,7 +87,7 @@ def has_next(self) -> bool: # NOTE: same cache for LOAD_FAST as above def next_batch( self, _min_slice: int = MIN_SLICE, _magic_offset: int = MAGIC_OFFSET - ) -> Optional[Union[_DefaultRecordBatchPy, _LegacyRecordBatchPy]]: + ) -> Optional[Union[DefaultRecordBatchProtocol, LegacyRecordBatchProtocol]]: next_slice = self._next_slice if next_slice is None: return None @@ -97,10 +99,12 @@ def next_batch( self._cache_next() magic = next_slice[_magic_offset] if magic >= 2: # pragma: no cover - return _DefaultRecordBatchPy(next_slice) + return DefaultRecordBatch(next_slice) else: - return _LegacyRecordBatchPy(next_slice, magic) + return LegacyRecordBatch(next_slice, magic) + +MemoryRecords: Type[MemoryRecordsProtocol] if NO_EXTENSIONS: MemoryRecords = _MemoryRecordsPy diff --git a/tests/record/test_records.py b/tests/record/test_records.py index b7c23f8b..3323d159 100644 --- a/tests/record/test_records.py +++ b/tests/record/test_records.py @@ -2,6 +2,7 @@ from aiokafka.errors import CorruptRecordException from aiokafka.record import MemoryRecords +from aiokafka.record._protocols import DefaultRecordProtocol, LegacyRecordProtocol # This is real live data from Kafka 11 broker record_batch_data_v2 = [ @@ -67,6 +68,7 @@ def test_memory_records_v2() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 + assert isinstance(recs[0], DefaultRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp == 1503229838908 @@ -93,6 +95,7 @@ def test_memory_records_v1() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 + assert isinstance(recs[0], LegacyRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp == 1503648000942 @@ -121,6 +124,7 @@ def test_memory_records_v0() -> None: assert batch is not None recs = tuple(batch) assert len(recs) == 1 + assert isinstance(recs[0], LegacyRecordProtocol) assert recs[0].value == b"123" assert recs[0].key is None assert recs[0].timestamp is None