Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Kafka acknowledgement #793

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4b01088
docs: fix GH README
Lancetnik Sep 17, 2023
dc0d5b8
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 17, 2023
b66e6d3
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 17, 2023
a967edd
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 17, 2023
62fd3b2
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 18, 2023
9e06127
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 20, 2023
f0815de
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 21, 2023
3d261ed
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 22, 2023
3f5ee9b
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 22, 2023
ef5b65c
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 25, 2023
ae49c41
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 25, 2023
73e6255
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 26, 2023
ae9b2b4
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 26, 2023
0be9832
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 26, 2023
b3cc126
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 26, 2023
c96c307
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Sep 27, 2023
71ffc66
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Oct 2, 2023
263429e
Merge branch 'main' of github.com:airtai/faststream into main
Lancetnik Oct 3, 2023
77d720c
refactor: create ABC TestBroker with shared logic
Lancetnik Oct 3, 2023
a750e73
feat: support Kafka manual ack
Lancetnik Oct 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions examples/kafka/ack_after_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Example: consuming from Kafka with manual offset commits.
# With ``auto_commit=False`` the broker creates the consumer in manual-ack
# mode, so the offset is committed only after the handler finishes
# successfully (at-least-once processing).
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


# NOTE: manual commit mode requires a ``group_id`` — the broker raises
# ``ValueError`` otherwise (offsets are tracked per consumer group).
@broker.subscriber(
    "test",
    group_id="group",
    auto_commit=False,
)
async def handler(msg: str, logger: Logger):
    # Log the decoded message payload; the ack/commit is issued by the
    # framework after this coroutine returns without raising.
    logger.info(msg)


# Publish one demo message once the app (and broker connection) has started.
@app.after_startup
async def test():
    await broker.publish("Hi!", "test")
15 changes: 5 additions & 10 deletions faststream/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,12 @@
)
from fast_depends._compat import FieldInfo
from pydantic import BaseModel
from typing_extensions import TypedDict as TypedDict
from typing_extensions import override as override

# TODO: uncomment with py3.12 release 2023-10-02
# if sys.version_info < (3, 12):
# from typing_extensions import override as override
# from typing_extensions import TypedDict as TypedDict
# else:
# from typing import override
# from typing import TypedDict as TypedDict

if sys.version_info < (3, 12):
from typing_extensions import TypedDict as TypedDict
from typing_extensions import override as override
else:
from typing import TypedDict as TypedDict

if sys.version_info < (3, 11):
from typing_extensions import Never as Never
Expand Down
4 changes: 3 additions & 1 deletion faststream/broker/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
async with AsyncExitStack() as stack:
gl_middlewares: List[BaseMiddleware] = []

stack.enter_context(context.scope("handler_", self))

for m in self.global_middlewares:
gl_middlewares.append(await stack.enter_async_context(m(msg)))

Expand Down Expand Up @@ -308,7 +310,7 @@ async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[overri
for m_pub in all_middlewares:
result_to_send = (
await pub_stack.enter_async_context(
m_pub.publish_scope(result_msg)
m_pub.publish_scope(result_to_send)
)
)

Expand Down
120 changes: 118 additions & 2 deletions faststream/broker/test.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
from types import TracebackType
from typing import Any, Dict, Optional, Type
from abc import abstractmethod
from contextlib import asynccontextmanager
from functools import partial
from types import MethodType, TracebackType
from typing import Any, AsyncGenerator, Dict, Generic, Optional, Type, TypeVar
from unittest.mock import AsyncMock

import anyio
from anyio.abc._tasks import TaskGroup

from faststream.app import FastStream
from faststream.broker.core.abc import BrokerUsecase
from faststream.broker.core.asyncronous import BrokerAsyncUsecase
from faststream.broker.handler import AsyncHandler
from faststream.broker.middlewares import CriticalLogMiddleware
from faststream.broker.wrapper import HandlerCallWrapper
from faststream.types import SendableMessage, SettingField
from faststream.utils.functions import timeout_scope

Broker = TypeVar("Broker", bound=BrokerAsyncUsecase[Any, Any])


class TestApp:
# make sure pytest doesn't try to collect this class as a test class
Expand Down Expand Up @@ -87,6 +95,114 @@ async def __aexit__(
await self._task.__aexit__(None, None, None)


class TestBroker(Generic[Broker]):
    """Generic base for per-broker test contexts.

    Wraps a ``BrokerAsyncUsecase`` so tests can run handlers in-memory
    (default) or against a real broker (``with_real=True``).  Concrete
    subclasses supply the broker-specific hooks declared as abstract
    static methods at the bottom of the class.
    """

    # This is set so pytest ignores this class
    __test__ = False

    def __init__(
        self,
        broker: Broker,
        with_real: bool = False,
        connect_only: bool = False,
    ):
        # with_real: talk to a live broker instead of patching it out.
        # connect_only: connect but skip ``broker.start()`` (no consuming).
        self.with_real = with_real
        self.broker = broker
        self.connect_only = connect_only

    async def __aenter__(self) -> Broker:
        # Delegate to the asynccontextmanager so enter/exit share one state.
        self._ctx = self._create_ctx()
        return await self._ctx.__aenter__()

    async def __aexit__(self, *args: Any) -> None:
        await self._ctx.__aexit__(*args)

    @asynccontextmanager
    async def _create_ctx(self) -> AsyncGenerator[Broker, None]:
        # In-memory mode: replace start/connect/close with fakes.
        # Real mode: still wire publisher mocks so assertions work.
        if not self.with_real:
            self._patch_test_broker(self.broker)
        else:
            self._fake_start(self.broker)

        async with self.broker:
            try:
                if not self.connect_only:
                    await self.broker.start()
                yield self.broker
            finally:
                # Always undo the publisher/handler patching on exit.
                self._fake_close(self.broker)

    @classmethod
    def _patch_test_broker(cls, broker: Broker) -> None:
        # ``start`` still runs the fake wiring (via partial) but is also an
        # AsyncMock so tests can assert it was awaited; ``_connect`` is bound
        # as a method so the broker never opens a real connection.
        broker.start = AsyncMock(wraps=partial(cls._fake_start, broker))  # type: ignore[method-assign]
        broker._connect = MethodType(cls._fake_connect, broker)  # type: ignore[method-assign]
        broker.close = AsyncMock()  # type: ignore[method-assign]

    @classmethod
    def _fake_start(cls, broker: Broker, *args: Any, **kwargs: Any) -> None:
        # Wire every declared publisher to a mock so tests can assert on
        # published messages without a running broker.
        for key, p in broker._publishers.items():
            if getattr(p, "_fake_handler", False):
                # Already wired on a previous start — don't double-patch.
                continue

            handler = broker.handlers.get(key)
            if handler is not None:
                # A subscriber exists for the same destination: route the
                # handler's mock through the publisher's mock.
                # NOTE(review): ``calls`` entries look like 6-tuples whose
                # first item is a HandlerCallWrapper — confirm upstream.
                for f, _, _, _, _, _ in handler.calls:
                    f.mock.side_effect = p.mock
            else:
                # No matching subscriber: register a fake one (broker-specific).
                p._fake_handler = True
                f = cls.create_publisher_fake_subscriber(broker, p)
                p.mock = f.mock

            cls.patch_publisher(broker, p)

        patch_broker_calls(broker)

    @classmethod
    def _fake_close(
        cls,
        broker: Broker,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exec_tb: Optional[TracebackType] = None,
    ) -> None:
        # Restore logging middleware; NOTE(review): this prepends a new
        # CriticalLogMiddleware on every close — repeated enter/exit cycles
        # would accumulate instances; verify intended upstream.
        broker.middlewares = [
            CriticalLogMiddleware(broker.logger, broker.log_level),
            *broker.middlewares,
        ]

        # Reset publisher mocks and tear down any fake subscribers we added.
        for p in broker._publishers.values():
            p.mock.reset_mock()
            if getattr(p, "_fake_handler", False):
                cls.remove_publisher_fake_subscriber(broker, p)
                p._fake_handler = False
                p.mock.reset_mock()

        # Restore handler wrappers to their unpatched state.
        for h in broker.handlers.values():
            for f, _, _, _, _, _ in h.calls:
                f.refresh(with_mock=True)

    # --- broker-specific hooks implemented by concrete TestBroker subclasses ---

    @staticmethod
    @abstractmethod
    def create_publisher_fake_subscriber(
        broker: Broker, publisher: Any
    ) -> HandlerCallWrapper[Any, Any, Any]:
        """Register a fake subscriber consuming the publisher's destination."""
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def remove_publisher_fake_subscriber(broker: Broker, publisher: Any) -> None:
        """Remove the fake subscriber created for this publisher."""
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    async def _fake_connect(broker: Broker, *args: Any, **kwargs: Any) -> None:
        """Replace the broker's real connect with an in-memory stub."""
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def patch_publisher(broker: Broker, publisher: Any) -> None:
        """Redirect the publisher's publish call to its mock."""
        raise NotImplementedError()


def patch_broker_calls(broker: BrokerUsecase[Any, Any]) -> None:
"""Patch broker calls.

Expand Down
10 changes: 7 additions & 3 deletions faststream/kafka/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def subscriber( # type: ignore[override]
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down Expand Up @@ -336,7 +336,7 @@ def subscriber( # type: ignore[override]
fetch_min_bytes (int): The minimum number of bytes to fetch.
max_partition_fetch_bytes (int): The maximum bytes to fetch for a partition.
auto_offset_reset (Literal["latest", "earliest", "none"]): Auto offset reset policy.
enable_auto_commit (bool): Whether to enable auto-commit.
auto_commit (bool): Whether to enable auto-commit.
auto_commit_interval_ms (int): Auto-commit interval in milliseconds.
check_crcs (bool): Whether to check CRCs.
partition_assignment_strategy (Sequence[AbstractPartitionAssignor]): Partition assignment strategy.
Expand Down Expand Up @@ -367,6 +367,9 @@ def subscriber( # type: ignore[override]

self._setup_log_context(topics, group_id)

if not auto_commit and not group_id:
raise ValueError("You should install `group_id` with manual commit mode")

key = Handler.get_routing_hash(topics, group_id)
builder = partial(
aiokafka.AIOKafkaConsumer,
Expand All @@ -377,7 +380,7 @@ def subscriber( # type: ignore[override]
fetch_min_bytes=fetch_min_bytes,
max_partition_fetch_bytes=max_partition_fetch_bytes,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
enable_auto_commit=auto_commit,
auto_commit_interval_ms=auto_commit_interval_ms,
check_crcs=check_crcs,
partition_assignment_strategy=partition_assignment_strategy,
Expand All @@ -399,6 +402,7 @@ def subscriber( # type: ignore[override]
topics=topics,
group_id=group_id,
),
is_manual=not auto_commit,
group_id=group_id,
client_id=self.client_id,
builder=builder,
Expand Down
4 changes: 2 additions & 2 deletions faststream/kafka/broker.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class KafkaBroker(
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down Expand Up @@ -266,7 +266,7 @@ class KafkaBroker(
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down
8 changes: 4 additions & 4 deletions faststream/kafka/fastapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]):
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down Expand Up @@ -215,7 +215,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]):
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down Expand Up @@ -275,7 +275,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]):
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down Expand Up @@ -337,7 +337,7 @@ class KafkaRouter(StreamRouter[ConsumerRecord]):
"earliest",
"none",
] = "latest",
enable_auto_commit: bool = True,
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (
Expand Down
2 changes: 2 additions & 0 deletions faststream/kafka/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def __init__(
group_id: Optional[str] = None,
client_id: str = "faststream-" + __version__,
builder: Callable[..., AIOKafkaConsumer],
is_manual: bool = False,
batch: bool = False,
batch_timeout_ms: int = 200,
max_records: Optional[int] = None,
Expand Down Expand Up @@ -101,6 +102,7 @@ def __init__(
self.batch = batch
self.batch_timeout_ms = batch_timeout_ms
self.max_records = max_records
self.is_manual = is_manual

self.builder = builder
self.task = None
Expand Down
21 changes: 18 additions & 3 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ class KafkaMessage(StreamMessage[aiokafka.ConsumerRecord]):
Reject the Kafka message.
"""

    def __init__(
        self,
        *args: Any,
        consumer: aiokafka.AIOKafkaConsumer,
        is_manual: bool = False,
        **kwargs: Any,
    ) -> None:
        """Wrap a consumed Kafka record with its consumer and ack mode.

        Args:
            consumer: The AIOKafkaConsumer the record was read from; kept so
                ``ack`` can commit the offset in manual mode.
            is_manual: True when the subscriber was created with
                ``auto_commit=False`` — only then does ``ack`` commit.
            *args, **kwargs: Forwarded unchanged to ``StreamMessage``.
        """
        super().__init__(*args, **kwargs)

        self.is_manual = is_manual
        self.consumer = consumer
        # Tracks whether this message has already been acked/nacked/rejected
        # so the offset is committed at most once.
        # NOTE(review): attribute name keeps the upstream spelling "commited".
        self.commited = False

async def ack(self, **kwargs: Any) -> None:
"""
Acknowledge the Kafka message.
Expand All @@ -32,7 +45,9 @@ async def ack(self, **kwargs: Any) -> None:
Returns:
None: This method does not return a value.
"""
return None
if self.is_manual and not self.commited:
await self.consumer.commit()
self.commited = True

    async def nack(self, **kwargs: Any) -> None:
        """Negatively acknowledge the Kafka message.

        Marks the message as settled WITHOUT committing the offset, so a
        later ``ack`` becomes a no-op and the record can be redelivered
        after a consumer restart/rebalance.

        Args:
            **kwargs: Additional keyword arguments (unused).

        Returns:
            None: This method does not return a value.
        """
        self.commited = True

    async def reject(self, **kwargs: Any) -> None:
        """Reject the Kafka message.

        Kafka has no native reject/dead-letter primitive, so this behaves
        like ``nack``: the message is marked settled without committing
        the offset.

        Args:
            **kwargs: Additional keyword arguments (unused).

        Returns:
            None: This method does not return a value.
        """
        self.commited = True
Loading