Skip to content

Commit

Permalink
[Serve] Modify max_concurrency in actor options to respect `max_ong…
Browse files Browse the repository at this point in the history
…oing_requests` (ray-project#47681) (ray-project#48274)

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->
This PR modifies the actor_options used when deploying replicas.
Deployment will use the configured `max_ongoing_requests` attribute of
the deployment config as the replica's `max_concurrency` if
the concurrency is not explicitly set. This is to prevent replica's
`max_concurrency` from capping
`max_ongoing_requests`.

## Related issue number

<!-- For example: "Closes ray-project#1234" -->
Closes ray-project#47681

Signed-off-by: akyang-anyscale <[email protected]>
Signed-off-by: mohitjain2504 <[email protected]>
  • Loading branch information
akyang-anyscale authored and mohitjain2504 committed Nov 15, 2024
1 parent ad80735 commit 230ce9d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 1 deletion.
6 changes: 6 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ def env_set_by_user(key):
# tasks.
DEFAULT_TASK_MAX_RETRIES = 3

# Default max_concurrency option in @ray.remote for threaded actors.
DEFAULT_MAX_CONCURRENCY_THREADED = 1

# Default max_concurrency option in @ray.remote for async actors.
DEFAULT_MAX_CONCURRENCY_ASYNC = 1000

# Prefix for namespaces which are used internally by ray.
# Jobs within these namespaces should be hidden from users
# and should not be considered user activity.
Expand Down
6 changes: 5 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,11 @@ def _remote(self, args=None, kwargs=None, **actor_options):
is_asyncio = has_async_methods(meta.modified_class)

if actor_options.get("max_concurrency") is None:
actor_options["max_concurrency"] = 1000 if is_asyncio else 1
actor_options["max_concurrency"] = (
ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC
if is_asyncio
else ray_constants.DEFAULT_MAX_CONCURRENCY_THREADED
)

if client_mode_should_convert():
return client_mode_convert_actor(self, args, kwargs, **actor_options)
Expand Down
13 changes: 13 additions & 0 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import ray
from ray import ObjectRef, cloudpickle
from ray._private import ray_constants
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError
from ray.serve import metrics
Expand Down Expand Up @@ -474,6 +475,18 @@ def start(self, deployment_info: DeploymentInfo) -> ReplicaSchedulingRequest:
}
actor_options.update(deployment_info.replica_config.ray_actor_options)

# A replica's default `max_concurrency` value can prevent it from
# respecting the configured `max_ongoing_requests`. To avoid this
# unintentional behavior, use `max_ongoing_requests` to override
# the Actor's `max_concurrency` if it is larger.
if (
deployment_info.deployment_config.max_ongoing_requests
> ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC
):
actor_options[
"max_concurrency"
] = deployment_info.deployment_config.max_ongoing_requests

return ReplicaSchedulingRequest(
replica_id=self.replica_id,
actor_def=actor_def,
Expand Down
19 changes: 19 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

from ray._private.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC
from ray.serve._private.autoscaling_state import AutoscalingStateManager
from ray.serve._private.common import (
DeploymentHandleSource,
Expand Down Expand Up @@ -284,6 +285,7 @@ def deployment_info(
info = DeploymentInfo(
version=version,
start_time_ms=0,
actor_name="abc",
deployment_config=DeploymentConfig(
num_replicas=num_replicas, user_config=user_config, **config_opts
),
Expand Down Expand Up @@ -2914,6 +2916,23 @@ def test_default_value(self):
assert actor_replica.health_check_period_s == DEFAULT_HEALTH_CHECK_PERIOD_S
assert actor_replica.health_check_timeout_s == DEFAULT_HEALTH_CHECK_TIMEOUT_S

def test_max_concurrency_override(self):
actor_replica = ActorReplicaWrapper(
version=deployment_version("1"),
replica_id=ReplicaID(
"abc123",
deployment_id=DeploymentID(name="test_deployment", app_name="test_app"),
),
)
max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1
d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests)
replica_scheduling_request = actor_replica.start(d_info)
assert (
"max_concurrency" in replica_scheduling_request.actor_options
and replica_scheduling_request.actor_options["max_concurrency"]
== max_ongoing_requests
)


def test_get_active_node_ids(mock_deployment_state_manager):
"""Test get_active_node_ids() are collecting the correct node ids
Expand Down

0 comments on commit 230ce9d

Please sign in to comment.