diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 9a3104e9eff8..2cd1fab75796 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -445,6 +445,12 @@ def env_set_by_user(key): # tasks. DEFAULT_TASK_MAX_RETRIES = 3 +# Default max_concurrency option in @ray.remote for threaded actors. +DEFAULT_MAX_CONCURRENCY_THREADED = 1 + +# Default max_concurrency option in @ray.remote for async actors. +DEFAULT_MAX_CONCURRENCY_ASYNC = 1000 + # Prefix for namespaces which are used internally by ray. # Jobs within these namespaces should be hidden from users # and should not be considered user activity. diff --git a/python/ray/actor.py b/python/ray/actor.py index 488543c27e08..222f52c24b5f 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -965,7 +965,11 @@ def _remote(self, args=None, kwargs=None, **actor_options): is_asyncio = has_async_methods(meta.modified_class) if actor_options.get("max_concurrency") is None: - actor_options["max_concurrency"] = 1000 if is_asyncio else 1 + actor_options["max_concurrency"] = ( + ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC + if is_asyncio + else ray_constants.DEFAULT_MAX_CONCURRENCY_THREADED + ) if client_mode_should_convert(): return client_mode_convert_actor(self, args, kwargs, **actor_options) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index ca7d52097c2c..ca0fb2d446c6 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -13,6 +13,7 @@ import ray from ray import ObjectRef, cloudpickle +from ray._private import ray_constants from ray.actor import ActorHandle from ray.exceptions import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError from ray.serve import metrics @@ -474,6 +475,18 @@ def start(self, deployment_info: DeploymentInfo) -> ReplicaSchedulingRequest: } 
actor_options.update(deployment_info.replica_config.ray_actor_options) + # A replica's default `max_concurrency` value can prevent it from + # respecting the configured `max_ongoing_requests`. To avoid this, + # override the actor's `max_concurrency` with `max_ongoing_requests` + # when it exceeds the default `max_concurrency` for async actors. + if ( + deployment_info.deployment_config.max_ongoing_requests + > ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC + ): + actor_options[ + "max_concurrency" + ] = deployment_info.deployment_config.max_ongoing_requests + return ReplicaSchedulingRequest( replica_id=self.replica_id, actor_def=actor_def, diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index 8cddd0cbb371..dfeb9fc7524c 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -5,6 +5,7 @@ import pytest +from ray._private.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC from ray.serve._private.autoscaling_state import AutoscalingStateManager from ray.serve._private.common import ( DeploymentHandleSource, @@ -284,6 +285,7 @@ def deployment_info( info = DeploymentInfo( version=version, start_time_ms=0, + actor_name="abc", deployment_config=DeploymentConfig( num_replicas=num_replicas, user_config=user_config, **config_opts ), @@ -2914,6 +2916,23 @@ def test_default_value(self): assert actor_replica.health_check_period_s == DEFAULT_HEALTH_CHECK_PERIOD_S assert actor_replica.health_check_timeout_s == DEFAULT_HEALTH_CHECK_TIMEOUT_S + def test_max_concurrency_override(self): + actor_replica = ActorReplicaWrapper( + version=deployment_version("1"), + replica_id=ReplicaID( + "abc123", + deployment_id=DeploymentID(name="test_deployment", app_name="test_app"), + ), + ) + max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1 + d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests) + replica_scheduling_request = actor_replica.start(d_info) 
assert ( + "max_concurrency" in replica_scheduling_request.actor_options + and replica_scheduling_request.actor_options["max_concurrency"] + == max_ongoing_requests + ) + def test_get_active_node_ids(mock_deployment_state_manager): """Test get_active_node_ids() are collecting the correct node ids