Skip to content

Commit

Permalink
Use monkeytype to create some semblance of typing (dpkp#173)
Browse files Browse the repository at this point in the history
* Add typing

* define types as Struct for simplicity's sake
  • Loading branch information
wbarnha authored and bradenneal1 committed May 16, 2024
1 parent 2199bf8 commit 03898de
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 363 deletions.
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class AbstractPartitionAssignor(object):
partition counts which are always needed in assignors).
"""

@abc.abstractproperty
@abc.abstractmethod
def name(self):
""".name should be a string identifying the assignor"""
pass
Expand Down
1 change: 0 additions & 1 deletion kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.cluster import ClusterMetadata
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
Expand Down
9 changes: 5 additions & 4 deletions kafka/errors.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import inspect
import sys
from typing import Any


class KafkaError(RuntimeError):
retriable = False
# whether metadata should be refreshed on error
invalid_metadata = False

def __str__(self):
def __str__(self) -> str:
if not self.args:
return self.__class__.__name__
return '{}: {}'.format(self.__class__.__name__,
Expand Down Expand Up @@ -65,7 +66,7 @@ class IncompatibleBrokerVersion(KafkaError):


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(
"""Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
Expand All @@ -92,7 +93,7 @@ class BrokerResponseError(KafkaError):
message = None
description = None

def __str__(self):
def __str__(self) -> str:
"""Add errno to standard KafkaError str"""
return '[Error {}] {}'.format(
self.errno,
Expand Down Expand Up @@ -509,7 +510,7 @@ def _iter_broker_errors():
kafka_errors = {x.errno: x for x in _iter_broker_errors()}


def for_code(error_code):
def for_code(error_code: int) -> Any:
return kafka_errors.get(error_code, UnknownError)


Expand Down
14 changes: 7 additions & 7 deletions kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ class Request(Struct):

FLEXIBLE_VERSION = False

@abc.abstractproperty
@abc.abstractmethod
def API_KEY(self):
"""Integer identifier for api request"""
pass

@abc.abstractproperty
@abc.abstractmethod
def API_VERSION(self):
"""Integer of api request version"""
pass

@abc.abstractproperty
@abc.abstractmethod
def SCHEMA(self):
"""An instance of Schema() representing the request structure"""
pass

@abc.abstractproperty
@abc.abstractmethod
def RESPONSE_TYPE(self):
"""The Response class associated with the api request"""
pass
Expand All @@ -93,17 +93,17 @@ def parse_response_header(self, read_buffer):
class Response(Struct):
__metaclass__ = abc.ABCMeta

@abc.abstractproperty
@abc.abstractmethod
def API_KEY(self):
"""Integer identifier for api request/response"""
pass

@abc.abstractproperty
@abc.abstractmethod
def API_VERSION(self):
"""Integer of api request/response version"""
pass

@abc.abstractproperty
@abc.abstractmethod
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass
Expand Down
14 changes: 8 additions & 6 deletions kafka/protocol/struct.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from io import BytesIO
from typing import List, Union

from kafka.protocol.abstract import AbstractType
from kafka.protocol.types import Schema


from kafka.util import WeakMethod


class Struct(AbstractType):
SCHEMA = Schema()

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
if len(args) == len(self.SCHEMA.fields):
for i, name in enumerate(self.SCHEMA.names):
self.__dict__[name] = args[i]
Expand All @@ -36,23 +38,23 @@ def encode(cls, item): # pylint: disable=E0202
bits.append(field.encode(item[i]))
return b''.join(bits)

def _encode_self(self):
def _encode_self(self) -> bytes:
return self.SCHEMA.encode(
[self.__dict__[name] for name in self.SCHEMA.names]
)

@classmethod
def decode(cls, data):
def decode(cls, data: Union[BytesIO, bytes]) -> "Struct":
if isinstance(data, bytes):
data = BytesIO(data)
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])

def get_item(self, name):
def get_item(self, name: str) -> Union[int, List[List[Union[int, str, bool, List[List[Union[int, List[int]]]]]]], str, List[List[Union[int, str]]]]:
if name not in self.SCHEMA.names:
raise KeyError("%s is not in the schema" % name)
return self.__dict__[name]

def __repr__(self):
def __repr__(self) -> str:
key_vals = []
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
key_vals.append(f'{name}={field.repr(self.__dict__[name])}')
Expand All @@ -61,7 +63,7 @@ def __repr__(self):
def __hash__(self):
return hash(self.encode())

def __eq__(self, other):
def __eq__(self, other: "Struct") -> bool:
if self.SCHEMA != other.SCHEMA:
return False
for attr in self.SCHEMA.names:
Expand Down
6 changes: 3 additions & 3 deletions kafka/record/_crc32c.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
_MASK = 0xFFFFFFFF


def crc_update(crc, data):
def crc_update(crc: int, data: bytes) -> int:
"""Update CRC-32C checksum with data.
Args:
crc: 32-bit checksum to update as long.
Expand All @@ -116,7 +116,7 @@ def crc_update(crc, data):
return crc ^ _MASK


def crc_finalize(crc):
def crc_finalize(crc: int) -> int:
"""Finalize CRC-32C checksum.
This function should be called as last step of crc calculation.
Args:
Expand All @@ -127,7 +127,7 @@ def crc_finalize(crc):
return crc & _MASK


def crc(data):
def crc(data: bytes) -> int:
"""Compute CRC-32C checksum of the data.
Args:
data: byte array, string or iterable over bytes.
Expand Down
14 changes: 7 additions & 7 deletions kafka/record/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ class ABCRecord:
__metaclass__ = abc.ABCMeta
__slots__ = ()

@abc.abstractproperty
@abc.abstractmethod
def offset(self):
""" Absolute offset of record
"""

@abc.abstractproperty
@abc.abstractmethod
def timestamp(self):
""" Epoch milliseconds
"""

@abc.abstractproperty
@abc.abstractmethod
def timestamp_type(self):
""" CREATE_TIME(0) or APPEND_TIME(1)
"""

@abc.abstractproperty
@abc.abstractmethod
def key(self):
""" Bytes key or None
"""

@abc.abstractproperty
@abc.abstractmethod
def value(self):
""" Bytes value or None
"""

@abc.abstractproperty
@abc.abstractmethod
def checksum(self):
""" Prior to v2 format CRC was contained in every message. This will
be the checksum for v0 and v1 and None for v2 and above.
"""

@abc.abstractproperty
@abc.abstractmethod
def headers(self):
""" If supported by version list of key-value tuples, or empty list if
not supported by format.
Expand Down
Loading

0 comments on commit 03898de

Please sign in to comment.