Skip to content

Commit

Permalink
chore: merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Nov 19, 2024
2 parents 736baba + 53e435e commit 9b04fe8
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1835,
"line_number": 1850,
"is_secret": false
}
],
Expand All @@ -178,5 +178,5 @@
}
]
},
"generated_at": "2024-11-08T12:39:15Z"
"generated_at": "2024-11-15T07:38:53Z"
}
15 changes: 15 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ hide:
---

# Release Notes
## 0.5.30

### What's Changed
* Introducing FastStream Guru on Gurubase.io by [@kursataktas](https://github.com/kursataktas){.external-link target="_blank"} in [#1903](https://github.com/airtai/faststream/pull/1903){.external-link target="_blank"}
* docs: add gurubase badge to the doc by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1905](https://github.com/airtai/faststream/pull/1905){.external-link target="_blank"}
* fix: allow users to pass `nkeys_seed_str` as argument for NATS broker. by [@Drakorgaur](https://github.com/Drakorgaur){.external-link target="_blank"} in [#1908](https://github.com/airtai/faststream/pull/1908){.external-link target="_blank"}
* Add more warning's to nats subscription factory by [@sheldygg](https://github.com/sheldygg){.external-link target="_blank"} in [#1907](https://github.com/airtai/faststream/pull/1907){.external-link target="_blank"}
* fix: correct working with dependencies versions by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1918](https://github.com/airtai/faststream/pull/1918){.external-link target="_blank"}

### New Contributors
* [@kursataktas](https://github.com/kursataktas){.external-link target="_blank"} made their first contribution in [#1903](https://github.com/airtai/faststream/pull/1903){.external-link target="_blank"}
* [@Drakorgaur](https://github.com/Drakorgaur){.external-link target="_blank"} made their first contribution in [#1908](https://github.com/airtai/faststream/pull/1908){.external-link target="_blank"}

**Full Changelog**: [#0.5.29...0.5.30](https://github.com/airtai/faststream/compare/0.5.29...0.5.30){.external-link target="_blank"}

## 0.5.29

### What's Changed
Expand Down
33 changes: 16 additions & 17 deletions faststream/_internal/_compat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import sys
import warnings
from collections import UserString
from collections.abc import Iterable, Mapping
from importlib.metadata import version as get_version
from importlib.util import find_spec
Expand All @@ -17,8 +18,6 @@

from faststream._internal.basic_types import AnyDict

PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")

IS_WINDOWS = (
sys.platform == "win32" or sys.platform == "cygwin" or sys.platform == "msys"
)
Expand Down Expand Up @@ -76,24 +75,23 @@ def json_dumps(*a: Any, **kw: Any) -> bytes:


JsonSchemaValue = Mapping[str, Any]
major, minor, *_ = PYDANTIC_VERSION.split(".")
_PYDANTCI_MAJOR, _PYDANTIC_MINOR = int(major), int(minor)

PYDANTIC_V2 = _PYDANTCI_MAJOR >= 2

if PYDANTIC_V2:
if PYDANTIC_VERSION >= "2.4.0":
if _PYDANTIC_MINOR >= 4:
from pydantic.annotated_handlers import (
GetJsonSchemaHandler,
)
from pydantic_core.core_schema import (
with_info_plain_validator_function,
)
else:
if PYDANTIC_VERSION >= "2.10":
from pydantic.annotated_handlers import (
GetJsonSchemaHandler,
)
else:
from pydantic._internal._annotated_handlers import ( # type: ignore[no-redef]
GetJsonSchemaHandler,
)
from pydantic._internal._annotated_handlers import ( # type: ignore[no-redef]
GetJsonSchemaHandler,
)
from pydantic_core.core_schema import (
general_plain_validator_function as with_info_plain_validator_function,
)
Expand Down Expand Up @@ -175,18 +173,19 @@ def with_info_plain_validator_function( # type: ignore[misc]
return {}


anyio_major = int(get_version("anyio").split(".")[0])
ANYIO_V3 = anyio_major == 3
major, *_ = get_version("anyio").split(".")
_ANYIO_MAJOR = int(major)
ANYIO_V3 = _ANYIO_MAJOR == 3


if ANYIO_V3:
from anyio import ExceptionGroup # type: ignore[attr-defined]
elif sys.version_info < (3, 11):
elif sys.version_info >= (3, 11):
ExceptionGroup = ExceptionGroup # noqa: PLW0127
else:
from exceptiongroup import (
ExceptionGroup,
)
else:
ExceptionGroup = ExceptionGroup # noqa: PLW0127


try:
Expand All @@ -198,7 +197,7 @@ def with_info_plain_validator_function( # type: ignore[misc]
except ImportError: # pragma: no cover
# NOTE: EmailStr mock was copied from the FastAPI
# https://github.com/tiangolo/fastapi/blob/master/fastapi/openapi/models.py#24
class EmailStr(str): # type: ignore[no-redef]
class EmailStr(UserString): # type: ignore[no-redef]
"""EmailStr is a string that should be an email.
Note: EmailStr mock was copied from the FastAPI:
Expand Down
29 changes: 22 additions & 7 deletions faststream/_internal/fastapi/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,28 @@
from fastapi.requests import Request

major, minor, patch, *_ = FASTAPI_VERSION.split(".")
major = int(major)
minor = int(minor)
patch = int(patch)
FASTAPI_V2 = major > 0 or minor > 100
FASTAPI_V106 = major > 0 or minor >= 106
FASTAPI_v102_3 = major > 0 or minor > 112 or (minor == 112 and patch > 2)
FASTAPI_v102_4 = major > 0 or minor > 112 or (minor == 112 and patch > 3)

_FASTAPI_MAJOR, _FASTAPI_MINOR = int(major), int(minor)

FASTAPI_V2 = _FASTAPI_MAJOR > 0 or _FASTAPI_MINOR > 100
FASTAPI_V106 = _FASTAPI_MAJOR > 0 or _FASTAPI_MINOR >= 106

try:
_FASTAPI_PATCH = int(patch)
except ValueError:
FASTAPI_v102_3 = True
FASTAPI_v102_4 = True
else:
FASTAPI_v102_3 = (
_FASTAPI_MAJOR > 0
or _FASTAPI_MINOR > 112
or (_FASTAPI_MINOR == 112 and _FASTAPI_PATCH > 2)
)
FASTAPI_v102_4 = (
_FASTAPI_MAJOR > 0
or _FASTAPI_MINOR > 112
or (_FASTAPI_MINOR == 112 and _FASTAPI_PATCH > 3)
)

__all__ = (
"RequestValidationError",
Expand Down
2 changes: 1 addition & 1 deletion faststream/confluent/subscriber/specified.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SpecificationSubscriber(LogicSubscriber[MsgType]):
"""A class to handle logic and async API operations."""

def get_name(self) -> str:
return f'{",".join(self.topics)}:{self.call_name}'
return f"{','.join(self.topics)}:{self.call_name}"

def get_schema(self) -> dict[str, Channel]:
channels = {}
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/subscriber/specified.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SpecificationSubscriber(LogicSubscriber[MsgType]):
"""A class to handle logic and async API operations."""

def get_name(self) -> str:
return f'{",".join(self.topics)}:{self.call_name}'
return f"{','.join(self.topics)}:{self.call_name}"

def get_schema(self) -> dict[str, Channel]:
channels = {}
Expand Down
5 changes: 5 additions & 0 deletions faststream/nats/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ async def ack(self) -> None:
await self.raw_message.ack()
await super().ack()

async def ack_sync(self) -> None:
if not self.raw_message._ackd:
await self.raw_message.ack_sync()
await super().ack()

async def nack(
self,
delay: Optional[float] = None,
Expand Down
104 changes: 84 additions & 20 deletions faststream/prometheus/container.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from collections.abc import Sequence
from typing import Optional
from typing import TYPE_CHECKING, Optional, cast

from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
from prometheus_client import Counter, Gauge, Histogram

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
from prometheus_client.registry import Collector


class MetricsContainer:
Expand Down Expand Up @@ -44,58 +48,118 @@ def __init__(
self._registry = registry
self._metrics_prefix = metrics_prefix

self.received_messages_total = Counter(
name=f"{metrics_prefix}_received_messages_total",
received_messages_total_name = f"{metrics_prefix}_received_messages_total"
self.received_messages_total = cast(
Counter, self._get_registered_metric(received_messages_total_name)
) or Counter(
name=received_messages_total_name,
documentation="Count of received messages by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_messages_size_bytes = Histogram(
name=f"{metrics_prefix}_received_messages_size_bytes",

received_messages_size_bytes_name = (
f"{metrics_prefix}_received_messages_size_bytes"
)
self.received_messages_size_bytes = cast(
Histogram, self._get_registered_metric(received_messages_size_bytes_name)
) or Histogram(
name=received_messages_size_bytes_name,
documentation="Histogram of received messages size in bytes by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS,
)
self.received_messages_in_process = Gauge(
name=f"{metrics_prefix}_received_messages_in_process",

received_messages_in_process_name = (
f"{metrics_prefix}_received_messages_in_process"
)
self.received_messages_in_process = cast(
Gauge, self._get_registered_metric(received_messages_in_process_name)
) or Gauge(
name=received_messages_in_process_name,
documentation="Gauge of received messages in process by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_total = Counter(
name=f"{metrics_prefix}_received_processed_messages_total",

received_processed_messages_total_name = (
f"{metrics_prefix}_received_processed_messages_total"
)
self.received_processed_messages_total = cast(
Counter, self._get_registered_metric(received_processed_messages_total_name)
) or Counter(
name=received_processed_messages_total_name,
documentation="Count of received processed messages by broker, handler and status",
labelnames=["app_name", "broker", "handler", "status"],
registry=registry,
)
self.received_processed_messages_duration_seconds = Histogram(
name=f"{metrics_prefix}_received_processed_messages_duration_seconds",

received_processed_messages_duration_seconds_name = (
f"{metrics_prefix}_received_processed_messages_duration_seconds"
)
self.received_processed_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(
received_processed_messages_duration_seconds_name
),
) or Histogram(
name=received_processed_messages_duration_seconds_name,
documentation="Histogram of received processed messages duration in seconds by broker and handler",
labelnames=["app_name", "broker", "handler"],
registry=registry,
)
self.received_processed_messages_exceptions_total = Counter(
name=f"{metrics_prefix}_received_processed_messages_exceptions_total",

received_processed_messages_exceptions_total_name = (
f"{metrics_prefix}_received_processed_messages_exceptions_total"
)
self.received_processed_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(
received_processed_messages_exceptions_total_name
),
) or Counter(
name=received_processed_messages_exceptions_total_name,
documentation="Count of received processed messages exceptions by broker, handler and exception_type",
labelnames=["app_name", "broker", "handler", "exception_type"],
registry=registry,
)
self.published_messages_total = Counter(
name=f"{metrics_prefix}_published_messages_total",

published_messages_total_name = f"{metrics_prefix}_published_messages_total"
self.published_messages_total = cast(
Counter, self._get_registered_metric(published_messages_total_name)
) or Counter(
name=published_messages_total_name,
documentation="Count of published messages by destination and status",
labelnames=["app_name", "broker", "destination", "status"],
registry=registry,
)
self.published_messages_duration_seconds = Histogram(
name=f"{metrics_prefix}_published_messages_duration_seconds",

published_messages_duration_seconds_name = (
f"{metrics_prefix}_published_messages_duration_seconds"
)
self.published_messages_duration_seconds = cast(
Histogram,
self._get_registered_metric(published_messages_duration_seconds_name),
) or Histogram(
name=published_messages_duration_seconds_name,
documentation="Histogram of published messages duration in seconds by broker and destination",
labelnames=["app_name", "broker", "destination"],
registry=registry,
)
self.published_messages_exceptions_total = Counter(
name=f"{metrics_prefix}_published_messages_exceptions_total",

published_messages_exceptions_total_name = (
f"{metrics_prefix}_published_messages_exceptions_total"
)
self.published_messages_exceptions_total = cast(
Counter,
self._get_registered_metric(published_messages_exceptions_total_name),
) or Counter(
name=published_messages_exceptions_total_name,
documentation="Count of published messages exceptions by broker, destination and exception_type",
labelnames=["app_name", "broker", "destination", "exception_type"],
registry=registry,
)

def _get_registered_metric(self, metric_name: str) -> Optional["Collector"]:
return self._registry._names_to_collectors.get(metric_name)
6 changes: 3 additions & 3 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ def __call__(
/,
*,
context: "ContextRepo",
) -> "_PrometheusMiddleware":
return _PrometheusMiddleware(
) -> "BasePrometheusMiddleware":
return BasePrometheusMiddleware(
msg,
metrics_manager=self._metrics_manager,
settings_provider_factory=self._settings_provider_factory,
context=context,
)


class _PrometheusMiddleware(BaseMiddleware):
class BasePrometheusMiddleware(BaseMiddleware):
def __init__(
self,
msg: Optional[Any],
Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(
] = None,
# logging args
logger: Annotated[
Union["LoggerProto", None, object],
Optional["LoggerProto"],
Doc("User specified logger to pass into Context and log service messages."),
] = EMPTY,
log_level: Annotated[
Expand Down
Loading

0 comments on commit 9b04fe8

Please sign in to comment.