From 9a4533426adcc20f18ff8f89a35b0e7685ba88ce Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 8 Nov 2024 10:51:00 -0800 Subject: [PATCH 1/4] refactor Signed-off-by: Cindy Zhang --- python/ray/serve/_private/default_impl.py | 4 +- python/ray/serve/_private/router.py | 51 +++++++++++++++++++-- python/ray/serve/_private/utils.py | 56 ++++------------------- 3 files changed, 56 insertions(+), 55 deletions(-) diff --git a/python/ray/serve/_private/default_impl.py b/python/ray/serve/_private/default_impl.py index 3b2abf6e0829..45314de071a8 100644 --- a/python/ray/serve/_private/default_impl.py +++ b/python/ray/serve/_private/default_impl.py @@ -27,7 +27,7 @@ get_current_actor_id, get_head_node_id, inside_ray_client_context, - resolve_request_args, + resolve_deployment_response, ) # NOTE: Please read carefully before changing! @@ -124,7 +124,7 @@ def create_router( not is_inside_ray_client_context and RAY_SERVE_ENABLE_STRICT_MAX_ONGOING_REQUESTS ), - resolve_request_args_func=resolve_request_args, + resolve_request_arg_func=resolve_deployment_response, ) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 2b83ff2d86d7..b29479a9f9cf 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -8,7 +8,7 @@ from collections import defaultdict from contextlib import contextmanager from functools import partial -from typing import Any, Coroutine, DefaultDict, List, Optional, Tuple, Union +from typing import Any, Coroutine, DefaultDict, Dict, List, Optional, Tuple, Union import ray from ray.actor import ActorHandle @@ -31,7 +31,7 @@ from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher from ray.serve._private.replica_result import ReplicaResult from ray.serve._private.replica_scheduler import PendingRequest, ReplicaScheduler -from ray.serve._private.utils import resolve_request_args +from ray.serve._private.utils import resolve_deployment_response from ray.serve.config import AutoscalingConfig from ray.serve.exceptions import BackPressureError from ray.util import metrics @@ -342,7 +342,7 @@ def __init__( event_loop: asyncio.BaseEventLoop, replica_scheduler: Optional[ReplicaScheduler], enable_strict_max_ongoing_requests: bool, - resolve_request_args_func: Coroutine = resolve_request_args, + resolve_request_arg_func: Coroutine = resolve_deployment_response, ): """Used to assign requests to downstream replicas for a deployment. @@ -355,7 +355,7 @@ def __init__( self._enable_strict_max_ongoing_requests = enable_strict_max_ongoing_requests self._replica_scheduler: ReplicaScheduler = replica_scheduler - self._resolve_request_args = resolve_request_args_func + self._resolve_request_arg_func = resolve_request_arg_func # Flipped to `True` once the router has received a non-empty # replica set at least once. @@ -429,6 +429,47 @@ def update_deployment_config(self, deployment_config: DeploymentConfig): curr_num_replicas=len(self._replica_scheduler.curr_replicas), ) + async def _resolve_request_arguments( + self, request_args: Tuple[Any], request_kwargs: Dict[str, Any] + ) -> Tuple[Tuple[Any], Dict[str, Any]]: + """Replaces top-level `DeploymentResponse` objects with resolved object refs. + + This enables composition without explicitly calling `_to_object_ref`. + """ + + new_args = list(request_args) + new_kwargs = request_kwargs.copy() + + # Map from index -> task to resolve positional arg + resolve_arg_tasks = {} + for i, obj in enumerate(request_args): + task = self._resolve_request_arg_func(obj) + if task is not None: + resolve_arg_tasks[i] = task + + # Map from key -> task to resolve key-word arg + resolve_kwarg_tasks = {} + for k, obj in request_kwargs.items(): + task = self._resolve_request_arg_func(obj) + if task is not None: + resolve_kwarg_tasks[k] = task + + # Gather all argument resolution tasks concurrently. + if resolve_arg_tasks or resolve_kwarg_tasks: + all_tasks = list(resolve_arg_tasks.values()) + list( + resolve_kwarg_tasks.values() + ) + await asyncio.wait(all_tasks) + + # Update new args and new kwargs with resolved object refs + for index, task in resolve_arg_tasks.items(): + new_args[index] = task.result() + for key, task in resolve_kwarg_tasks.items(): + new_kwargs[key] = task.result() + + # Return new args and new kwargs + return new_args, new_kwargs + def _process_finished_request( self, replica_id: ReplicaID, @@ -548,7 +589,7 @@ async def assign_request( replica_result = None try: - request_args, request_kwargs = await self._resolve_request_args( + request_args, request_kwargs = await self._resolve_request_arguments( request_args, request_kwargs ) replica_result, replica_id = await self.schedule_and_send_request( diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index b57b8518b22f..ee25dbe004ce 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -39,13 +39,13 @@ np = None MESSAGE_PACK_OFFSET = 9 + GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR = RuntimeError( "Streaming deployment handle results cannot be passed to " "downstream handle calls. If you have a use case requiring " "this feature, please file a feature request on GitHub." ) - # Use a global singleton enum to emulate default options. We cannot use None # for those option because None is a valid new value. class DEFAULT(Enum): @@ -594,52 +594,12 @@ def validate_route_prefix(route_prefix: Union[DEFAULT, None, str]): ) -async def resolve_request_args( - request_args: Tuple[Any], request_kwargs: Dict[str, Any] -) -> Tuple[Tuple[Any], Dict[str, Any]]: - """Replaces top-level `DeploymentResponse` objects with resolved object refs. - - This enables composition without explicitly calling `_to_object_ref`. - """ +def resolve_deployment_response(obj: Any): from ray.serve.handle import DeploymentResponse, DeploymentResponseGenerator - new_args = [None for _ in range(len(request_args))] - new_kwargs = {} - - arg_tasks = [] - response_indices = [] - for i, obj in enumerate(request_args): - if isinstance(obj, DeploymentResponseGenerator): - raise GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR - elif isinstance(obj, DeploymentResponse): - # Launch async task to convert DeploymentResponse to an object ref, and - # keep track of the argument index in the original `request_args` - response_indices.append(i) - arg_tasks.append(asyncio.create_task(obj._to_object_ref())) - else: - new_args[i] = obj - - kwarg_tasks = [] - response_keys = [] - for k, obj in request_kwargs.items(): - if isinstance(obj, DeploymentResponseGenerator): - raise GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR - elif isinstance(obj, DeploymentResponse): - # Launch async task to convert DeploymentResponse to an object ref, and - # keep track of the corresponding key in the original `request_kwargs` - response_keys.append(k) - kwarg_tasks.append(asyncio.create_task(obj._to_object_ref())) - else: - new_kwargs[k] = obj - - # Gather `DeploymentResponse` object refs concurrently. - arg_obj_refs = await asyncio.gather(*arg_tasks) - kwarg_obj_refs = await asyncio.gather(*kwarg_tasks) - - # Update new args and new kwargs with resolved object refs - for index, obj_ref in zip(response_indices, arg_obj_refs): - new_args[index] = obj_ref - new_kwargs.update((zip(response_keys, kwarg_obj_refs))) - - # Return new args and new kwargs - return new_args, new_kwargs + if isinstance(obj, DeploymentResponseGenerator): + raise GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR + elif isinstance(obj, DeploymentResponse): + # Launch async task to convert DeploymentResponse to an object ref, and + # keep track of the argument index in the original `request_args` + return asyncio.create_task(obj._to_object_ref()) From 5d0635bfab6643d5468a5df685e62c1f7063a4e9 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 8 Nov 2024 10:54:54 -0800 Subject: [PATCH 2/4] clean up Signed-off-by: Cindy Zhang --- python/ray/serve/_private/router.py | 10 +++------- python/ray/serve/_private/utils.py | 6 +++++- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index b29479a9f9cf..6d68596faa53 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -432,25 +432,21 @@ def update_deployment_config(self, deployment_config: DeploymentConfig): async def _resolve_request_arguments( self, request_args: Tuple[Any], request_kwargs: Dict[str, Any] ) -> Tuple[Tuple[Any], Dict[str, Any]]: - """Replaces top-level `DeploymentResponse` objects with resolved object refs. - - This enables composition without explicitly calling `_to_object_ref`. - """ - + """Asynchronously resolve and replace top-level request args and kwargs.""" new_args = list(request_args) new_kwargs = request_kwargs.copy() # Map from index -> task to resolve positional arg resolve_arg_tasks = {} for i, obj in enumerate(request_args): - task = self._resolve_request_arg_func(obj) + task = await self._resolve_request_arg_func(obj) if task is not None: resolve_arg_tasks[i] = task # Map from key -> task to resolve key-word arg resolve_kwarg_tasks = {} for k, obj in request_kwargs.items(): - task = self._resolve_request_arg_func(obj) + task = await self._resolve_request_arg_func(obj) if task is not None: resolve_kwarg_tasks[k] = task diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index ee25dbe004ce..74b34e897ad5 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -594,7 +594,11 @@ def validate_route_prefix(route_prefix: Union[DEFAULT, None, str]): ) -def resolve_deployment_response(obj: Any): +async def resolve_deployment_response(obj: Any): + """Resolve `DeploymentResponse` objects to underlying object references. + + This enables composition without explicitly calling `_to_object_ref`. + """ from ray.serve.handle import DeploymentResponse, DeploymentResponseGenerator if isinstance(obj, DeploymentResponseGenerator): From 2aa57a7791201615c42a905c037b330b65e62b73 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 8 Nov 2024 11:18:15 -0800 Subject: [PATCH 3/4] lint Signed-off-by: Cindy Zhang --- python/ray/serve/_private/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 74b34e897ad5..7dbb8aa0b914 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -12,7 +12,7 @@ from decimal import ROUND_HALF_UP, Decimal from enum import Enum from functools import wraps -from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Dict, List, Optional, TypeVar, Union import requests @@ -39,13 +39,13 @@ np = None MESSAGE_PACK_OFFSET = 9 - GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR = RuntimeError( "Streaming deployment handle results cannot be passed to " "downstream handle calls. If you have a use case requiring " "this feature, please file a feature request on GitHub." ) + # Use a global singleton enum to emulate default options. We cannot use None # for those option because None is a valid new value. class DEFAULT(Enum): From 778dafd0698f0bc23992af6b87abfb6672e6697b Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 8 Nov 2024 11:45:52 -0800 Subject: [PATCH 4/4] fix comments Signed-off-by: Cindy Zhang --- python/ray/serve/_private/router.py | 6 +++--- python/ray/serve/_private/utils.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py index 6d68596faa53..9cd8c10f5f82 100644 --- a/python/ray/serve/_private/router.py +++ b/python/ray/serve/_private/router.py @@ -436,14 +436,14 @@ async def _resolve_request_arguments( new_args = list(request_args) new_kwargs = request_kwargs.copy() - # Map from index -> task to resolve positional arg + # Map from index -> task for resolving positional arg resolve_arg_tasks = {} for i, obj in enumerate(request_args): task = await self._resolve_request_arg_func(obj) if task is not None: resolve_arg_tasks[i] = task - # Map from key -> task to resolve key-word arg + # Map from key -> task for resolving key-word arg resolve_kwarg_tasks = {} for k, obj in request_kwargs.items(): task = await self._resolve_request_arg_func(obj) @@ -457,7 +457,7 @@ async def _resolve_request_arguments( ) await asyncio.wait(all_tasks) - # Update new args and new kwargs with resolved object refs + # Update new args and new kwargs with resolved arguments for index, task in resolve_arg_tasks.items(): new_args[index] = task.result() for key, task in resolve_kwarg_tasks.items(): diff --git a/python/ray/serve/_private/utils.py b/python/ray/serve/_private/utils.py index 7dbb8aa0b914..1193f7722b63 100644 --- a/python/ray/serve/_private/utils.py +++ b/python/ray/serve/_private/utils.py @@ -604,6 +604,5 @@ async def resolve_deployment_response(obj: Any): if isinstance(obj, DeploymentResponseGenerator): raise GENERATOR_COMPOSITION_NOT_SUPPORTED_ERROR elif isinstance(obj, DeploymentResponse): - # Launch async task to convert DeploymentResponse to an object ref, and - # keep track of the argument index in the original `request_args` + # Launch async task to convert DeploymentResponse to an object ref return asyncio.create_task(obj._to_object_ref())