[serve] Reduce cardinality for the route metric tag #48290

Merged · 19 commits · Oct 28, 2024
27 changes: 25 additions & 2 deletions python/ray/serve/_private/common.py
@@ -1,7 +1,10 @@
import json
import pickle
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Awaitable, Callable, List, Optional
from typing import Any, Awaitable, Callable, Dict, List, Optional

from starlette.types import Scope

from ray.actor import ActorHandle
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
@@ -602,10 +605,30 @@ def is_grpc_request(self) -> bool:
class StreamingHTTPRequest:
"""Sent from the HTTP proxy to replicas on the streaming codepath."""

pickled_asgi_scope: bytes
asgi_scope: Scope
# Takes request metadata, returns a pickled list of ASGI messages.
receive_asgi_messages: Callable[[RequestMetadata], Awaitable[bytes]]

def __getstate__(self) -> Dict[str, Any]:
"""Custom serializer to use vanilla `pickle` for the ASGI scope.

This is possible because we know the scope is a dictionary containing
only Python primitive types. Vanilla `pickle` is much faster than cloudpickle.
"""
return {
"pickled_asgi_scope": pickle.dumps(self.asgi_scope),
"receive_asgi_messages": self.receive_asgi_messages,
}

def __setstate__(self, state: Dict[str, Any]):
"""Custom deserializer to use vanilla `pickle` for the ASGI scope.

This is possible because we know the scope is a dictionary containing
only Python primitive types. Vanilla `pickle` is much faster than cloudpickle.
"""
self.asgi_scope = pickle.loads(state["pickled_asgi_scope"])
self.receive_asgi_messages = state["receive_asgi_messages"]

Contributor:
Non-blocker: We could probably create a named tuple for the state object to ensure the required keys exist and are accessible.
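
A minimal sketch of that suggestion, with illustrative names that are not part of this PR — a NamedTuple makes the required state keys explicit instead of relying on dictionary keys:

from typing import Awaitable, Callable, NamedTuple


class SerializedHTTPRequestState(NamedTuple):
    # Hypothetical container mirroring the two keys used in __getstate__.
    pickled_asgi_scope: bytes
    receive_asgi_messages: Callable[..., Awaitable[bytes]]


# __getstate__ could return SerializedHTTPRequestState(...)._asdict() and
# __setstate__ could read fields by name, so a missing or misspelled key
# fails at construction time rather than as a bare KeyError later.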



class TargetCapacityDirection(str, Enum):
"""Determines what direction the target capacity is scaling."""
4 changes: 4 additions & 0 deletions python/ray/serve/_private/http_util.py
@@ -471,6 +471,10 @@ def __init__(self, app: ASGIApp):
# Replace uvicorn logger with our own.
self._serve_asgi_lifespan.logger = logger

@property
def app(self) -> ASGIApp:
return self._asgi_app

async def _run_asgi_lifespan_startup(self):
# LifespanOn's logger logs in INFO level thus becomes spammy
# Within this block we temporarily uplevel for cleaner logging
37 changes: 24 additions & 13 deletions python/ray/serve/_private/proxy.py
@@ -384,7 +384,10 @@ def _get_response_handler_info(
return ResponseHandlerInfo(
response_generator=self.not_found_response(proxy_request),
metadata=HandlerMetadata(
route=proxy_request.route_path,
# Don't include the invalid route prefix because it can blow up our
# metrics' cardinality.
# See: https://github.com/ray-project/ray/issues/47999
route="",
),
should_record_access_log=True,
should_increment_ongoing_requests=False,
@@ -404,11 +407,19 @@ def _get_response_handler_info(
if version.parse(starlette.__version__) < version.parse("0.33.0"):
proxy_request.set_path(route_path.replace(route_prefix, "", 1))

# NOTE(edoakes): we use the route_prefix instead of the full HTTP path
# for logs & metrics to avoid high cardinality.
# See: https://github.com/ray-project/ray/issues/47999
logs_and_metrics_route = (

Contributor:
Non-blocker: We could probably refactor this into ProxyRequest and implement it differently in ASGIProxyRequest and gRPCProxyRequest (a sketch of that idea follows this file's diff).


Contributor Author:
Yeah, I was highly confused that our "route" was just an app name for gRPC. The "method" field also seems to be getting overwritten to something else that actually looks like a route.


Contributor:
Yeah, I think it depends on the context. HTTP has methods such as "GET", "POST", etc., while gRPC has the concept of service methods, so we use those in metrics and logs. Also, gRPC routes by app name, which is why the equivalent route_path was set to the app name.

route_prefix
if self.protocol == RequestProtocol.HTTP
else handle.deployment_id.app_name
)
internal_request_id = generate_request_id()
handle, request_id = self.setup_request_context_and_handle(
app_name=handle.deployment_id.app_name,
handle=handle,
route_path=route_path,
route=logs_and_metrics_route,
proxy_request=proxy_request,
internal_request_id=internal_request_id,
)
@@ -426,7 +437,7 @@ def _get_response_handler_info(
metadata=HandlerMetadata(
application_name=handle.deployment_id.app_name,
deployment_name=handle.deployment_id.name,
route=route_path,
route=logs_and_metrics_route,
),
should_record_access_log=True,
should_increment_ongoing_requests=True,
@@ -486,8 +497,8 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:
self.processing_latency_tracker.observe(
latency_ms,
tags={
"method": proxy_request.method,
"route": response_handler_info.metadata.route,
"method": proxy_request.method,
"application": response_handler_info.metadata.application_name,
"status_code": str(status.code),
},
@@ -496,18 +507,18 @@ async def proxy_request(self, proxy_request: ProxyRequest) -> ResponseGenerator:
self.request_error_counter.inc(
tags={
"route": response_handler_info.metadata.route,
"error_code": str(status.code),
"method": proxy_request.method,
"application": response_handler_info.metadata.application_name,
"error_code": str(status.code),
}
)
self.deployment_request_error_counter.inc(
tags={
"deployment": response_handler_info.metadata.deployment_name,
"error_code": str(status.code),
"method": proxy_request.method,
"route": response_handler_info.metadata.route,
"method": proxy_request.method,
"application": response_handler_info.metadata.application_name,
"error_code": str(status.code),
"deployment": response_handler_info.metadata.deployment_name,
}
)

@@ -516,7 +527,7 @@ def setup_request_context_and_handle(
self,
app_name: str,
handle: DeploymentHandle,
route_path: str,
route: str,
proxy_request: ProxyRequest,
internal_request_id: str,
) -> Tuple[DeploymentHandle, str]:
@@ -672,7 +683,7 @@ def setup_request_context_and_handle(
self,
app_name: str,
handle: DeploymentHandle,
route_path: str,
route: str,
proxy_request: ProxyRequest,
internal_request_id: str,
) -> Tuple[DeploymentHandle, str]:
@@ -694,7 +705,7 @@ def setup_request_context_and_handle(
)

request_context_info = {
"route": route_path,
"route": route,
"request_id": request_id,
"_internal_request_id": internal_request_id,
"app_name": app_name,
@@ -902,7 +913,7 @@ def setup_request_context_and_handle(
self,
app_name: str,
handle: DeploymentHandle,
route_path: str,
route: str,
proxy_request: ProxyRequest,
internal_request_id: str,
) -> Tuple[DeploymentHandle, str]:
@@ -912,7 +923,7 @@ def setup_request_context_and_handle(
handle.
"""
request_context_info = {
"route": route_path,
"route": route,
"app_name": app_name,
"_internal_request_id": internal_request_id,
"is_http_request": True,
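
A minimal sketch of the refactor floated in the review thread above: each ProxyRequest subclass could own the low-cardinality route value itself. The logs_and_metrics_route method and the simplified base class are hypothetical, not part of this PR:

from abc import ABC, abstractmethod


class ProxyRequest(ABC):
    # Simplified stand-in for Serve's ProxyRequest base class.

    @abstractmethod
    def logs_and_metrics_route(self, route_prefix: str, app_name: str) -> str:
        """Bounded-cardinality value for the 'route' tag on logs and metrics."""


class ASGIProxyRequest(ProxyRequest):
    def logs_and_metrics_route(self, route_prefix: str, app_name: str) -> str:
        # HTTP: report the matched route prefix, never the raw request path,
        # so "/users/123" and "/users/456" collapse into a single tag value.
        return route_prefix


class gRPCProxyRequest(ProxyRequest):
    def logs_and_metrics_route(self, route_prefix: str, app_name: str) -> str:
        # gRPC requests are routed by application name, so the app name
        # stands in for the route.
        return app_name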
2 changes: 1 addition & 1 deletion python/ray/serve/_private/proxy_request_response.py
@@ -98,7 +98,7 @@ def request_object(
self, receive_asgi_messages: Callable[[str], Awaitable[bytes]]
) -> StreamingHTTPRequest:
return StreamingHTTPRequest(
pickled_asgi_scope=pickle.dumps(self.scope),
asgi_scope=self.scope,
receive_asgi_messages=receive_asgi_messages,
)

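
For illustration, the new serialization path can be exercised roughly like this; _fake_receive is a stand-in for the real callback, and the keyword arguments match the construction shown above:

import pickle

from ray.serve._private.common import StreamingHTTPRequest


async def _fake_receive(metadata) -> bytes:
    # Stand-in for the proxy's receive_asgi_messages callback.
    return b""


request = StreamingHTTPRequest(
    asgi_scope={"type": "http", "method": "GET", "path": "/users/123"},
    receive_asgi_messages=_fake_receive,
)

# __getstate__ pickles only the scope with vanilla pickle; __setstate__
# restores it, so the round-tripped object exposes the same dict.
restored = pickle.loads(pickle.dumps(request))
assert restored.asgi_scope["path"] == "/users/123"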
67 changes: 57 additions & 10 deletions python/ray/serve/_private/replica.py
@@ -12,6 +12,7 @@
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Tuple, Union

import starlette.responses
from starlette.types import ASGIApp

import ray
from ray import cloudpickle
@@ -57,6 +58,7 @@
from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher
from ray.serve._private.utils import get_component_file_name # noqa: F401
from ray.serve._private.utils import parse_import_path, wrap_to_ray_error
from ray.serve._private.vendored.get_asgi_route_name import get_asgi_route_name
from ray.serve._private.version import DeploymentVersion
from ray.serve.config import AutoscalingConfig
from ray.serve.deployment import Deployment
@@ -279,6 +281,10 @@ async def __init__(
self._user_callable_initialized_lock = asyncio.Lock()
self._initialization_latency: Optional[float] = None

# Will be populated with the wrapped ASGI app if the user callable is an
# `ASGIAppReplicaWrapper` (i.e., they are using the FastAPI integration).
self._user_callable_asgi_app: Optional[ASGIApp] = None

# Set metadata for logs and metrics.
# servable_object will be populated in `initialize_and_get_metadata`.
self._set_internal_replica_context(servable_object=None)
@@ -334,17 +340,45 @@ def get_num_ongoing_requests(self) -> int:
"""
return self._metrics_manager.get_num_ongoing_requests()

def _maybe_get_asgi_route(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
) -> Optional[str]:
"""Get the matched route string for ASGI apps to be used in logs & metrics.

If this replica does not wrap an ASGI app or there is no matching for the
request, returns the existing route from the request metadata.
"""
route = request_metadata.route
if (
request_metadata.is_http_request
and self._user_callable_asgi_app is not None
):
req: StreamingHTTPRequest = request_args[0]
matched_route = get_asgi_route_name(
self._user_callable_asgi_app, req.asgi_scope
)

# If there is no match in the ASGI app, don't overwrite the route_prefix
# from the proxy.
if matched_route is not None:
route = matched_route

return route

@contextmanager
def _wrap_user_method_call(self, request_metadata: RequestMetadata):
def _wrap_user_method_call(
self, request_metadata: RequestMetadata, request_args: Tuple[Any]
):
"""Context manager that wraps user method calls.

1) Sets the request context var with appropriate metadata.
2) Records the access log message (if not disabled).
3) Records per-request metrics via the metrics manager.
"""
route = self._maybe_get_asgi_route(request_metadata, request_args)
ray.serve.context._serve_request_context.set(
ray.serve.context._RequestContext(
route=request_metadata.route,
route=route,
request_id=request_metadata.request_id,
app_name=self._deployment_id.app_name,
multiplexed_model_id=request_metadata.multiplexed_model_id,
@@ -384,7 +418,7 @@ def _wrap_user_method_call(self, request_metadata: RequestMetadata):
extra={"serve_access_log": True},
)
self._metrics_manager.record_request_metrics(
route=request_metadata.route,
route=route,
status_str=status_str,
latency_ms=latency_ms,
was_error=user_exception is not None,
@@ -468,7 +502,7 @@ async def handle_request(
) -> Tuple[bytes, Any]:
"""Entrypoint for `stream=False` calls."""
request_metadata = pickle.loads(pickled_request_metadata)
with self._wrap_user_method_call(request_metadata):
with self._wrap_user_method_call(request_metadata, request_args):
return await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
@@ -481,7 +515,7 @@ async def handle_request_streaming(
) -> AsyncGenerator[Any, None]:
"""Generator that is the entrypoint for all `stream=True` handle calls."""
request_metadata = pickle.loads(pickled_request_metadata)
with self._wrap_user_method_call(request_metadata):
with self._wrap_user_method_call(request_metadata, request_args):
async for result in self._call_user_generator(
request_metadata,
request_args,
@@ -523,7 +557,7 @@ async def handle_request_with_rejection(
)
return

with self._wrap_user_method_call(request_metadata):
with self._wrap_user_method_call(request_metadata, request_args):
yield pickle.dumps(
ReplicaQueueLengthInfo(
accepted=True,
@@ -564,7 +598,7 @@ async def handle_request_from_java(
multiplexed_model_id=proto.multiplexed_model_id,
route=proto.route,
)
with self._wrap_user_method_call(request_metadata):
with self._wrap_user_method_call(request_metadata, request_args):
return await self._user_callable_wrapper.call_user_method(
request_metadata, request_args, request_kwargs
)
@@ -611,7 +645,9 @@ async def initialize_and_get_metadata(
async with self._user_callable_initialized_lock:
initialization_start_time = time.time()
if not self._user_callable_initialized:
await self._user_callable_wrapper.initialize_callable()
self._user_callable_asgi_app = (
await self._user_callable_wrapper.initialize_callable()
)
self._user_callable_initialized = True
self._set_internal_replica_context(
servable_object=self._user_callable_wrapper.user_callable
@@ -881,7 +917,12 @@ def user_callable(self) -> Optional[Callable]:
return self._callable

@_run_on_user_code_event_loop
async def initialize_callable(self):
async def initialize_callable(self) -> Optional[ASGIApp]:
"""Initialize the user callable.

If the callable is an ASGI app wrapper (e.g., using @serve.ingress), returns
the ASGI app object, which may be used *read only* by the caller.
"""
if self._callable is not None:
raise RuntimeError("initialize_callable should only be called once.")

@@ -920,6 +961,12 @@ async def initialize_callable(self):
extra={"log_to_stderr": False},
)

return (
self._callable.app
if isinstance(self._callable, ASGIAppReplicaWrapper)
else None
)

@_run_on_user_code_event_loop
async def _call_user_health_check(self):
await self._call_func_or_gen(self._user_health_check)
@@ -979,7 +1026,7 @@ def _prepare_args_for_http_request(

The returned `receive_task` should be cancelled when the user method exits.
"""
scope = pickle.loads(request.pickled_asgi_scope)
scope = request.asgi_scope
receive = ASGIReceiveProxy(
scope,
request_metadata,
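
The _maybe_get_asgi_route change above relies on the vendored get_asgi_route_name helper. A rough sketch of the idea (not the vendored implementation, and ignoring mounted sub-applications) looks like this:

from typing import Optional

from starlette.routing import Match, Route
from starlette.types import ASGIApp, Scope


def match_route_template(app: ASGIApp, scope: Scope) -> Optional[str]:
    # Walk the app's route table and return the first full match's path
    # template, e.g. "/users/{user_id}" instead of "/users/123".
    routes = getattr(app, "routes", None)
    if not routes:
        return None
    for route in routes:
        match, _ = route.matches(scope)
        if match == Match.FULL and isinstance(route, Route):
            return route.path
    return None

Tagging metrics with the bounded route template rather than the raw request path is what keeps the route tag's cardinality low, which is the goal of this PR.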