Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

696 add example to faststream gen examples which uses datetime attribute #714

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import List

from pydantic import BaseModel, Field
Expand All @@ -13,6 +14,11 @@ class Point(BaseModel):
y: float = Field(
..., examples=[0.5], description="The Y Coordinate in the coordinate system"
)
time: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand Down Expand Up @@ -45,5 +51,5 @@ async def on_input_data(
x_sum += msg.x
y_sum += msg.y

point_sum = Point(x=x_sum, y=y_sum)
point_sum = Point(x=x_sum, y=y_sum, time=datetime.now())
await to_output_data.publish(point_sum, key=key)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import List

from pydantic import BaseModel, Field
Expand All @@ -13,6 +14,11 @@ class Point(BaseModel):
y: float = Field(
..., examples=[0.5], description="The Y Coordinate in the coordinate system"
)
time: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ The app should consume messages from the input_data topic.
The input message is a JSON encoded object including three attributes:
- x: float
- y: float
- time: datetime

input_data topic should use partition key.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from datetime import datetime

import pytest
from freezegun import freeze_time

from faststream import Context, TestApp
from faststream._compat import model_to_jsonable
from faststream.kafka import TestKafkaBroker

from .app import Point, app, broker
Expand All @@ -11,11 +15,19 @@ async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.k
pass


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_point_was_incremented():
async with TestKafkaBroker(broker):
async with TestApp(app):
await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key")
await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key")
time = datetime.now()
await broker.publish(
Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key"
)
await broker.publish(
Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key"
)

on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=4.0)))
point_json = model_to_jsonable(Point(x=2.0, y=4.0, time=time))
on_output_data.mock.assert_called_with(point_json)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from statistics import mean
from typing import Dict, List

Expand All @@ -17,6 +18,11 @@ class Weather(BaseModel):
windspeed: NonNegativeFloat = Field(
..., examples=[20], description="Wind speed in kilometers per hour"
)
timestamp: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


publisher = broker.publisher("temperature_mean")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Dict, List

from pydantic import BaseModel, Field, NonNegativeFloat
Expand All @@ -16,6 +17,11 @@ class Weather(BaseModel):
windspeed: NonNegativeFloat = Field(
..., examples=[20], description="Wind speed in kilometers per hour"
)
timestamp: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


publisher = broker.publisher("temperature_mean")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ This topic needs to use partition key.
weather messages use JSON with three attributes:
- temperature (type float)
- windspeed (type float)
- timestamp (type datetime)

Application should save each message to a dictionary (global variable) - partition key should be used as a dictionary key and value should be a List of temperatures.
Calculate the temperature mean of the last 5 messages for the given partition key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from datetime import datetime

import pytest
from freezegun import freeze_time

from faststream import Context, TestApp
from faststream._compat import model_to_jsonable
from faststream.kafka import TestKafkaBroker

from .app import Weather, app, broker
from .app import Weather, app, broker, on_weather


@broker.subscriber("temperature_mean")
Expand All @@ -13,14 +17,31 @@ async def on_temperature_mean(
pass


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_point_was_incremented():
async with TestKafkaBroker(broker):
async with TestApp(app):
timestamp = datetime.now()
await broker.publish(
Weather(temperature=20.5, windspeed=20), "weather", key=b"ZG"
Weather(temperature=20.5, windspeed=20, timestamp=timestamp),
"weather",
key=b"ZG",
)
weather_json = model_to_jsonable(
Weather(temperature=20.5, windspeed=20, timestamp=timestamp)
)
on_weather.mock.assert_called_with(weather_json)

await broker.publish(
Weather(temperature=10.5, windspeed=20), "weather", key=b"ZG"
Weather(temperature=10.5, windspeed=20, timestamp=timestamp),
"weather",
key=b"ZG",
)
weather_json = model_to_jsonable(
Weather(temperature=10.5, windspeed=20, timestamp=timestamp)
)
on_weather.mock.assert_called_with(weather_json)

on_temperature_mean.mock.assert_called_with(15.5)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

from pydantic import BaseModel, Field

from faststream import Context, FastStream, Logger
Expand All @@ -11,6 +13,11 @@ class Point(BaseModel):
y: float = Field(
..., examples=[0.5], description="The Y Coordinate in the coordinate system"
)
time: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand All @@ -25,5 +32,5 @@ async def on_input_data(
msg: Point, logger: Logger, key: bytes = Context("message.raw_message.key")
) -> None:
logger.info(f"{msg=}")
incremented_point = Point(x=msg.x + 1, y=msg.y + 1)
incremented_point = Point(x=msg.x + 1, y=msg.y + 1, time=datetime.now())
await to_output_data.publish(incremented_point, key=key)
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

from pydantic import BaseModel, Field

from faststream import Context, FastStream, Logger
Expand All @@ -11,6 +13,11 @@ class Point(BaseModel):
y: float = Field(
..., examples=[0.5], description="The Y Coordinate in the coordinate system"
)
time: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ The app should consume messages from the input_data topic.
The input message is a JSON encoded object including three attributes:
- x: float
- y: float
- time: datetime

input_data topic should use partition key.
While consuming the message, increment x and y attributes by 1 and publish that message to the output_data topic.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from datetime import datetime

import pytest
from freezegun import freeze_time

from faststream import Context
from faststream._compat import model_to_jsonable
from faststream.kafka import TestKafkaBroker

from .app import Point, broker, on_input_data
Expand All @@ -11,9 +15,16 @@ async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.k
pass


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_point_was_incremented():
async with TestKafkaBroker(broker):
await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"key")
on_input_data.mock.assert_called_with(dict(Point(x=1.0, y=2.0)))
on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=3.0)))
time = datetime.now()
await broker.publish(Point(x=1.0, y=2.0, time=time), "input_data", key=b"key")

point_json = model_to_jsonable(Point(x=1.0, y=2.0, time=time))
on_input_data.mock.assert_called_with(point_json)

point_json = model_to_jsonable(Point(x=2.0, y=3.0, time=time))
on_output_data.mock.assert_called_with(point_json)
10 changes: 9 additions & 1 deletion faststream_gen_examples/example_course_updates/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field
Expand All @@ -11,6 +12,11 @@ class CourseUpdates(BaseModel):
new_content: Optional[str] = Field(
default=None, examples=["New content"], description="Content example"
)
timestamp: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand All @@ -25,6 +31,8 @@ async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates:
if msg.new_content:
logger.info(f"Course has new content {msg.new_content=}")
msg = CourseUpdates(
course_name=("Updated: " + msg.course_name), new_content=msg.new_content
course_name=("Updated: " + msg.course_name),
new_content=msg.new_content,
timestamp=datetime.now(),
)
return msg
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field
Expand All @@ -11,6 +12,11 @@ class CourseUpdates(BaseModel):
new_content: Optional[str] = Field(
default=None, examples=["New content"], description="Content example"
)
timestamp: datetime = Field(
...,
examples=["2020-04-23 10:20:30.400000"],
description="The timestamp of the record",
)


broker = KafkaBroker("localhost:9092")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Develop a FastStream application using localhost broker.
It should consume messages from 'course_updates' topic where the message is a JSON encoded object including two attributes: course_name and new_content.
It should consume messages from 'course_updates' topic where the message is a JSON encoded object including three attributes: course_name, new_content and timestamp.
If new_content attribute is set, then construct a new message appending 'Updated: ' before the course_name attribute.
Finally, publish this message to the 'notify_updates' topic.
50 changes: 33 additions & 17 deletions faststream_gen_examples/example_course_updates/test_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from datetime import datetime

import pytest
from freezegun import freeze_time

from faststream._compat import model_to_jsonable
from faststream.kafka import TestKafkaBroker

from .app import CourseUpdates, broker, on_course_update
Expand All @@ -10,39 +14,51 @@ async def on_notify_update(msg: CourseUpdates):
pass


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_app_without_new_content():
async with TestKafkaBroker(broker):
await broker.publish(CourseUpdates(course_name="Biology"), "course_updates")
on_course_update.mock.assert_called_with(
dict(CourseUpdates(course_name="Biology"))
timestamp = datetime.now()
await broker.publish(
CourseUpdates(course_name="Biology", timestamp=timestamp), "course_updates"
)
on_notify_update.mock.assert_called_with(
dict(CourseUpdates(course_name="Biology"))

course_json = model_to_jsonable(
CourseUpdates(course_name="Biology", timestamp=timestamp)
)
on_course_update.mock.assert_called_with(course_json)
on_notify_update.mock.assert_called_with(course_json)


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_app_with_new_content():
async with TestKafkaBroker(broker):
timestamp = datetime.now()
await broker.publish(
CourseUpdates(
course_name="Biology", new_content="We have additional classes..."
course_name="Biology",
new_content="We have additional classes...",
timestamp=timestamp,
),
"course_updates",
)
on_course_update.mock.assert_called_with(
dict(
CourseUpdates(
course_name="Biology", new_content="We have additional classes..."
)
course_json = model_to_jsonable(
CourseUpdates(
course_name="Biology",
new_content="We have additional classes...",
timestamp=timestamp,
)
)
on_notify_update.mock.assert_called_with(
dict(
CourseUpdates(
course_name="Updated: Biology",
new_content="We have additional classes...",
)
on_course_update.mock.assert_called_with(course_json)

on_update_json = model_to_jsonable(
CourseUpdates(
course_name="Updated: Biology",
new_content="We have additional classes...",
timestamp=timestamp,
)
)
on_notify_update.mock.assert_called_with(on_update_json)
Loading