From 0fbe18f78e489add74e01a52b5b748fd9d3a6284 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CRobert?= Date: Fri, 22 Sep 2023 11:49:49 +0200 Subject: [PATCH 1/5] WIP --- .../example_add_and_publish_with_key/app.py | 8 +++++- .../app_skeleton.py | 6 +++++ .../description.txt | 1 + .../test_app.py | 18 ++++++++++--- .../example_calculate_mean_temperature/app.py | 6 +++++ .../app_skeleton.py | 6 +++++ .../description.txt | 1 + .../test_app.py | 27 ++++++++++++++++--- .../example_consume_publish_with_key/app.py | 9 ++++++- .../app_skeleton.py | 7 +++++ .../description.txt | 1 + .../test_app.py | 17 +++++++++--- 12 files changed, 96 insertions(+), 11 deletions(-) diff --git a/faststream_gen_examples/example_add_and_publish_with_key/app.py b/faststream_gen_examples/example_add_and_publish_with_key/app.py index 59ea064a06..bf92ecc7dc 100644 --- a/faststream_gen_examples/example_add_and_publish_with_key/app.py +++ b/faststream_gen_examples/example_add_and_publish_with_key/app.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import List from pydantic import BaseModel, Field @@ -13,6 +14,11 @@ class Point(BaseModel): y: float = Field( ..., examples=[0.5], description="The Y Coordinate in the coordinate system" ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") @@ -45,5 +51,5 @@ async def on_input_data( x_sum += msg.x y_sum += msg.y - point_sum = Point(x=x_sum, y=y_sum) + point_sum = Point(x=x_sum, y=y_sum, time=datetime.now()) await to_output_data.publish(point_sum, key=key) diff --git a/faststream_gen_examples/example_add_and_publish_with_key/app_skeleton.py b/faststream_gen_examples/example_add_and_publish_with_key/app_skeleton.py index dcd9f8e685..c12312893b 100644 --- a/faststream_gen_examples/example_add_and_publish_with_key/app_skeleton.py +++ b/faststream_gen_examples/example_add_and_publish_with_key/app_skeleton.py @@ -1,3 +1,4 @@ +from 
datetime import datetime from typing import List from pydantic import BaseModel, Field @@ -13,6 +14,11 @@ class Point(BaseModel): y: float = Field( ..., examples=[0.5], description="The Y Coordinate in the coordinate system" ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_add_and_publish_with_key/description.txt b/faststream_gen_examples/example_add_and_publish_with_key/description.txt index 2516484a62..7768996a26 100644 --- a/faststream_gen_examples/example_add_and_publish_with_key/description.txt +++ b/faststream_gen_examples/example_add_and_publish_with_key/description.txt @@ -3,6 +3,7 @@ The app should consume messages from the input_data topic. The input message is a JSON encoded object including two attributes: - x: float - y: float + - time: datetime input_data topic should use partition key. diff --git a/faststream_gen_examples/example_add_and_publish_with_key/test_app.py b/faststream_gen_examples/example_add_and_publish_with_key/test_app.py index 028244f0b9..db9741af13 100644 --- a/faststream_gen_examples/example_add_and_publish_with_key/test_app.py +++ b/faststream_gen_examples/example_add_and_publish_with_key/test_app.py @@ -1,6 +1,10 @@ +from datetime import datetime + import pytest +from freezegun import freeze_time from faststream import Context, TestApp +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker from .app import Point, app, broker @@ -11,11 +15,19 @@ async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.k pass +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") @pytest.mark.asyncio async def test_point_was_incremented(): async with TestKafkaBroker(broker): async with TestApp(app): - await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key") - await 
broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key") + time = datetime.now() + await broker.publish( + Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key" + ) + await broker.publish( + Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key" + ) - on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=4.0))) + point_json = model_to_jsonable(Point(x=2.0, y=4.0, time=time)) + on_output_data.mock.assert_called_with(point_json) diff --git a/faststream_gen_examples/example_calculate_mean_temperature/app.py b/faststream_gen_examples/example_calculate_mean_temperature/app.py index bbd6817028..1498fce584 100644 --- a/faststream_gen_examples/example_calculate_mean_temperature/app.py +++ b/faststream_gen_examples/example_calculate_mean_temperature/app.py @@ -1,3 +1,4 @@ +from datetime import datetime from statistics import mean from typing import Dict, List @@ -17,6 +18,11 @@ class Weather(BaseModel): windspeed: NonNegativeFloat = Field( ..., examples=[20], description="Wind speed in kilometers per hour" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) publisher = broker.publisher("temperature_mean") diff --git a/faststream_gen_examples/example_calculate_mean_temperature/app_skeleton.py b/faststream_gen_examples/example_calculate_mean_temperature/app_skeleton.py index a62990d3ac..e26adc758c 100644 --- a/faststream_gen_examples/example_calculate_mean_temperature/app_skeleton.py +++ b/faststream_gen_examples/example_calculate_mean_temperature/app_skeleton.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Dict, List from pydantic import BaseModel, Field, NonNegativeFloat @@ -16,6 +17,11 @@ class Weather(BaseModel): windspeed: NonNegativeFloat = Field( ..., examples=[20], description="Wind speed in kilometers per hour" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the 
record", + ) publisher = broker.publisher("temperature_mean") diff --git a/faststream_gen_examples/example_calculate_mean_temperature/description.txt b/faststream_gen_examples/example_calculate_mean_temperature/description.txt index 5f8ccfea8b..aa70dbb616 100644 --- a/faststream_gen_examples/example_calculate_mean_temperature/description.txt +++ b/faststream_gen_examples/example_calculate_mean_temperature/description.txt @@ -4,6 +4,7 @@ This topic needs to use partition key. weather messages use JSON with two attributes: - temperature (type float) - windspeed (type float) + - timestamp (type datetime) Application should save each message to a dictionary (global variable) - partition key should be usded as a dictionary key and value should be a List of temperatures. Calculate the temperature mean of the last 5 messages for the given partition key diff --git a/faststream_gen_examples/example_calculate_mean_temperature/test_app.py b/faststream_gen_examples/example_calculate_mean_temperature/test_app.py index e8c0497c4b..ddf934ae9d 100644 --- a/faststream_gen_examples/example_calculate_mean_temperature/test_app.py +++ b/faststream_gen_examples/example_calculate_mean_temperature/test_app.py @@ -1,9 +1,13 @@ +from datetime import datetime + import pytest +from freezegun import freeze_time from faststream import Context, TestApp +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker -from .app import Weather, app, broker +from .app import Weather, app, broker, on_weather @broker.subscriber("temperature_mean") @@ -13,14 +17,31 @@ async def on_temperature_mean( pass +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") @pytest.mark.asyncio async def test_point_was_incremented(): async with TestKafkaBroker(broker): async with TestApp(app): + timestamp = datetime.now() await broker.publish( - Weather(temperature=20.5, windspeed=20, 
timestamp=timestamp), + "weather", + key=b"ZG", + ) + weather_json = model_to_jsonable( + Weather(temperature=20.5, windspeed=20, timestamp=timestamp) ) + on_weather.mock.assert_called_with(weather_json) + await broker.publish( - Weather(temperature=10.5, windspeed=20), "weather", key=b"ZG" + Weather(temperature=10.5, windspeed=20, timestamp=timestamp), + "weather", + key=b"ZG", ) + weather_json = model_to_jsonable( + Weather(temperature=10.5, windspeed=20, timestamp=timestamp) + ) + on_weather.mock.assert_called_with(weather_json) + on_temperature_mean.mock.assert_called_with(15.5) diff --git a/faststream_gen_examples/example_consume_publish_with_key/app.py b/faststream_gen_examples/example_consume_publish_with_key/app.py index 3d2abef618..dec320b316 100644 --- a/faststream_gen_examples/example_consume_publish_with_key/app.py +++ b/faststream_gen_examples/example_consume_publish_with_key/app.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field from faststream import Context, FastStream, Logger @@ -11,6 +13,11 @@ class Point(BaseModel): y: float = Field( ..., examples=[0.5], description="The Y Coordinate in the coordinate system" ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") @@ -25,5 +32,5 @@ async def on_input_data( msg: Point, logger: Logger, key: bytes = Context("message.raw_message.key") ) -> None: logger.info(f"{msg=}") - incremented_point = Point(x=msg.x + 1, y=msg.y + 1) + incremented_point = Point(x=msg.x + 1, y=msg.y + 1, time=datetime.now()) await to_output_data.publish(incremented_point, key=key) diff --git a/faststream_gen_examples/example_consume_publish_with_key/app_skeleton.py b/faststream_gen_examples/example_consume_publish_with_key/app_skeleton.py index 7993558111..15138a15c0 100644 --- a/faststream_gen_examples/example_consume_publish_with_key/app_skeleton.py +++ 
b/faststream_gen_examples/example_consume_publish_with_key/app_skeleton.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field from faststream import Context, FastStream, Logger @@ -11,6 +13,11 @@ class Point(BaseModel): y: float = Field( ..., examples=[0.5], description="The Y Coordinate in the coordinate system" ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_consume_publish_with_key/description.txt b/faststream_gen_examples/example_consume_publish_with_key/description.txt index 7cc03d4426..29fd9aab9a 100644 --- a/faststream_gen_examples/example_consume_publish_with_key/description.txt +++ b/faststream_gen_examples/example_consume_publish_with_key/description.txt @@ -3,6 +3,7 @@ The app should consume messages from the input_data topic. The input message is a JSON encoded object including two attributes: - x: float - y: float + - time: datetime input_data topic should use partition key. While consuming the message, increment x and y attributes by 1 and publish that message to the output_data topic. 
diff --git a/faststream_gen_examples/example_consume_publish_with_key/test_app.py b/faststream_gen_examples/example_consume_publish_with_key/test_app.py index b8e320648b..6e50e723b0 100644 --- a/faststream_gen_examples/example_consume_publish_with_key/test_app.py +++ b/faststream_gen_examples/example_consume_publish_with_key/test_app.py @@ -1,6 +1,10 @@ +from datetime import datetime + import pytest +from freezegun import freeze_time from faststream import Context +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker from .app import Point, broker, on_input_data @@ -11,9 +15,16 @@ async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.k pass +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") @pytest.mark.asyncio async def test_point_was_incremented(): async with TestKafkaBroker(broker): - await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"key") - on_input_data.mock.assert_called_with(dict(Point(x=1.0, y=2.0))) - on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=3.0))) + time = datetime.now() + await broker.publish(Point(x=1.0, y=2.0, time=time), "input_data", key=b"key") + + point_json = model_to_jsonable(Point(x=1.0, y=2.0, time=time)) + on_input_data.mock.assert_called_with(point_json) + + point_json = model_to_jsonable(Point(x=2.0, y=3.0, time=time)) + on_output_data.mock.assert_called_with(point_json) From 195755945e85ce9a891e2b1a43448f8ee54899ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CRobert?= Date: Fri, 22 Sep 2023 11:51:35 +0200 Subject: [PATCH 2/5] Add freezegun to testing requirements --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b0e30b7e8b..f07a20a267 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,6 +109,7 @@ testing = [ "httpx", "PyYAML", "requests", + "freezegun", ] publish = ["hatch==1.7.0"] From 0127153f90126c2230e3d4fc1a03da84bdcb8ac8 Mon Sep 17 00:00:00 
2001 From: =?UTF-8?q?=E2=80=9CRobert?= Date: Fri, 22 Sep 2023 12:05:11 +0200 Subject: [PATCH 3/5] WIP --- .../example_course_updates/app.py | 10 +++- .../example_course_updates/app_skeleton.py | 6 +++ .../example_course_updates/description.txt | 2 +- .../example_course_updates/test_app.py | 50 ++++++++++++------- 4 files changed, 49 insertions(+), 19 deletions(-) diff --git a/faststream_gen_examples/example_course_updates/app.py b/faststream_gen_examples/example_course_updates/app.py index c30f3c482d..8516bcaaa1 100644 --- a/faststream_gen_examples/example_course_updates/app.py +++ b/faststream_gen_examples/example_course_updates/app.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Optional from pydantic import BaseModel, Field @@ -11,6 +12,11 @@ class CourseUpdates(BaseModel): new_content: Optional[str] = Field( default=None, examples=["New content"], description="Content example" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") @@ -25,6 +31,8 @@ async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates: if msg.new_content: logger.info(f"Course has new content {msg.new_content=}") msg = CourseUpdates( - course_name=("Updated: " + msg.course_name), new_content=msg.new_content + course_name=("Updated: " + msg.course_name), + new_content=msg.new_content, + timestamp=datetime.now(), ) return msg diff --git a/faststream_gen_examples/example_course_updates/app_skeleton.py b/faststream_gen_examples/example_course_updates/app_skeleton.py index e52db6b0fb..30fbe553f8 100644 --- a/faststream_gen_examples/example_course_updates/app_skeleton.py +++ b/faststream_gen_examples/example_course_updates/app_skeleton.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Optional from pydantic import BaseModel, Field @@ -11,6 +12,11 @@ class CourseUpdates(BaseModel): new_content: Optional[str] = Field( 
default=None, examples=["New content"], description="Content example" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_course_updates/description.txt b/faststream_gen_examples/example_course_updates/description.txt index 1399b34736..76a216b97f 100644 --- a/faststream_gen_examples/example_course_updates/description.txt +++ b/faststream_gen_examples/example_course_updates/description.txt @@ -1,4 +1,4 @@ Develop a FastStream application using localhost broker. -It should consume messages from 'course_updates' topic where the message is a JSON encoded object including two attributes: course_name and new_content. +It should consume messages from 'course_updates' topic where the message is a JSON encoded object including three attributes: course_name, new_content and timestamp. If new_content attribute is set, then construct a new message appending 'Updated: ' before the course_name attribute. Finally, publish this message to the 'notify_updates' topic. 
diff --git a/faststream_gen_examples/example_course_updates/test_app.py b/faststream_gen_examples/example_course_updates/test_app.py index 40e8356c03..20a79365a5 100644 --- a/faststream_gen_examples/example_course_updates/test_app.py +++ b/faststream_gen_examples/example_course_updates/test_app.py @@ -1,5 +1,9 @@ +from datetime import datetime + import pytest +from freezegun import freeze_time +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker from .app import CourseUpdates, broker, on_course_update @@ -10,39 +14,51 @@ async def on_notify_update(msg: CourseUpdates): pass +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") @pytest.mark.asyncio async def test_app_without_new_content(): async with TestKafkaBroker(broker): - await broker.publish(CourseUpdates(course_name="Biology"), "course_updates") - on_course_update.mock.assert_called_with( - dict(CourseUpdates(course_name="Biology")) + timestamp = datetime.now() + await broker.publish( + CourseUpdates(course_name="Biology", timestamp=timestamp), "course_updates" ) - on_notify_update.mock.assert_called_with( - dict(CourseUpdates(course_name="Biology")) + + course_json = model_to_jsonable( + CourseUpdates(course_name="Biology", timestamp=timestamp) ) + on_course_update.mock.assert_called_with(course_json) + on_notify_update.mock.assert_called_with(course_json) +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") @pytest.mark.asyncio async def test_app_with_new_content(): async with TestKafkaBroker(broker): + timestamp = datetime.now() await broker.publish( CourseUpdates( - course_name="Biology", new_content="We have additional classes..." + course_name="Biology", + new_content="We have additional classes...", + timestamp=timestamp, ), "course_updates", ) - on_course_update.mock.assert_called_with( - dict( - CourseUpdates( - course_name="Biology", new_content="We have additional classes..." 
- ) + course_json = model_to_jsonable( + CourseUpdates( + course_name="Biology", + new_content="We have additional classes...", + timestamp=timestamp, ) ) - on_notify_update.mock.assert_called_with( - dict( - CourseUpdates( - course_name="Updated: Biology", - new_content="We have additional classes...", - ) + on_course_update.mock.assert_called_with(course_json) + + on_update_json = model_to_jsonable( + CourseUpdates( + course_name="Updated: Biology", + new_content="We have additional classes...", + timestamp=timestamp, ) ) + on_notify_update.mock.assert_called_with(on_update_json) From 228bc93ae59017e2606a500209cafd573fca6e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CRobert?= Date: Fri, 22 Sep 2023 12:09:15 +0200 Subject: [PATCH 4/5] Use asyncio.gather for awaiting the task --- faststream_gen_examples/example_scrape_weather/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream_gen_examples/example_scrape_weather/app.py b/faststream_gen_examples/example_scrape_weather/app.py index 259b1dc5a6..85d3d6f5e4 100644 --- a/faststream_gen_examples/example_scrape_weather/app.py +++ b/faststream_gen_examples/example_scrape_weather/app.py @@ -48,7 +48,7 @@ async def shutdown(context: ContextRepo): # Get all the running tasks and wait them to finish publish_tasks = context.get("publish_tasks") - await asyncio.wait(publish_tasks) + await asyncio.gather(*publish_tasks) async def fetch_and_publish_weather( From f2bd0dfcefb7a07ea202602e3847ca5cc7dc49f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CRobert?= Date: Fri, 22 Sep 2023 15:42:16 +0200 Subject: [PATCH 5/5] Add datetime attribute to some examples --- .../example_product_reviews/app.py | 7 ++++ .../example_product_reviews/app_skeleton.py | 7 ++++ .../example_product_reviews/description.txt | 2 +- .../example_product_reviews/test_app.py | 34 ++++++++++++++----- .../example_scram256_security/app.py | 11 +++--- .../example_scram256_security/app_skeleton.py | 9 +++-- 
.../example_scram256_security/description.txt | 2 +- .../example_scram256_security/test_app.py | 21 +++++++----- .../example_student_query/app.py | 7 ++++ .../example_student_query/app_skeleton.py | 7 ++++ .../example_student_query/description.txt | 2 +- .../example_student_query/test_app.py | 30 ++++++++-------- 12 files changed, 91 insertions(+), 48 deletions(-) diff --git a/faststream_gen_examples/example_product_reviews/app.py b/faststream_gen_examples/example_product_reviews/app.py index 384c9f6208..3d875bec37 100644 --- a/faststream_gen_examples/example_product_reviews/app.py +++ b/faststream_gen_examples/example_product_reviews/app.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field, NonNegativeInt from faststream import FastStream, Logger @@ -14,6 +16,11 @@ class ProductReview(BaseModel): review_grade: NonNegativeInt = Field( ..., examples=[1], description="Int data example" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_product_reviews/app_skeleton.py b/faststream_gen_examples/example_product_reviews/app_skeleton.py index b8a1d868bd..3a47d8ce63 100644 --- a/faststream_gen_examples/example_product_reviews/app_skeleton.py +++ b/faststream_gen_examples/example_product_reviews/app_skeleton.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field, NonNegativeInt from faststream import FastStream, Logger @@ -14,6 +16,11 @@ class ProductReview(BaseModel): review_grade: NonNegativeInt = Field( ..., examples=[1], description="Int data example" ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_product_reviews/description.txt 
b/faststream_gen_examples/example_product_reviews/description.txt index 8b2e3dc2b0..5887909c69 100644 --- a/faststream_gen_examples/example_product_reviews/description.txt +++ b/faststream_gen_examples/example_product_reviews/description.txt @@ -1,3 +1,3 @@ Create a FastStream application using the localhost broker. -The application should consume from the 'product_reviews' topic which includes JSON encoded objects with attributes: product_id, customer_id and review_grade. +The application should consume from the 'product_reviews' topic which includes JSON encoded objects with attributes: product_id, customer_id, review_grade and timestamp. If the review_grade attribute is smaller then 5, send an alert message to the 'customer_service' topic. diff --git a/faststream_gen_examples/example_product_reviews/test_app.py b/faststream_gen_examples/example_product_reviews/test_app.py index bdcc878e48..49ff26266b 100644 --- a/faststream_gen_examples/example_product_reviews/test_app.py +++ b/faststream_gen_examples/example_product_reviews/test_app.py @@ -1,5 +1,8 @@ +from datetime import datetime + import pytest +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker from .app import ProductReview, broker, on_product_reviews @@ -13,26 +16,39 @@ async def on_customer_service(msg: ProductReview) -> None: @pytest.mark.asyncio async def test_app_where_review_grade_is_grater_then_5(): async with TestKafkaBroker(broker): + timestamp = datetime.now() await broker.publish( - ProductReview(product_id=1, customer_id=1, review_grade=6), + ProductReview( + product_id=1, customer_id=1, review_grade=6, timestamp=timestamp + ), "product_reviews", ) - on_product_reviews.mock.assert_called_with( - dict(ProductReview(product_id=1, customer_id=1, review_grade=6)) + + on_product_review_json = model_to_jsonable( + ProductReview( + product_id=1, customer_id=1, review_grade=6, timestamp=timestamp + ) ) + 
on_product_reviews.mock.assert_called_with(on_product_review_json) on_customer_service.mock.assert_not_called() @pytest.mark.asyncio async def test_app_where_review_grade_is_less_then_5(): async with TestKafkaBroker(broker): + timestamp = datetime.now() await broker.publish( - ProductReview(product_id=1, customer_id=2, review_grade=2), + ProductReview( + product_id=1, customer_id=2, review_grade=2, timestamp=timestamp + ), "product_reviews", ) - on_product_reviews.mock.assert_called_with( - dict(ProductReview(product_id=1, customer_id=2, review_grade=2)) - ) - on_customer_service.mock.assert_called_with( - dict(ProductReview(product_id=1, customer_id=2, review_grade=2)) + + product_review_json = model_to_jsonable( + ProductReview( + product_id=1, customer_id=2, review_grade=2, timestamp=timestamp + ) ) + + on_product_reviews.mock.assert_called_with(product_review_json) + on_customer_service.mock.assert_called_with(product_review_json) diff --git a/faststream_gen_examples/example_scram256_security/app.py b/faststream_gen_examples/example_scram256_security/app.py index 9203b34b23..18e751ea22 100644 --- a/faststream_gen_examples/example_scram256_security/app.py +++ b/faststream_gen_examples/example_scram256_security/app.py @@ -1,5 +1,6 @@ import os import ssl +from datetime import date from pydantic import BaseModel, Field @@ -10,12 +11,10 @@ class Student(BaseModel): name: str = Field(..., examples=["Student Studentis"], description="Name example") - age: int = Field( + birthdate: date = Field( ..., - examples=[ - 20, - ], - description="Student age", + examples=["2023-09-05"], + description="Students birthdate", ) @@ -34,5 +33,5 @@ class Student(BaseModel): @broker.subscriber("student_application") async def on_application(msg: Student, logger: Logger) -> None: - key = str(msg.age).encode("utf-8") + key = msg.name.encode("utf-8") await to_class.publish(msg, key=key) diff --git a/faststream_gen_examples/example_scram256_security/app_skeleton.py 
b/faststream_gen_examples/example_scram256_security/app_skeleton.py index 3c5572bb81..9860c93556 100644 --- a/faststream_gen_examples/example_scram256_security/app_skeleton.py +++ b/faststream_gen_examples/example_scram256_security/app_skeleton.py @@ -1,5 +1,6 @@ import os import ssl +from datetime import date from pydantic import BaseModel, Field @@ -10,12 +11,10 @@ class Student(BaseModel): name: str = Field(..., examples=["Student Studentis"], description="Name example") - age: int = Field( + birthdate: date = Field( ..., - examples=[ - 20, - ], - description="Student age", + examples=["2023-09-05"], + description="Students birthdate", ) diff --git a/faststream_gen_examples/example_scram256_security/description.txt b/faststream_gen_examples/example_scram256_security/description.txt index 02a224b279..3533865de4 100644 --- a/faststream_gen_examples/example_scram256_security/description.txt +++ b/faststream_gen_examples/example_scram256_security/description.txt @@ -1,5 +1,5 @@ FastStream application that handles the incoming students from "student_application" topic. The Student is then passed to the "class" topic using student_name as key. -Student has a name and age. +Student has a name and birthdate. The communication with the broker is encrypted with ssl and uses SASL Scram256 for authorization. Username and pasword are loaded from environment variables. 
diff --git a/faststream_gen_examples/example_scram256_security/test_app.py b/faststream_gen_examples/example_scram256_security/test_app.py index 6311b0acbf..2cf2218282 100644 --- a/faststream_gen_examples/example_scram256_security/test_app.py +++ b/faststream_gen_examples/example_scram256_security/test_app.py @@ -1,9 +1,11 @@ import os +from datetime import date from unittest import mock import pytest from faststream import Context +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker with mock.patch.dict( @@ -23,15 +25,16 @@ async def on_class( @pytest.mark.asyncio async def test_app(): async with TestKafkaBroker(broker): + birthdate = date(2020, 9, 5) await broker.publish( - Student(name="Student Studentis", age=12), "student_application" + Student(name="Student Studentis", birthdate=birthdate), + "student_application", ) - on_application.mock.assert_called_with( - dict(Student(name="Student Studentis", age=12)) - ) - to_class.mock.assert_called_with( - dict(Student(name="Student Studentis", age=12)) - ) - on_class.mock.assert_called_with( - dict(Student(name="Student Studentis", age=12)) + + student_json = model_to_jsonable( + Student(name="Student Studentis", birthdate=birthdate) ) + + on_application.mock.assert_called_with(student_json) + to_class.mock.assert_called_with(student_json) + on_class.mock.assert_called_with(student_json) diff --git a/faststream_gen_examples/example_student_query/app.py b/faststream_gen_examples/example_student_query/app.py index a137c0b1ca..4c2f17d288 100644 --- a/faststream_gen_examples/example_student_query/app.py +++ b/faststream_gen_examples/example_student_query/app.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field, NonNegativeInt from faststream import FastStream, Logger @@ -14,6 +16,11 @@ class StudentQuery(BaseModel): query: str = Field( ..., examples=["Please help me with..."], description="Query example" ) + time: datetime = Field( + ..., + 
examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_student_query/app_skeleton.py b/faststream_gen_examples/example_student_query/app_skeleton.py index 2daecffc2f..117f2a186f 100644 --- a/faststream_gen_examples/example_student_query/app_skeleton.py +++ b/faststream_gen_examples/example_student_query/app_skeleton.py @@ -1,3 +1,5 @@ +from datetime import datetime + from pydantic import BaseModel, Field, NonNegativeInt from faststream import FastStream, Logger @@ -14,6 +16,11 @@ class StudentQuery(BaseModel): query: str = Field( ..., examples=["Please help me with..."], description="Query example" ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) broker = KafkaBroker("localhost:9092") diff --git a/faststream_gen_examples/example_student_query/description.txt b/faststream_gen_examples/example_student_query/description.txt index 85ebc4ccf0..bc9c90d767 100644 --- a/faststream_gen_examples/example_student_query/description.txt +++ b/faststream_gen_examples/example_student_query/description.txt @@ -1,4 +1,4 @@ -Create a FastStream application using localhost as a broker. Consume from 'student_query' topic, which includes attributes: student_id, department and query. +Create a FastStream application using localhost as a broker. Consume from 'student_query' topic, which includes attributes: student_id, department and query, time. Each query should then be forwarded to the corresponding department based on the department attribute. The relevant department topics could be 'finance_department', 'academic_department', or 'admissions_department'. If department is not one of these topics, forward the message to the 'unclassified_query' topic. 
diff --git a/faststream_gen_examples/example_student_query/test_app.py b/faststream_gen_examples/example_student_query/test_app.py index b037853334..a4c13a82c2 100644 --- a/faststream_gen_examples/example_student_query/test_app.py +++ b/faststream_gen_examples/example_student_query/test_app.py @@ -1,5 +1,8 @@ +from datetime import datetime + import pytest +from faststream._compat import model_to_jsonable from faststream.kafka import TestKafkaBroker from .app import StudentQuery, broker, on_student_query @@ -28,33 +31,28 @@ async def on_unclassified_query(msg: StudentQuery) -> None: @pytest.mark.asyncio async def test_message_published_to_correct_topic(): async with TestKafkaBroker(broker): + time = datetime.now() await broker.publish( StudentQuery( student_id=1, department="admissions_department", query="Help me with...", + time=time, ), "student_query", ) - on_student_query.mock.assert_called_with( - dict( - StudentQuery( - student_id=1, - department="admissions_department", - query="Help me with...", - ) - ) - ) - on_admissions_department.mock.assert_called_with( - dict( - StudentQuery( - student_id=1, - department="admissions_department", - query="Help me with...", - ) + student_query_json = model_to_jsonable( + StudentQuery( + student_id=1, + department="admissions_department", + query="Help me with...", + time=time, ) ) + on_student_query.mock.assert_called_with(student_query_json) + on_admissions_department.mock.assert_called_with(student_query_json) + on_finance_department.mock.assert_not_called() on_academic_department.mock.assert_not_called() on_unclassified_query.mock.assert_not_called()