Add a per-node-type idle timeout to the Ray autoscaler (v2 / KubeRay).

Text before the first "diff --git" header is commentary and is skipped by
patch tools.

What this patch does:
  * kuberay/autoscaling_config.py: _node_type_from_group_spec copies
    group_spec[IDLE_SECONDS_KEY], when present, into the node type as a float
    under "idle_timeout_s".
  * ray-schema.json: accepts a numeric "idle_timeout_s" next to
    "min_workers" / "max_workers".
  * v2/instance_manager/config.py: NodeTypeConfig gains
    idle_timeout_s: Optional[float] = None, filled from the node config by
    get_node_type_configs.
  * v2/scheduler.py: _enforce_idle_termination uses the node type's
    idle_timeout_s instead of ctx.get_idle_timeout_s() when it is not None.
  * v2/tests/test_scheduler.py: adds
    test_idle_termination_with_node_type_idle_timeout.

NOTE(review): in the new test the NodeTypeConfig is registered under the key
"type_cpu_with_idle_timeout" but is built with name="type_cpu" -- confirm the
mismatch is intentional.
NOTE(review): "nullable" is an OpenAPI keyword; confirm the JSON Schema
validator used for ray-schema.json honors it, or drop it.
NOTE(review): leading indentation of the hunk lines was restored from the code
structure; verify it against the target tree before applying.

diff --git a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
index 96aec03744b..0bf61b31112 100644
--- a/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
+++ b/python/ray/autoscaler/_private/kuberay/autoscaling_config.py
@@ -219,7 +219,7 @@ def _node_type_from_group_spec(
 
     resources = _get_ray_resources_from_group_spec(group_spec, is_head)
 
-    return {
+    node_type = {
         "min_workers": min_workers,
         "max_workers": max_workers,
         # `node_config` is a legacy field required for compatibility.
@@ -228,6 +228,12 @@ def _node_type_from_group_spec(
         "resources": resources,
     }
 
+    idle_timeout_s = group_spec.get(IDLE_SECONDS_KEY)
+    if idle_timeout_s is not None:
+        node_type["idle_timeout_s"] = float(idle_timeout_s)
+
+    return node_type
+
 
 def _get_ray_resources_from_group_spec(
     group_spec: Dict[str, Any], is_head: bool
diff --git a/python/ray/autoscaler/ray-schema.json b/python/ray/autoscaler/ray-schema.json
index ad5da68ea2a..2e07dadac91 100644
--- a/python/ray/autoscaler/ray-schema.json
+++ b/python/ray/autoscaler/ray-schema.json
@@ -345,6 +345,7 @@
                         },
                         "min_workers": {"type": "integer"},
                         "max_workers": {"type": "integer"},
+                        "idle_timeout_s": {"type": "number", "nullable": true},
                         "resources": {
                             "type": "object",
                             "patternProperties": {
diff --git a/python/ray/autoscaler/v2/instance_manager/config.py b/python/ray/autoscaler/v2/instance_manager/config.py
index c9597eef6c7..a7e582eacdb 100644
--- a/python/ray/autoscaler/v2/instance_manager/config.py
+++ b/python/ray/autoscaler/v2/instance_manager/config.py
@@ -128,6 +128,8 @@ class NodeTypeConfig:
     min_worker_nodes: int
     # The maximal number of worker nodes can be launched for this node type.
     max_worker_nodes: int
+    # Idle timeout seconds for worker nodes of this node type.
+    idle_timeout_s: Optional[float] = None
     # The total resources on the node.
     resources: Dict[str, float] = field(default_factory=dict)
     # The labels on the node.
@@ -346,6 +348,7 @@ def get_node_type_configs(self) -> Dict[NodeType, NodeTypeConfig]:
                 name=node_type,
                 min_worker_nodes=node_config.get("min_workers", 0),
                 max_worker_nodes=max_workers_nodes,
+                idle_timeout_s=node_config.get("idle_timeout_s", None),
                 resources=node_config.get("resources", {}),
                 labels=node_config.get("labels", {}),
                 launch_config_hash=launch_config_hash,
diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py
index 3732a628263..2d5a7006506 100644
--- a/python/ray/autoscaler/v2/scheduler.py
+++ b/python/ray/autoscaler/v2/scheduler.py
@@ -1584,6 +1584,11 @@ def _enforce_idle_termination(
                 continue
 
             idle_timeout_s = ctx.get_idle_timeout_s()
+            # Override the scheduler idle_timeout_s if set for this node_type.
+            node_type = node.node_type
+            if node_type in node_type_configs:
+                if node_type_configs[node_type].idle_timeout_s is not None:
+                    idle_timeout_s = node_type_configs[node_type].idle_timeout_s
             if idle_timeout_s is None:
                 # No idle timeout is set, skip the idle termination.
                 continue
@@ -1606,7 +1611,6 @@ def _enforce_idle_termination(
 
             # Honor the min_worker_nodes setting for the node type.
             min_count = 0
-            node_type = node.node_type
             if node_type in node_type_configs:
                 min_count = node_type_configs[node_type].min_worker_nodes
             if (
diff --git a/python/ray/autoscaler/v2/tests/test_scheduler.py b/python/ray/autoscaler/v2/tests/test_scheduler.py
index e6d6cb71978..3a188bdaf2c 100644
--- a/python/ray/autoscaler/v2/tests/test_scheduler.py
+++ b/python/ray/autoscaler/v2/tests/test_scheduler.py
@@ -1434,6 +1434,82 @@ def test_idle_termination_with_min_worker(min_workers):
         assert len(to_terminate) == 0
 
 
+@pytest.mark.parametrize("node_type_idle_timeout_s", [1, 2, 10])
+def test_idle_termination_with_node_type_idle_timeout(node_type_idle_timeout_s):
+    """
+    Test that idle nodes are terminated when idle_timeout_s is set for node type.
+    """
+    scheduler = ResourceDemandScheduler(event_logger)
+
+    node_type_configs = {
+        "type_cpu_with_idle_timeout": NodeTypeConfig(
+            name="type_cpu",
+            resources={"CPU": 1},
+            min_worker_nodes=0,
+            max_worker_nodes=5,
+            idle_timeout_s=node_type_idle_timeout_s,
+            launch_config_hash="hash1",
+        ),
+    }
+
+    idle_time_s = 5
+    constraints = []
+
+    request = sched_request(
+        node_type_configs=node_type_configs,
+        instances=[
+            make_autoscaler_instance(
+                im_instance=Instance(
+                    instance_type="type_cpu_with_idle_timeout",
+                    status=Instance.RAY_RUNNING,
+                    launch_config_hash="hash1",
+                    instance_id="i-1",
+                    node_id="r-1",
+                ),
+                ray_node=NodeState(
+                    node_id=b"r-1",
+                    ray_node_type_name="type_cpu_with_idle_timeout",
+                    available_resources={"CPU": 0},
+                    total_resources={"CPU": 1},
+                    idle_duration_ms=0,  # Non idle
+                    status=NodeStatus.RUNNING,
+                ),
+                cloud_instance_id="c-1",
+            ),
+            make_autoscaler_instance(
+                im_instance=Instance(
+                    instance_id="i-2",
+                    instance_type="type_cpu_with_idle_timeout",
+                    status=Instance.RAY_RUNNING,
+                    launch_config_hash="hash1",
+                    node_id="r-2",
+                ),
+                ray_node=NodeState(
+                    ray_node_type_name="type_cpu_with_idle_timeout",
+                    node_id=b"r-2",
+                    available_resources={"CPU": 1},
+                    total_resources={"CPU": 1},
+                    idle_duration_ms=idle_time_s * 1000,
+                    status=NodeStatus.IDLE,
+                ),
+                cloud_instance_id="c-2",
+            ),
+        ],
+        # Set autoscaler idle_timeout_s to a value greater than
+        # node_type_idle_timeout_s and idle_time_s.
+        idle_timeout_s=idle_time_s * 1000,
+        cluster_resource_constraints=constraints,
+    )
+
+    reply = scheduler.schedule(request)
+    _, to_terminate = _launch_and_terminate(reply)
+    if node_type_idle_timeout_s <= idle_time_s:
+        assert len(to_terminate) == 1
+        assert to_terminate == [("i-2", "r-2", TerminationRequest.Cause.IDLE)]
+    else:
+        assert len(to_terminate) == 0
+
+
 def test_gang_scheduling():
     """
     Test that gang scheduling works.