Feat: Add kafka concurrent subscriber #1912

Merged · 11 commits · Nov 22, 2024
5 changes: 1 addition & 4 deletions .github/workflows/pr_tests.yaml
@@ -30,10 +30,7 @@ jobs:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v5
         with:
-          python-version: |
-            3.8
-            3.9
-            3.10
+          python-version: "3.12"
       - name: Set $PY environment variable
         run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
       - uses: actions/cache@v4
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
@@ -668,12 +668,14 @@ search:
         - subscriber
             - asyncapi
                 - [AsyncAPIBatchSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIBatchSubscriber.md)
+                - [AsyncAPIConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md)
                 - [AsyncAPIDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md)
                 - [AsyncAPISubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPISubscriber.md)
             - factory
                 - [create_subscriber](api/faststream/kafka/subscriber/factory/create_subscriber.md)
             - usecase
                 - [BatchSubscriber](api/faststream/kafka/subscriber/usecase/BatchSubscriber.md)
+                - [ConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentDefaultSubscriber.md)
                 - [DefaultSubscriber](api/faststream/kafka/subscriber/usecase/DefaultSubscriber.md)
                 - [LogicSubscriber](api/faststream/kafka/subscriber/usecase/LogicSubscriber.md)
             - testing
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 0.5
+---
+
+::: faststream.kafka.subscriber.usecase.ConcurrentDefaultSubscriber
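These two pages document the new class at both the usecase and AsyncAPI layers. For context, a minimal usage sketch of the feature this PR adds (the topic name and handler body are illustrative, and a local broker is assumed): passing `max_workers` to `@broker.subscriber` should select the new concurrent subscriber.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# max_workers > 1 should build a ConcurrentDefaultSubscriber, which lets up
# to four handler invocations run at the same time on one consumer.
@broker.subscriber("orders", max_workers=4)
async def handle_order(msg: str) -> None:
    ...  # I/O-bound work here overlaps across messages
```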
4 changes: 2 additions & 2 deletions faststream/broker/fastapi/get_dependant.py
@@ -89,7 +89,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
                     lambda x: isinstance(x, FieldInfo),
                     p.field_info.metadata or (),
                 ),
-                Field(**field_data),  # type: ignore[pydantic-field]
+                Field(**field_data),
             )

         else:
@@ -109,7 +109,7 @@ def _patch_fastapi_dependent(dependant: "Dependant") -> "Dependant":
                     "le": info.field_info.le,
                 }
             )
-            f = Field(**field_data)  # type: ignore[pydantic-field]
+            f = Field(**field_data)

             params_unique[p.name] = (
                 info.annotation,
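A note on the lines above: the dropped `# type: ignore[pydantic-field]` comments guarded the `Field(**field_data)` calls that rebuild FastAPI parameters as pydantic fields. A minimal sketch of that pattern (names and values here are illustrative, not the real `field_data`):

```python
from pydantic import Field, create_model

# Constraint kwargs are collected into a dict, then unpacked into Field() --
# the shape the old ignore comments used to suppress warnings for.
field_data = {"default": 0, "ge": 0, "le": 100}

# create_model() takes (annotation, FieldInfo) pairs per field.
Model = create_model("QueryParams", limit=(int, Field(**field_data)))

print(Model(limit=10))  # limit=10
```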
8 changes: 4 additions & 4 deletions faststream/broker/subscriber/mixins.py
@@ -16,8 +16,8 @@


 class TasksMixin(SubscriberUsecase[Any]):
-    def __init__(self, **kwargs: Any) -> None:
-        super().__init__(**kwargs)
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
         self.tasks: List[asyncio.Task[Any]] = []

     def add_task(self, coro: Coroutine[Any, Any, Any]) -> None:
@@ -40,7 +40,7 @@ class ConcurrentMixin(TasksMixin):

     def __init__(
         self,
-        *,
+        *args: Any,
         max_workers: int,
         **kwargs: Any,
     ) -> None:
@@ -51,7 +51,7 @@ def __init__(
         )
         self.limiter = anyio.Semaphore(max_workers)

-        super().__init__(**kwargs)
+        super().__init__(*args, **kwargs)

     def start_consume_task(self) -> None:
        self.add_task(self._serve_consume_queue())
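Two things happen here: the `*args` passthrough keeps cooperative `super().__init__()` chains intact when these mixins are combined with subscriber classes, and `ConcurrentMixin` caps in-flight handlers with an `anyio.Semaphore`. A standalone sketch of that limiter pattern (simplified: `handle` and the message list stand in for the real consume queue):

```python
import anyio

async def handle(msg: int) -> None:
    await anyio.sleep(0.1)  # stand-in for real message processing

async def serve(messages: list, max_workers: int) -> None:
    limiter = anyio.Semaphore(max_workers)

    async def limited(msg: int) -> None:
        async with limiter:  # at most max_workers handlers run at once
            await handle(msg)

    async with anyio.create_task_group() as tg:
        for msg in messages:
            tg.start_soon(limited, msg)

anyio.run(serve, list(range(10)), 3)
```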
20 changes: 1 addition & 19 deletions faststream/broker/utils.py
@@ -16,7 +16,7 @@
 )

 import anyio
-from typing_extensions import Literal, Self, overload
+from typing_extensions import Self

 from faststream.broker.acknowledgement_watcher import WatcherContext, get_watcher
 from faststream.broker.types import MsgType
@@ -35,24 +35,6 @@
     from faststream.types import LoggerProto


-@overload
-async def process_msg(
-    msg: Literal[None],
-    middlewares: Iterable["BrokerMiddleware[MsgType]"],
-    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
-    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
-) -> None: ...
-
-
-@overload
-async def process_msg(
-    msg: MsgType,
-    middlewares: Iterable["BrokerMiddleware[MsgType]"],
-    parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
-    decoder: Callable[["StreamMessage[MsgType]"], "Any"],
-) -> "StreamMessage[MsgType]": ...
-
-
 async def process_msg(
     msg: Optional[MsgType],
     middlewares: Iterable["BrokerMiddleware[MsgType]"],
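With the overloads gone, `process_msg` exposes a single `Optional` signature, so call sites narrow the result themselves instead of relying on `Literal[None]` dispatch. A toy sketch of what that means for callers (simplified signature, not the real middleware chain):

```python
import asyncio
from typing import Optional

async def process_msg(msg: Optional[str]) -> Optional[str]:
    if msg is None:
        return None
    return msg.upper()  # stand-in for the parser/decoder pipeline

async def main() -> None:
    result = await process_msg("hello")
    if result is not None:  # explicit narrowing replaces the overloads
        print(result)

asyncio.run(main())
```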
2 changes: 1 addition & 1 deletion faststream/confluent/broker/broker.py
@@ -58,7 +58,7 @@
 Partition = TypeVar("Partition")


-class KafkaBroker(
+class KafkaBroker(  # type: ignore[misc]
     KafkaRegistrator,
     KafkaLoggingBroker,
 ):
108 changes: 54 additions & 54 deletions faststream/confluent/broker/registrator.py
@@ -52,10 +52,12 @@ class KafkaRegistrator(
 ):
     """Includable to KafkaBroker router."""

-    _subscribers: Dict[
+    _subscribers: Dict[  # type: ignore[assignment]
         int, Union["AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber"]
     ]
-    _publishers: Dict[int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"]]
+    _publishers: Dict[  # type: ignore[assignment]
+        int, Union["AsyncAPIBatchPublisher", "AsyncAPIDefaultPublisher"]
+    ]

     @overload  # type: ignore[override]
     def subscriber(
@@ -1193,60 +1195,56 @@ def subscriber(
         if not auto_commit and not group_id:
             raise SetupError("You should install `group_id` with manual commit mode")

-        subscriber = super().subscriber(
-            create_subscriber(
-                *topics,
-                polling_interval=polling_interval,
-                partitions=partitions,
-                batch=batch,
-                max_records=max_records,
-                group_id=group_id,
-                connection_data={
-                    "group_instance_id": group_instance_id,
-                    "fetch_max_wait_ms": fetch_max_wait_ms,
-                    "fetch_max_bytes": fetch_max_bytes,
-                    "fetch_min_bytes": fetch_min_bytes,
-                    "max_partition_fetch_bytes": max_partition_fetch_bytes,
-                    "auto_offset_reset": auto_offset_reset,
-                    "enable_auto_commit": auto_commit,
-                    "auto_commit_interval_ms": auto_commit_interval_ms,
-                    "check_crcs": check_crcs,
-                    "partition_assignment_strategy": partition_assignment_strategy,
-                    "max_poll_interval_ms": max_poll_interval_ms,
-                    "session_timeout_ms": session_timeout_ms,
-                    "heartbeat_interval_ms": heartbeat_interval_ms,
-                    "isolation_level": isolation_level,
-                },
-                is_manual=not auto_commit,
-                # subscriber args
-                no_ack=no_ack,
-                no_reply=no_reply,
-                retry=retry,
-                broker_middlewares=self._middlewares,
-                broker_dependencies=self._dependencies,
-                # AsyncAPI
-                title_=title,
-                description_=description,
-                include_in_schema=self._solve_include_in_schema(include_in_schema),
-            )
-        )
+        subscriber = create_subscriber(
+            *topics,
+            polling_interval=polling_interval,
+            partitions=partitions,
+            batch=batch,
+            max_records=max_records,
+            group_id=group_id,
+            connection_data={
+                "group_instance_id": group_instance_id,
+                "fetch_max_wait_ms": fetch_max_wait_ms,
+                "fetch_max_bytes": fetch_max_bytes,
+                "fetch_min_bytes": fetch_min_bytes,
+                "max_partition_fetch_bytes": max_partition_fetch_bytes,
+                "auto_offset_reset": auto_offset_reset,
+                "enable_auto_commit": auto_commit,
+                "auto_commit_interval_ms": auto_commit_interval_ms,
+                "check_crcs": check_crcs,
+                "partition_assignment_strategy": partition_assignment_strategy,
+                "max_poll_interval_ms": max_poll_interval_ms,
+                "session_timeout_ms": session_timeout_ms,
+                "heartbeat_interval_ms": heartbeat_interval_ms,
+                "isolation_level": isolation_level,
+            },
+            is_manual=not auto_commit,
+            # subscriber args
+            no_ack=no_ack,
+            no_reply=no_reply,
+            retry=retry,
+            broker_middlewares=self._middlewares,
+            broker_dependencies=self._dependencies,
+            # AsyncAPI
+            title_=title,
+            description_=description,
+            include_in_schema=self._solve_include_in_schema(include_in_schema),
+        )

         if batch:
-            return cast("AsyncAPIBatchSubscriber", subscriber).add_call(
-                filter_=filter,
-                parser_=parser or self._parser,
-                decoder_=decoder or self._decoder,
-                dependencies_=dependencies,
-                middlewares_=middlewares,
-            )
+            subscriber = cast("AsyncAPIBatchSubscriber", subscriber)
         else:
-            return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
-                filter_=filter,
-                parser_=parser or self._parser,
-                decoder_=decoder or self._decoder,
-                dependencies_=dependencies,
-                middlewares_=middlewares,
-            )
+            subscriber = cast("AsyncAPIDefaultSubscriber", subscriber)
+
+        subscriber = super().subscriber(subscriber)  # type: ignore[arg-type,assignment]
+
+        return subscriber.add_call(
+            filter_=filter,
+            parser_=parser or self._parser,
+            decoder_=decoder or self._decoder,
+            dependencies_=dependencies,
+            middlewares_=middlewares,
+        )

     @overload  # type: ignore[override]
     def publisher(
@@ -1577,6 +1575,8 @@ def publisher(
         )

         if batch:
-            return cast("AsyncAPIBatchPublisher", super().publisher(publisher))
+            publisher = cast("AsyncAPIBatchPublisher", publisher)
         else:
-            return cast("AsyncAPIDefaultPublisher", super().publisher(publisher))
+            publisher = cast("AsyncAPIDefaultPublisher", publisher)
+
+        return super().publisher(publisher)  # type: ignore[return-value,arg-type]
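After this refactor, `subscriber()` and `publisher()` share one shape: build the object, narrow the union with `cast()` per branch, register via `super()`, then finish with a single shared tail call instead of duplicating it in each branch. A stripped-down sketch of that control flow (all class names here are stand-ins, not the real AsyncAPI types):

```python
from typing import Union, cast

class DefaultSub:
    def add_call(self, **kwargs: object) -> "DefaultSub":
        return self

class BatchSub(DefaultSub):
    pass

def create_subscriber(batch: bool) -> Union[BatchSub, DefaultSub]:
    return BatchSub() if batch else DefaultSub()

def subscriber(batch: bool) -> DefaultSub:
    sub = create_subscriber(batch)
    # Narrow the union once per branch...
    sub = cast(BatchSub, sub) if batch else cast(DefaultSub, sub)
    # ...so registration and add_call() appear exactly once.
    return sub.add_call(filter_=None, parser_=None)
```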
6 changes: 3 additions & 3 deletions faststream/confluent/client.py
@@ -112,7 +112,7 @@ def __init__(
             }
         )

-        self.producer = Producer(final_config, logger=self.logger)
+        self.producer = Producer(final_config)

         self.__running = True
         self._poll_task = asyncio.create_task(self._poll_loop())
@@ -312,7 +312,7 @@ def __init__(
         )

         self.config = final_config
-        self.consumer = Consumer(final_config, logger=self.logger)
+        self.consumer = Consumer(final_config)

     @property
     def topics_to_create(self) -> List[str]:
@@ -381,7 +381,7 @@ async def getmany(
     ) -> Tuple[Message, ...]:
         """Consumes a batch of messages from Kafka and groups them by topic and partition."""
         raw_messages: List[Optional[Message]] = await call_or_await(
-            self.consumer.consume,
+            self.consumer.consume,  # type: ignore[arg-type]
             num_messages=max_records or 10,
             timeout=timeout,
         )
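Since confluent_kafka's `Consumer.consume()` is a blocking call, `getmany()` dispatches it through `call_or_await` to keep the event loop responsive. A rough equivalent using `anyio.to_thread` directly (assumes a broker on localhost:9092 and an existing `orders` topic):

```python
import anyio
from confluent_kafka import Consumer

async def poll_batch() -> None:
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "demo",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["orders"])

    # consume() blocks, so run it in a worker thread; it returns up to
    # num_messages Message objects (possibly fewer on timeout).
    batch = await anyio.to_thread.run_sync(
        lambda: consumer.consume(num_messages=10, timeout=1.0)
    )
    for msg in batch:
        if msg.error() is None:
            print(msg.topic(), msg.value())

    consumer.close()

anyio.run(poll_batch)
```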
2 changes: 1 addition & 1 deletion faststream/confluent/fastapi/fastapi.py
@@ -564,7 +564,7 @@ def __init__(
             graceful_timeout=graceful_timeout,
             decoder=decoder,
             parser=parser,
-            middlewares=middlewares,
+            middlewares=middlewares,  # type: ignore[arg-type]
             schema_url=schema_url,
             setup_state=setup_state,
             # logger options