diff --git a/src/slurm_plugin/common.py b/src/slurm_plugin/common.py index dbf17a79f..0647052d2 100644 --- a/src/slurm_plugin/common.py +++ b/src/slurm_plugin/common.py @@ -14,6 +14,7 @@ import logging from concurrent.futures import Future from datetime import datetime +from enum import Enum from typing import Callable, Optional, Protocol, TypedDict from common.utils import check_command_output, time_is_up, validate_absolute_path @@ -34,6 +35,23 @@ ) +class ScalingStrategy(Enum): + ALL_OR_NOTHING = "all-or-nothing" + BEST_EFFORT = "best-effort" + + @classmethod + def _missing_(cls, strategy): + # Ref: https://docs.python.org/3/library/enum.html#enum.Enum._missing_ + _strategy = str(strategy).lower() + for member in cls: + if member.value == _strategy: + return member + return cls.ALL_OR_NOTHING # Default to All-Or-Nothing + + def __str__(self): + return str(self.value) + + class TaskController(Protocol): class TaskShutdownError(RuntimeError): """Exception raised if shutdown has been requested.""" diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index 1a99df3bd..d073fe5a2 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -25,7 +25,7 @@ from common.ec2_utils import get_private_ip_address_and_dns_name from common.schedulers.slurm_commands import get_nodes_info, update_nodes from common.utils import grouper, setup_logging_filter -from slurm_plugin.common import ComputeInstanceDescriptor, log_exception, print_with_count +from slurm_plugin.common import ComputeInstanceDescriptor, ScalingStrategy, log_exception, print_with_count from slurm_plugin.fleet_manager import EC2Instance, FleetManagerFactory from slurm_plugin.slurm_resources import ( EC2_HEALTH_STATUS_UNHEALTHY_STATES, @@ -165,7 +165,7 @@ def add_instances( node_list: List[str], launch_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = False, + scaling_strategy: ScalingStrategy = 
ScalingStrategy.BEST_EFFORT, slurm_resume: Dict[str, any] = None, assign_node_batch_size: int = None, terminate_batch_size: int = None, @@ -531,7 +531,7 @@ def add_instances( node_list: List[str], launch_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = False, + scaling_strategy: ScalingStrategy = ScalingStrategy.BEST_EFFORT, slurm_resume: Dict[str, any] = None, assign_node_batch_size: int = None, terminate_batch_size: int = None, @@ -550,7 +550,7 @@ def add_instances( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) else: logger.error( @@ -563,7 +563,7 @@ def add_instances( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) self._terminate_unassigned_launched_instances(terminate_batch_size) @@ -574,7 +574,7 @@ def _scaling_for_jobs( launch_batch_size: int, assign_node_batch_size: int, update_node_address: bool, - all_or_nothing_batch: bool, + scaling_strategy: ScalingStrategy, ) -> None: """Scaling for job list.""" # Setup custom logging filter @@ -591,7 +591,7 @@ def _scaling_for_jobs( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) def _terminate_unassigned_launched_instances(self, terminate_batch_size): @@ -616,7 +616,7 @@ def _scaling_for_jobs_single_node( launch_batch_size: int, assign_node_batch_size: int, update_node_address: bool, - all_or_nothing_batch: bool, + scaling_strategy: ScalingStrategy, ) -> None: """Scaling for job single node list.""" if job_list: @@ -627,7 +627,7 @@ def _scaling_for_jobs_single_node( launch_batch_size=launch_batch_size, 
assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) else: # Batch all single node jobs in a single best-effort EC2 launch request @@ -639,7 +639,7 @@ def _scaling_for_jobs_single_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=False, + scaling_strategy=ScalingStrategy.BEST_EFFORT, ) def _add_instances_for_resume_file( @@ -649,7 +649,7 @@ def _add_instances_for_resume_file( launch_batch_size: int, assign_node_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = False, + scaling_strategy: ScalingStrategy = ScalingStrategy.BEST_EFFORT, ): """Launch requested EC2 instances for resume file.""" slurm_resume_data = self._get_slurm_resume_data(slurm_resume=slurm_resume, node_list=node_list) @@ -663,7 +663,7 @@ def _add_instances_for_resume_file( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) self._scaling_for_jobs_multi_node( @@ -673,7 +673,7 @@ def _add_instances_for_resume_file( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) def _scaling_for_jobs_multi_node( @@ -683,7 +683,7 @@ def _scaling_for_jobs_multi_node( launch_batch_size, assign_node_batch_size, update_node_address, - all_or_nothing_batch: bool, + scaling_strategy: ScalingStrategy, ): # Optimize job level scaling with preliminary scale-all nodes attempt self._update_dict( @@ -691,7 +691,7 @@ def _scaling_for_jobs_multi_node( self._launch_instances( nodes_to_launch=self._parse_nodes_resume_list(node_list), launch_batch_size=launch_batch_size, - 
all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ), ) @@ -700,7 +700,7 @@ def _scaling_for_jobs_multi_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[str]) -> SlurmResumeData: @@ -835,7 +835,7 @@ def _add_instances_for_nodes( launch_batch_size: int, assign_node_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = True, + scaling_strategy: ScalingStrategy = ScalingStrategy.ALL_OR_NOTHING, node_list: List[str] = None, job: SlurmResumeJob = None, ): @@ -858,7 +858,7 @@ def _add_instances_for_nodes( job=job if job else None, nodes_to_launch=nodes_resume_mapping, launch_batch_size=launch_batch_size, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) # instances launched, e.g. 
# { @@ -873,7 +873,7 @@ def _add_instances_for_nodes( q_cr_instances_launched_length = len(instances_launched.get(queue, {}).get(compute_resource, [])) successful_launched_nodes += slurm_node_list[:q_cr_instances_launched_length] failed_launch_nodes += slurm_node_list[q_cr_instances_launched_length:] - if all_or_nothing_batch: + if scaling_strategy == ScalingStrategy.ALL_OR_NOTHING: self.all_or_nothing_node_assignment( assign_node_batch_size=assign_node_batch_size, instances_launched=instances_launched, @@ -992,7 +992,7 @@ def _launch_instances( self, nodes_to_launch: Dict[str, any], launch_batch_size: int, - all_or_nothing_batch: bool, + scaling_strategy: ScalingStrategy, job: SlurmResumeJob = None, ): instances_launched = defaultdict(lambda: defaultdict(list)) @@ -1008,9 +1008,13 @@ def _launch_instances( if slurm_node_list: logger.info( "Launching %s instances for nodes %s", - "all-or-nothing" if all_or_nothing_batch else "best-effort", + scaling_strategy, print_with_count(slurm_node_list), ) + # At instance launch level, the various scaling strategies can be grouped based on the actual + # launch behaviour i.e. 
all-or-nothing or best-effort + all_or_nothing_batch = scaling_strategy in [ScalingStrategy.ALL_OR_NOTHING] + fleet_manager = self._get_fleet_manager(all_or_nothing_batch, compute_resource, queue) for batch_nodes in grouper(slurm_node_list, launch_batch_size): @@ -1203,7 +1207,8 @@ def add_instances( node_list: List[str], launch_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = False, + # Default to BEST_EFFORT since clustermgtd is not yet adapted for Job Level Scaling + scaling_strategy: ScalingStrategy = ScalingStrategy.BEST_EFFORT, slurm_resume: Dict[str, any] = None, assign_node_batch_size: int = None, terminate_batch_size: int = None, @@ -1217,7 +1222,7 @@ def add_instances( node_list=node_list, launch_batch_size=launch_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) def _add_instances_for_nodes( @@ -1225,9 +1230,13 @@ def _add_instances_for_nodes( node_list: List[str], launch_batch_size: int, update_node_address: bool = True, - all_or_nothing_batch: bool = False, + scaling_strategy: ScalingStrategy = ScalingStrategy.BEST_EFFORT, ): """Launch requested EC2 instances for nodes.""" + # At fleet management level, the scaling strategies can be grouped based on the actual + # launch behaviour i.e. 
all-or-nothing or best-effort + all_or_nothing_batch = scaling_strategy in [ScalingStrategy.ALL_OR_NOTHING] + nodes_to_launch = self._parse_nodes_resume_list(node_list) for queue, compute_resources in nodes_to_launch.items(): for compute_resource, slurm_node_list in compute_resources.items(): diff --git a/src/slurm_plugin/resume.py b/src/slurm_plugin/resume.py index 8f77a76be..bb9e9a993 100644 --- a/src/slurm_plugin/resume.py +++ b/src/slurm_plugin/resume.py @@ -21,7 +21,7 @@ from common.schedulers.slurm_commands import get_nodes_info, set_nodes_down from common.utils import read_json from slurm_plugin.cluster_event_publisher import ClusterEventPublisher -from slurm_plugin.common import is_clustermgtd_heartbeat_valid, print_with_count +from slurm_plugin.common import ScalingStrategy, is_clustermgtd_heartbeat_valid, print_with_count from slurm_plugin.instance_manager import InstanceManagerFactory from slurm_plugin.slurm_resources import CONFIG_FILE_DIR @@ -45,8 +45,8 @@ class SlurmResumeConfig: "run_instances_overrides": "/opt/slurm/etc/pcluster/run_instances_overrides.json", "create_fleet_overrides": "/opt/slurm/etc/pcluster/create_fleet_overrides.json", "fleet_config_file": "/etc/parallelcluster/slurm_plugin/fleet-config.json", - "all_or_nothing_batch": True, "job_level_scaling": True, + "scaling_strategy": "all-or-nothing", } def __init__(self, config_file_path): @@ -92,6 +92,9 @@ def _get_config(self, config_file_path): self.all_or_nothing_batch = config.getboolean( "slurm_resume", "all_or_nothing_batch", fallback=self.DEFAULTS.get("all_or_nothing_batch") ) + self.scaling_strategy = config.get( + "slurm_resume", "scaling_strategy", fallback=self.DEFAULTS.get("scaling_strategy") + ) # TODO: Check if it's a valid scaling strategy before calling expensive downstream APIs self.job_level_scaling = config.getboolean( "slurm_resume", "job_level_scaling", fallback=self.DEFAULTS.get("job_level_scaling") ) @@ -213,7 +216,7 @@ def _resume(arg_nodes, resume_config, 
slurm_resume): assign_node_batch_size=resume_config.assign_node_max_batch_size, terminate_batch_size=resume_config.terminate_max_batch_size, update_node_address=resume_config.update_node_address, - all_or_nothing_batch=resume_config.all_or_nothing_batch, + scaling_strategy=ScalingStrategy(resume_config.scaling_strategy), ) failed_nodes = set().union(*instance_manager.failed_nodes.values()) success_nodes = [node for node in node_list if node not in failed_nodes] diff --git a/tests/slurm_plugin/test_common.py b/tests/slurm_plugin/test_common.py index 5dd1ea854..1599e17dc 100644 --- a/tests/slurm_plugin/test_common.py +++ b/tests/slurm_plugin/test_common.py @@ -14,7 +14,7 @@ import pytest from assertpy import assert_that from common.utils import read_json, time_is_up -from slurm_plugin.common import TIMESTAMP_FORMAT, get_clustermgtd_heartbeat +from slurm_plugin.common import TIMESTAMP_FORMAT, ScalingStrategy, get_clustermgtd_heartbeat @pytest.mark.parametrize( @@ -106,3 +106,17 @@ def test_read_json(test_datadir, caplog, json_file, default_value, raises_except assert_that(caplog.text).matches(message_in_log) else: assert_that(caplog.text).does_not_match("exception") + + +@pytest.mark.parametrize( + "strategy_as_value, expected_strategy_enum", + [ + ("best-effort", ScalingStrategy.BEST_EFFORT), + ("all-or-nothing", ScalingStrategy.ALL_OR_NOTHING), + ("", ScalingStrategy.ALL_OR_NOTHING), + ("invalid-strategy", ScalingStrategy.ALL_OR_NOTHING), + ], +) +def test_scaling_strategies_enum_from_value(strategy_as_value, expected_strategy_enum): + strategy_enum = ScalingStrategy(strategy_as_value) + assert_that(strategy_enum).is_equal_to(expected_strategy_enum) diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index c24a84b7e..213510747 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -22,6 +22,7 @@ import pytest import slurm_plugin from assertpy import assert_that 
+from slurm_plugin.common import ScalingStrategy from slurm_plugin.fleet_manager import EC2Instance from slurm_plugin.instance_manager import ( HostnameDnsStoreError, @@ -1265,7 +1266,7 @@ def test_instance_retrieval_partitioning(self, node_count, max_retrieval_count, "assign_node_batch_size", "terminate_batch_size", "update_node_address", - "all_or_nothing_batch", + "scaling_strategy", ), [ ( @@ -1284,7 +1285,7 @@ def test_instance_retrieval_partitioning(self, node_count, max_retrieval_count, 30, 40, True, - False, + ScalingStrategy.BEST_EFFORT, ), ( { @@ -1302,7 +1303,7 @@ def test_instance_retrieval_partitioning(self, node_count, max_retrieval_count, 20, 30, True, - False, + ScalingStrategy.BEST_EFFORT, ), ( {}, @@ -1311,7 +1312,7 @@ def test_instance_retrieval_partitioning(self, node_count, max_retrieval_count, 40, 20, True, - False, + ScalingStrategy.BEST_EFFORT, ), ( {}, @@ -1320,7 +1321,7 @@ def test_instance_retrieval_partitioning(self, node_count, max_retrieval_count, 40, 20, True, - False, + ScalingStrategy.BEST_EFFORT, ), ], ) @@ -1332,7 +1333,7 @@ def test_add_instances( assign_node_batch_size, terminate_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, instance_manager, mocker, ): @@ -1346,7 +1347,7 @@ def test_add_instances( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) assert_that(instance_manager.failed_nodes).is_empty() @@ -1359,7 +1360,7 @@ def test_add_instances( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) else: instance_manager._add_instances_for_nodes.assert_called_once_with( @@ -1367,7 +1368,7 @@ def test_add_instances( launch_batch_size=launch_batch_size, 
assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) @@ -1395,7 +1396,7 @@ def instance_manager(self, mocker): @pytest.mark.parametrize( ( - "node_list, launch_batch_size, update_node_address, all_or_nothing, slurm_resume, " + "node_list, launch_batch_size, update_node_address, scaling_strategy, slurm_resume, " "assign_node_batch_size, terminate_batch_size" ), [ @@ -1403,7 +1404,7 @@ def instance_manager(self, mocker): ["queue1-st-c5xlarge-2", "queue2-dy-c5xlarge-10"], 10, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {}, 30, 40, @@ -1412,7 +1413,7 @@ def instance_manager(self, mocker): ["queue1-st-c5xlarge-2", "queue2-dy-c5xlarge-10"], 40, True, - False, + ScalingStrategy.BEST_EFFORT, { "jobs": [ { @@ -1444,7 +1445,7 @@ def test_add_instances( node_list, launch_batch_size, update_node_address, - all_or_nothing, + scaling_strategy, slurm_resume, assign_node_batch_size, terminate_batch_size, @@ -1458,7 +1459,7 @@ def test_add_instances( node_list=node_list, launch_batch_size=launch_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing, + scaling_strategy=scaling_strategy, slurm_resume=slurm_resume, assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, @@ -1474,7 +1475,7 @@ def test_add_instances( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing, + scaling_strategy=scaling_strategy, ) assert_that(caplog.text).contains( "Not possible to perform job level scaling " "because Slurm resume file content is empty." 
@@ -1487,7 +1488,7 @@ def test_add_instances( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing, + scaling_strategy=scaling_strategy, ) @pytest.mark.parametrize( @@ -1497,7 +1498,7 @@ def test_add_instances( "launch_batch_size", "assign_node_batch_size", "update_node_address", - "all_or_nothing_batch", + "scaling_strategy", "expected_jobs_multi_node_oversubscribe", "expected_multi_node_oversubscribe", "expected_jobs_single_node_oversubscribe", @@ -1558,7 +1559,7 @@ def test_add_instances( 10, 30, True, - False, + ScalingStrategy.BEST_EFFORT, [ SlurmResumeJob( job_id=140814, @@ -1624,7 +1625,7 @@ def test_add_instances( 5, 25, False, - False, + ScalingStrategy.BEST_EFFORT, [ SlurmResumeJob( job_id=140814, @@ -1659,7 +1660,7 @@ def test_add_instances( 8, 28, True, - False, + ScalingStrategy.BEST_EFFORT, [], [], [], @@ -1707,7 +1708,7 @@ def test_add_instances( 8, 28, True, - False, + ScalingStrategy.BEST_EFFORT, [], [], [ @@ -1745,7 +1746,7 @@ def test_add_instances_for_resume_file( launch_batch_size, assign_node_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, expected_jobs_multi_node_oversubscribe, expected_multi_node_oversubscribe, expected_jobs_single_node_oversubscribe, @@ -1766,7 +1767,7 @@ def test_add_instances_for_resume_file( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) instance_manager._scaling_for_jobs_single_node.assert_any_call( @@ -1774,7 +1775,7 @@ def test_add_instances_for_resume_file( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) 
instance_manager._scaling_for_jobs_multi_node.assert_any_call( job_list=expected_jobs_multi_node_no_oversubscribe + expected_jobs_multi_node_oversubscribe, @@ -1782,7 +1783,7 @@ def test_add_instances_for_resume_file( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) assert_that(instance_manager.unused_launched_instances).is_empty() assert_that(instance_manager._scaling_for_jobs_single_node.call_count).is_equal_to(1) @@ -2655,7 +2656,7 @@ def test_update_slurm_node_addrs( assert_that(instance_manager.failed_nodes).is_empty() @pytest.mark.parametrize( - "job, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch, " + "job, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy, " "expected_nodes_to_launch, mock_instances_launched, initial_unused_launched_instances, " "expected_unused_launched_instances, expect_assign_instances_to_nodes_called, " "expect_assign_instances_to_nodes_failure, expected_failed_nodes", @@ -2665,7 +2666,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - False, + ScalingStrategy.BEST_EFFORT, {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, {}, @@ -2679,7 +2680,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, {}, @@ -2693,7 +2694,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, { "queue4": { @@ -2715,7 +2716,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, { "queue4": { @@ -2742,7 +2743,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, 
"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, { "queue4": { @@ -2777,7 +2778,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, {}, @@ -2796,7 +2797,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}}, {}, {}, @@ -2815,7 +2816,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}}, { "queue1": { @@ -2842,7 +2843,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, { "queue4": { @@ -2892,7 +2893,7 @@ def test_update_slurm_node_addrs( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-1"]}, "queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, { "queue4": { @@ -2938,7 +2939,7 @@ def test_add_instances_for_nodes( launch_batch_size, assign_node_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, expected_nodes_to_launch, mock_instances_launched, initial_unused_launched_instances, @@ -2960,14 +2961,14 @@ def test_add_instances_for_nodes( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) instance_manager._launch_instances.assert_called_once_with( job=job, nodes_to_launch=expected_nodes_to_launch, launch_batch_size=launch_batch_size, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) @@ -3285,7 +3286,7 @@ def test_launch_instances( job=job, 
nodes_to_launch=nodes_to_launch, launch_batch_size=launch_batch_size, - all_or_nothing_batch=True, + scaling_strategy=ScalingStrategy.ALL_OR_NOTHING, ) assert_that(instances_launched).is_equal_to(expected_instances_launched) @@ -3293,7 +3294,7 @@ def test_launch_instances( @pytest.mark.parametrize( "job_list, launch_batch_size, assign_node_batch_size, update_node_address, " - "expected_single_nodes_no_oversubscribe, all_or_nothing_batch", + "expected_single_nodes_no_oversubscribe, scaling_strategy", [ ( [], @@ -3301,7 +3302,7 @@ def test_launch_instances( 2, True, [], - True, + ScalingStrategy.ALL_OR_NOTHING, ), ( [ @@ -3311,7 +3312,7 @@ def test_launch_instances( 2, True, [], - False, + ScalingStrategy.BEST_EFFORT, ), ( [ @@ -3322,7 +3323,7 @@ def test_launch_instances( 2, True, ["queue4-st-c5xlarge-1", "queue4-st-c5xlarge-2"], - False, + ScalingStrategy.BEST_EFFORT, ), ( [ @@ -3333,7 +3334,7 @@ def test_launch_instances( 2, True, ["queue4-st-c5xlarge-1", "queue4-st-c5xlarge-2"], - True, + ScalingStrategy.ALL_OR_NOTHING, ), ], ) @@ -3346,7 +3347,7 @@ def test_scaling_for_jobs_single_node( assign_node_batch_size, update_node_address, expected_single_nodes_no_oversubscribe, - all_or_nothing_batch, + scaling_strategy, ): # patch internal functions instance_manager._scaling_for_jobs = mocker.MagicMock() @@ -3357,7 +3358,7 @@ def test_scaling_for_jobs_single_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) if not job_list: instance_manager._scaling_for_jobs.assert_not_called() @@ -3368,7 +3369,7 @@ def test_scaling_for_jobs_single_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) instance_manager._add_instances_for_nodes.assert_not_called() if 
len(job_list) > 1: @@ -3378,11 +3379,11 @@ def test_scaling_for_jobs_single_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=False, + scaling_strategy=ScalingStrategy.BEST_EFFORT, ) @pytest.mark.parametrize( - "job_list, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch", + "job_list, launch_batch_size, assign_node_batch_size, update_node_address, scaling_strategy", [ ([], 1, 2, True, False), ( @@ -3397,7 +3398,7 @@ def test_scaling_for_jobs_single_node( 3, 2, True, - True, + ScalingStrategy.ALL_OR_NOTHING, ), ( [ @@ -3417,7 +3418,7 @@ def test_scaling_for_jobs_single_node( 2, 1, False, - True, + ScalingStrategy.ALL_OR_NOTHING, ), ], ) @@ -3429,7 +3430,7 @@ def test_scaling_for_jobs( launch_batch_size, assign_node_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, ): # patch internal functions instance_manager._terminate_unassigned_launched_instances = mocker.MagicMock() @@ -3443,7 +3444,7 @@ def test_scaling_for_jobs( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) if not job_list: @@ -3456,7 +3457,7 @@ def test_scaling_for_jobs( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) setup_logging_filter.return_value.__enter__.return_value.set_custom_value.assert_any_call(job.job_id) assert_that( @@ -4099,7 +4100,7 @@ def test_all_or_nothing_node_assignment( "launch_batch_size, " "assign_node_batch_size, " "update_node_address, " - "all_or_nothing_batch, " + "scaling_strategy, " "unused_launched_instances, " "mock_launch_instances, " "expected_unused_launched_instances", @@ -4110,7 +4111,7 @@ 
def test_all_or_nothing_node_assignment( 1, 2, False, - False, + ScalingStrategy.BEST_EFFORT, {}, {}, {}, @@ -4121,7 +4122,7 @@ def test_all_or_nothing_node_assignment( 1, 2, True, - False, + ScalingStrategy.BEST_EFFORT, { "q1": { "c1": [ @@ -4148,7 +4149,7 @@ def test_all_or_nothing_node_assignment( 1, 2, False, - True, + ScalingStrategy.ALL_OR_NOTHING, {}, { "q1": { @@ -4175,7 +4176,7 @@ def test_all_or_nothing_node_assignment( 1, 2, True, - True, + ScalingStrategy.ALL_OR_NOTHING, { "q1": { "c1": [ @@ -4220,7 +4221,7 @@ def test_all_or_nothing_node_assignment( 3, 2, True, - True, + ScalingStrategy.ALL_OR_NOTHING, { "q1": { "c1": [ @@ -4267,7 +4268,7 @@ def test_scaling_for_jobs_multi_node( launch_batch_size, assign_node_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, unused_launched_instances, mock_launch_instances, expected_unused_launched_instances, @@ -4283,7 +4284,7 @@ def test_scaling_for_jobs_multi_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) instance_manager._scaling_for_jobs.assert_called_once_with( @@ -4291,7 +4292,7 @@ def test_scaling_for_jobs_multi_node( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) @@ -4368,18 +4369,18 @@ def instance_manager(self, mocker): return instance_manager @pytest.mark.parametrize( - "node_list, launch_batch_size, update_node_address, all_or_nothing", + "node_list, launch_batch_size, update_node_address, scaling_strategy", [ ( ["queue1-st-c5xlarge-2", "queue2-dy-c5xlarge-10"], 10, False, - True, + ScalingStrategy.ALL_OR_NOTHING, ) ], ) def test_add_instances( - self, 
instance_manager, mocker, node_list, launch_batch_size, update_node_address, all_or_nothing + self, instance_manager, mocker, node_list, launch_batch_size, update_node_address, scaling_strategy ): # patch internal functions instance_manager._add_instances_for_nodes = mocker.MagicMock() @@ -4388,7 +4389,7 @@ def test_add_instances( node_list=node_list, launch_batch_size=launch_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing, + scaling_strategy=scaling_strategy, ) assert_that(instance_manager.failed_nodes).is_empty() @@ -4396,7 +4397,7 @@ def test_add_instances( node_list=node_list, launch_batch_size=launch_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing, + scaling_strategy=scaling_strategy, ) @pytest.mark.parametrize( @@ -4404,7 +4405,7 @@ def test_add_instances( "nodes_to_launch", "launch_batch_size", "update_node_address", - "all_or_nothing_batch", + "scaling_strategy", "launched_instances", "expected_failed_nodes", "expected_update_nodes_calls", @@ -4422,7 +4423,7 @@ def test_add_instances( }, 10, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -4536,7 +4537,7 @@ def test_add_instances( }, 10, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -4621,7 +4622,7 @@ def test_add_instances( {"queue1": {"c5xlarge": ["queue1-st-c5xlarge-2"]}}, 10, False, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -4665,7 +4666,7 @@ def test_add_instances( }, 3, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -4719,7 +4720,7 @@ def test_add_instances( }, 1, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -4814,7 +4815,7 @@ def test_add_instances( }, 10, True, - False, + ScalingStrategy.BEST_EFFORT, # Simulate the case that only a part of the requested capacity is launched [ { @@ -4863,7 +4864,7 @@ def test_add_instances( }, 3, True, - True, + ScalingStrategy.ALL_OR_NOTHING, [ { "Instances": [ @@ -4950,7 
+4951,7 @@ def test_add_instances( }, 10, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -5088,7 +5089,7 @@ def test_add_instances( }, 18, True, - False, + ScalingStrategy.BEST_EFFORT, [Exception()], { "Exception": { @@ -5112,7 +5113,7 @@ def test_add_instances( }, 18, True, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -5203,7 +5204,7 @@ def test_add_instances( "batch_size1", "batch_size2", "partial_launch", - "all_or_nothing", + "scaling_strategy", "override_runinstances", "launch_exception", "dns_or_table_exception", @@ -5214,7 +5215,7 @@ def test_add_instances_for_nodes( nodes_to_launch, launch_batch_size, update_node_address, - all_or_nothing_batch, + scaling_strategy, launched_instances, expected_failed_nodes, expected_update_nodes_calls, @@ -5244,7 +5245,7 @@ def test_add_instances_for_nodes( node_list=["placeholder_node_list"], launch_batch_size=launch_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, ) if expected_update_nodes_calls: instance_manager._update_slurm_node_addrs_and_failed_nodes.assert_has_calls(expected_update_nodes_calls) diff --git a/tests/slurm_plugin/test_resume.py b/tests/slurm_plugin/test_resume.py index 4004fba71..d2e9007ba 100644 --- a/tests/slurm_plugin/test_resume.py +++ b/tests/slurm_plugin/test_resume.py @@ -23,6 +23,7 @@ from slurm_plugin.fleet_manager import EC2Instance from slurm_plugin.resume import SlurmResumeConfig, _get_slurm_resume, _handle_failed_nodes, _resume +from slurm_plugin.common import ScalingStrategy from tests.common import FLEET_CONFIG, LAUNCH_OVERRIDES, client_error @@ -50,7 +51,7 @@ def boto3_stubber_path(): "logging_config": os.path.join( os.path.dirname(slurm_plugin.__file__), "logging", "parallelcluster_resume_logging.conf" ), - "all_or_nothing_batch": True, + "scaling_strategy": "all-or-nothing", "clustermgtd_timeout": 300, "clustermgtd_heartbeat_file_path":
"/home/ec2-user/clustermgtd_heartbeat", "job_level_scaling": True, @@ -70,7 +71,7 @@ def boto3_stubber_path(): "proxies": {"https": "my.resume.proxy"}, }, "logging_config": "/path/to/resume_logging/config", - "all_or_nothing_batch": True, + "scaling_strategy": "all-or-nothing", "clustermgtd_timeout": 5, "clustermgtd_heartbeat_file_path": "alternate/clustermgtd_heartbeat", "job_level_scaling": False, @@ -91,7 +92,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): ( "mock_node_lists", "batch_size", - "all_or_nothing_batch", + "scaling_strategy", "launched_instances", "expected_failed_nodes", "expected_update_node_calls", @@ -109,7 +110,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - True, + ScalingStrategy.ALL_OR_NOTHING, [ { "Instances": [ @@ -195,7 +196,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - True, + ScalingStrategy.ALL_OR_NOTHING, [ { "Instances": [ @@ -281,7 +282,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -330,7 +331,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -387,7 +388,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - True, + ScalingStrategy.ALL_OR_NOTHING, [ 
{ "Instances": [ @@ -458,7 +459,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - True, + ScalingStrategy.ALL_OR_NOTHING, [ { "Instances": [ @@ -560,7 +561,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -609,7 +610,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -655,7 +656,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): SimpleNamespace(name="queue1-st-c5xlarge-2", state_string="ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP"), ], 3, - False, + ScalingStrategy.BEST_EFFORT, [ { "Instances": [ @@ -765,7 +766,7 @@ def test_resume_config(config_file, expected_attributes, test_datadir, mocker): def test_resume_launch( mock_node_lists, batch_size, - all_or_nothing_batch, + scaling_strategy, launched_instances, expected_failed_nodes, expected_update_node_calls, @@ -779,7 +780,7 @@ def test_resume_launch( mock_resume_config = SimpleNamespace( launch_max_batch_size=batch_size, update_node_address=True, - all_or_nothing_batch=all_or_nothing_batch, + scaling_strategy=scaling_strategy, dynamodb_table="some_table", region="us-east-2", cluster_name="hit",