Skip to content

Commit

Permalink
Revert "test w/o protocols"
Browse files Browse the repository at this point in the history
This reverts commit 7fa1efa.
  • Loading branch information
dimastbk committed Apr 25, 2024
1 parent 7fa1efa commit 58bc26c
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 77 deletions.
21 changes: 16 additions & 5 deletions aiokafka/record/_crecords/default_records.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from typing import ClassVar
from typing import ClassVar, final

from typing_extensions import Literal, Self

class DefaultRecord:
from aiokafka.record._protocols import (
DefaultRecordBatchBuilderProtocol,
DefaultRecordBatchProtocol,
DefaultRecordMetadataProtocol,
DefaultRecordProtocol,
)

@final
class DefaultRecord(DefaultRecordProtocol):
offset: int
key: bytes | None
value: bytes | None
Expand All @@ -23,7 +31,8 @@ class DefaultRecord:
@property
def timestamp_type(self) -> int | None: ...

class DefaultRecordBatch:
@final
class DefaultRecordBatch(DefaultRecordBatchProtocol):
CODEC_NONE: ClassVar[int]
CODEC_MASK: ClassVar[int]
CODEC_GZIP: ClassVar[int]
Expand Down Expand Up @@ -58,7 +67,8 @@ class DefaultRecordBatch:
def __next__(self) -> DefaultRecord: ...
def validate_crc(self) -> bool: ...

class DefaultRecordBatchBuilder:
@final
class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol):
producer_id: int
producer_epoch: int
base_sequence: int
Expand Down Expand Up @@ -108,7 +118,8 @@ class DefaultRecordBatchBuilder:
headers: list[tuple[str, bytes | None]],
) -> int: ...

class DefaultRecordMetadata:
@final
class DefaultRecordMetadata(DefaultRecordMetadataProtocol):
offset: int
size: int
timestamp: int
Expand Down
21 changes: 16 additions & 5 deletions aiokafka/record/_crecords/legacy_records.pyi
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from typing import Any, ClassVar, Generator
from typing import Any, ClassVar, Generator, final

from typing_extensions import Buffer, Literal, Never

class LegacyRecord:
from aiokafka.record._protocols import (
LegacyRecordBatchBuilderProtocol,
LegacyRecordBatchProtocol,
LegacyRecordMetadataProtocol,
LegacyRecordProtocol,
)

@final
class LegacyRecord(LegacyRecordProtocol):
offset: int
attributes: int
key: bytes | None
Expand All @@ -26,7 +34,8 @@ class LegacyRecord:
@property
def checksum(self) -> int: ...

class LegacyRecordBatch:
@final
class LegacyRecordBatch(LegacyRecordBatchProtocol):
RECORD_OVERHEAD_V0: ClassVar[int]
RECORD_OVERHEAD_V1: ClassVar[int]
CODEC_MASK: ClassVar[int]
Expand All @@ -43,7 +52,8 @@ class LegacyRecordBatch:
def validate_crc(self) -> bool: ...
def __iter__(self) -> Generator[LegacyRecord, None, None]: ...

class LegacyRecordBatchBuilder:
@final
class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol):
CODEC_MASK: ClassVar[int]
CODEC_GZIP: ClassVar[int]
CODEC_SNAPPY: ClassVar[int]
Expand All @@ -66,7 +76,8 @@ class LegacyRecordBatchBuilder:
def record_overhead(magic: int) -> int: ...
def build(self) -> bytearray: ...

class LegacyRecordMetadata:
@final
class LegacyRecordMetadata(LegacyRecordMetadataProtocol):
offset: int
crc: int
size: int
Expand Down
7 changes: 6 additions & 1 deletion aiokafka/record/_crecords/memory_records.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from typing import final

from aiokafka.record._protocols import MemoryRecordsProtocol

from .default_records import DefaultRecordBatch
from .legacy_records import LegacyRecordBatch

class MemoryRecords:
@final
class MemoryRecords(MemoryRecordsProtocol):
def __init__(self, bytes_data: bytes) -> None: ...
def size_in_bytes(self) -> int: ...
def has_next(self) -> bool: ...
Expand Down
262 changes: 262 additions & 0 deletions aiokafka/record/_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
from __future__ import annotations

from typing import (
Any,
ClassVar,
Iterable,
Iterator,
List,
Optional,
Protocol,
Tuple,
Union,
runtime_checkable,
)

from typing_extensions import Literal, Never


class DefaultRecordBatchBuilderProtocol(Protocol):
def __init__(
self,
magic: int,
compression_type: int,
is_transactional: int,
producer_id: int,
producer_epoch: int,
base_sequence: int,
batch_size: int,
): ...
def append(
self,
offset: int,
timestamp: Optional[int],
key: Optional[bytes],
value: Optional[bytes],
headers: List[Tuple[str, Optional[bytes]]],
) -> Optional[DefaultRecordMetadataProtocol]: ...
def build(self) -> bytearray: ...
def size(self) -> int: ...
def size_in_bytes(
self,
offset: int,
timestamp: int,
key: Optional[bytes],
value: Optional[bytes],
headers: List[Tuple[str, Optional[bytes]]],
) -> int: ...
@classmethod
def size_of(
cls,
key: Optional[bytes],
value: Optional[bytes],
headers: List[Tuple[str, Optional[bytes]]],
) -> int: ...
@classmethod
def estimate_size_in_bytes(
cls,
key: Optional[bytes],
value: Optional[bytes],
headers: List[Tuple[str, Optional[bytes]]],
) -> int: ...
def set_producer_state(
self, producer_id: int, producer_epoch: int, base_sequence: int
) -> None: ...
@property
def producer_id(self) -> int: ...
@property
def producer_epoch(self) -> int: ...
@property
def base_sequence(self) -> int: ...


class DefaultRecordMetadataProtocol(Protocol):
def __init__(self, offset: int, size: int, timestamp: int) -> None: ...
@property
def offset(self) -> int: ...
@property
def crc(self) -> None: ...
@property
def size(self) -> int: ...
@property
def timestamp(self) -> int: ...


class DefaultRecordBatchProtocol(Iterator["DefaultRecordProtocol"], Protocol):
CODEC_MASK: ClassVar[int]
CODEC_NONE: ClassVar[int]
CODEC_GZIP: ClassVar[int]
CODEC_SNAPPY: ClassVar[int]
CODEC_LZ4: ClassVar[int]
CODEC_ZSTD: ClassVar[int]

def __init__(self, buffer: Union[bytes, bytearray, memoryview]) -> None: ...
@property
def base_offset(self) -> int: ...
@property
def magic(self) -> int: ...
@property
def crc(self) -> int: ...
@property
def attributes(self) -> int: ...
@property
def compression_type(self) -> int: ...
@property
def timestamp_type(self) -> int: ...
@property
def is_transactional(self) -> bool: ...
@property
def is_control_batch(self) -> bool: ...
@property
def last_offset_delta(self) -> int: ...
@property
def first_timestamp(self) -> int: ...
@property
def max_timestamp(self) -> int: ...
@property
def producer_id(self) -> int: ...
@property
def producer_epoch(self) -> int: ...
@property
def base_sequence(self) -> int: ...
@property
def next_offset(self) -> int: ...
def validate_crc(self) -> bool: ...


@runtime_checkable
class DefaultRecordProtocol(Protocol):
def __init__(
self,
offset: int,
timestamp: int,
timestamp_type: int,
key: Optional[bytes],
value: Optional[bytes],
headers: List[Tuple[str, Optional[bytes]]],
) -> None: ...
@property
def offset(self) -> int: ...
@property
def timestamp(self) -> int:
"""Epoch milliseconds"""

@property
def timestamp_type(self) -> int:
"""CREATE_TIME(0) or APPEND_TIME(1)"""

@property
def key(self) -> Optional[bytes]:
"""Bytes key or None"""

@property
def value(self) -> Optional[bytes]:
"""Bytes value or None"""

@property
def headers(self) -> List[Tuple[str, Optional[bytes]]]: ...
@property
def checksum(self) -> None: ...


class LegacyRecordBatchBuilderProtocol(Protocol):
def __init__(
self, magic: Literal[0, 1], compression_type: int, batch_size: int
) -> None: ...
def append(
self,
offset: int,
timestamp: Optional[int],
key: Optional[bytes],
value: Optional[bytes],
headers: Any = None,
) -> Optional[LegacyRecordMetadataProtocol]: ...
def build(self) -> bytearray:
"""Compress batch to be ready for send"""

def size(self) -> int:
"""Return current size of data written to buffer"""

def size_in_bytes(
self,
offset: int,
timestamp: int,
key: Optional[bytes],
value: Optional[bytes],
) -> int:
"""Actual size of message to add"""

@classmethod
def record_overhead(cls, magic: int) -> int: ...


class LegacyRecordMetadataProtocol(Protocol):
def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ...
@property
def offset(self) -> int: ...
@property
def crc(self) -> int: ...
@property
def size(self) -> int: ...
@property
def timestamp(self) -> int: ...


class LegacyRecordBatchProtocol(Iterable["LegacyRecordProtocol"], Protocol):
CODEC_MASK: ClassVar[int]
CODEC_GZIP: ClassVar[int]
CODEC_SNAPPY: ClassVar[int]
CODEC_LZ4: ClassVar[int]

is_control_batch: bool
is_transactional: bool
producer_id: Optional[int]

def __init__(self, buffer: Union[bytes, bytearray, memoryview], magic: int): ...
@property
def next_offset(self) -> int: ...
def validate_crc(self) -> bool: ...


@runtime_checkable
class LegacyRecordProtocol(Protocol):
def __init__(
self,
offset: int,
timestamp: Optional[int],
timestamp_type: Optional[Literal[0, 1]],
key: Optional[bytes],
value: Optional[bytes],
crc: int,
) -> None: ...
@property
def offset(self) -> int: ...
@property
def timestamp(self) -> Optional[int]:
"""Epoch milliseconds"""

@property
def timestamp_type(self) -> Optional[Literal[0, 1]]:
"""CREATE_TIME(0) or APPEND_TIME(1)"""

@property
def key(self) -> Optional[bytes]:
"""Bytes key or None"""

@property
def value(self) -> Optional[bytes]:
"""Bytes value or None"""

@property
def headers(self) -> List[Never]: ...
@property
def checksum(self) -> int: ...


class MemoryRecordsProtocol(Protocol):
def __init__(self, bytes_data: bytes) -> None: ...
def size_in_bytes(self) -> int: ...
def has_next(self) -> bool: ...
def next_batch(
self,
) -> Optional[Union[DefaultRecordBatchProtocol, LegacyRecordBatchProtocol]]: ...
Loading

0 comments on commit 58bc26c

Please sign in to comment.