Skip to content

Commit

Permalink
add typing to aiokafka/record/* (#1001)
Browse files Browse the repository at this point in the history
* add typing to aiokafka/record/*

* add some annotations to tests/record

* fix almost all errors

* test w/o protocols

* Revert "test w/o protocols"

This reverts commit 7fa1efa.

* use TypeIs

* use dataclass

* remove timestamp/timestamp_type from cython DefaultRecord

* sync cython stubs with code

* simplify types
  • Loading branch information
dimastbk authored May 5, 2024
1 parent 1862620 commit c759664
Show file tree
Hide file tree
Showing 23 changed files with 1,021 additions and 343 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ FORMATTED_AREAS=\
aiokafka/structs.py \
aiokafka/util.py \
aiokafka/protocol/ \
aiokafka/record/ \
tests/test_codec.py \
tests/test_helpers.py
tests/test_helpers.py \
tests/record/

.PHONY: setup
setup:
Expand Down
7 changes: 4 additions & 3 deletions aiokafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import array
from typing import Iterable

# fmt: off
CRC_TABLE = (
Expand Down Expand Up @@ -97,7 +98,7 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(crc: int, data: Iterable[int]) -> int:
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
Expand All @@ -116,7 +117,7 @@ def crc_update(crc, data):
return crc ^ _MASK


def crc_finalize(crc):
def crc_finalize(crc: int) -> int:
"""Finalize CRC-32C checksum.
This function should be called as last step of crc calculation.
Args:
Expand All @@ -127,7 +128,7 @@ def crc_finalize(crc):
return crc & _MASK


def crc(data):
def crc(data: Iterable[int]) -> int:
"""Compute CRC-32C checksum of the data.
Args:
data: byte array, string or iterable over bytes.
Expand Down
8 changes: 8 additions & 0 deletions aiokafka/record/_crecords/cutil.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Callable

from typing_extensions import Buffer

def crc32c_cython(data: Buffer) -> int: ...
def decode_varint_cython(buffer: bytearray, pos: int) -> tuple[int, int]: ...
def encode_varint_cython(value: int, write: Callable[[int], None]) -> int: ...
def size_of_varint_cython(value: int) -> int: ...
4 changes: 2 additions & 2 deletions aiokafka/record/_crecords/default_records.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ cdef class DefaultRecord:

cdef:
readonly int64_t offset
int64_t timestamp
char timestamp_type
readonly int64_t timestamp
readonly char timestamp_type
readonly object key
readonly object value
readonly object headers
Expand Down
148 changes: 148 additions & 0 deletions aiokafka/record/_crecords/default_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from typing import ClassVar, final

from typing_extensions import Literal, Self

from aiokafka.record._protocols import (
DefaultRecordBatchBuilderProtocol,
DefaultRecordBatchProtocol,
DefaultRecordMetadataProtocol,
DefaultRecordProtocol,
)
from aiokafka.record._types import (
CodecGzipT,
CodecLz4T,
CodecMaskT,
CodecNoneT,
CodecSnappyT,
CodecZstdT,
DefaultCompressionTypeT,
)

@final
class DefaultRecord(DefaultRecordProtocol):
def __init__(
self,
offset: int,
timestamp: int,
timestamp_type: int,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> None: ...
@property
def offset(self) -> int: ...
@property
def timestamp(self) -> int: ...
@property
def timestamp_type(self) -> int: ...
@property
def key(self) -> bytes | None: ...
@property
def value(self) -> bytes | None: ...
@property
def headers(self) -> list[tuple[str, bytes | None]]: ...
@property
def checksum(self) -> None: ...

@final
class DefaultRecordBatch(DefaultRecordBatchProtocol):
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_NONE: ClassVar[CodecNoneT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]
CODEC_ZSTD: ClassVar[CodecZstdT]

def __init__(self, buffer: bytes): ...
@property
def compression_type(self) -> int: ...
@property
def is_transactional(self) -> bool: ...
@property
def is_control_batch(self) -> bool: ...
@property
def next_offset(self) -> int: ...
def __iter__(self) -> Self: ...
def __next__(self) -> DefaultRecord: ...
def validate_crc(self) -> bool: ...
@property
def base_offset(self) -> int: ...
@property
def magic(self) -> int: ...
@property
def crc(self) -> int: ...
@property
def attributes(self) -> int: ...
@property
def last_offset_delta(self) -> int: ...
@property
def first_timestamp(self) -> int: ...
@property
def max_timestamp(self) -> int: ...
@property
def producer_id(self) -> int: ...
@property
def producer_epoch(self) -> int: ...
@property
def base_sequence(self) -> int: ...
@property
def timestamp_type(self) -> Literal[0, 1]: ...

@final
class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol):
producer_id: int
producer_epoch: int
base_sequence: int
def __init__(
self,
magic: int,
compression_type: DefaultCompressionTypeT,
is_transactional: int,
producer_id: int,
producer_epoch: int,
base_sequence: int,
batch_size: int,
) -> None: ...
def set_producer_state(
self, producer_id: int, producer_epoch: int, base_sequence: int
) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> DefaultRecordMetadata: ...
def build(self) -> bytearray: ...
def size(self) -> int: ...
def size_in_bytes(
self,
offset: int,
timestamp: int,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def size_of(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...
@classmethod
def estimate_size_in_bytes(
cls,
key: bytes | None,
value: bytes | None,
headers: list[tuple[str, bytes | None]],
) -> int: ...

@final
class DefaultRecordMetadata(DefaultRecordMetadataProtocol):
offset: int
size: int
timestamp: int
crc: None
def __init__(self, offset: int, size: int, timestamp: int): ...
14 changes: 0 additions & 14 deletions aiokafka/record/_crecords/default_records.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -448,20 +448,6 @@ cdef class DefaultRecord:
record.headers = headers
return record

@property
def timestamp(self):
if self.timestamp != -1:
return self.timestamp
else:
return None

@property
def timestamp_type(self):
if self.timestamp != -1:
return self.timestamp_type
else:
return None

def __repr__(self):
return (
"DefaultRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
Expand Down
95 changes: 95 additions & 0 deletions aiokafka/record/_crecords/legacy_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from typing import Any, ClassVar, Generator, final

from typing_extensions import Buffer, Literal, Never

from aiokafka.record._protocols import (
LegacyRecordBatchBuilderProtocol,
LegacyRecordBatchProtocol,
LegacyRecordMetadataProtocol,
LegacyRecordProtocol,
)
from aiokafka.record._types import (
CodecGzipT,
CodecLz4T,
CodecMaskT,
CodecSnappyT,
LegacyCompressionTypeT,
)

@final
class LegacyRecord(LegacyRecordProtocol):
def __init__(
self,
offset: int,
timestamp: int,
attributes: int,
key: bytes | None,
value: bytes | None,
crc: int,
) -> None: ...
@property
def offset(self) -> int: ...
@property
def key(self) -> bytes | None: ...
@property
def value(self) -> bytes | None: ...
@property
def headers(self) -> list[Never]: ...
@property
def timestamp(self) -> int | None: ...
@property
def timestamp_type(self) -> Literal[0, 1] | None: ...
@property
def checksum(self) -> int: ...

@final
class LegacyRecordBatch(LegacyRecordBatchProtocol):
RECORD_OVERHEAD_V0: ClassVar[int]
RECORD_OVERHEAD_V1: ClassVar[int]
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]

is_control_batch: bool
is_transactional: bool
producer_id: int | None
def __init__(self, buffer: Buffer, magic: int) -> None: ...
@property
def next_offset(self) -> int: ...
def validate_crc(self) -> bool: ...
def __iter__(self) -> Generator[LegacyRecord, None, None]: ...

@final
class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol):
CODEC_MASK: ClassVar[CodecMaskT]
CODEC_GZIP: ClassVar[CodecGzipT]
CODEC_SNAPPY: ClassVar[CodecSnappyT]
CODEC_LZ4: ClassVar[CodecLz4T]

def __init__(
self, magic: int, compression_type: LegacyCompressionTypeT, batch_size: int
) -> None: ...
def append(
self,
offset: int,
timestamp: int | None,
key: bytes | None,
value: bytes | None,
headers: Any = None,
) -> LegacyRecordMetadata: ...
def size(self) -> int: ...
def size_in_bytes(
self, offset: Any, timestamp: Any, key: Buffer | None, value: Buffer | None
) -> int: ...
@staticmethod
def record_overhead(magic: int) -> int: ...
def build(self) -> bytearray: ...

@final
class LegacyRecordMetadata(LegacyRecordMetadataProtocol):
offset: int
crc: int
size: int
timestamp: int
def __init__(self, offset: int, crc: int, size: int, timestamp: int) -> None: ...
13 changes: 13 additions & 0 deletions aiokafka/record/_crecords/memory_records.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from typing import final

from aiokafka.record._protocols import MemoryRecordsProtocol

from .default_records import DefaultRecordBatch
from .legacy_records import LegacyRecordBatch

@final
class MemoryRecords(MemoryRecordsProtocol):
def __init__(self, bytes_data: bytes) -> None: ...
def size_in_bytes(self) -> int: ...
def has_next(self) -> bool: ...
def next_batch(self) -> DefaultRecordBatch | LegacyRecordBatch | None: ...
Loading

0 comments on commit c759664

Please sign in to comment.