[develop] Job Level Scaling for Node Sharing #564

Merged
merged 6 commits on Sep 26, 2023
42 changes: 18 additions & 24 deletions src/slurm_plugin/instance_manager.py
@@ -79,7 +79,6 @@ def get_manager(
run_instances_overrides: dict = None,
create_fleet_overrides: dict = None,
job_level_scaling: bool = False,
temp_jls_for_node_sharing: bool = False,
):
if job_level_scaling:
return JobLevelScalingInstanceManager(
@@ -95,7 +94,6 @@ def get_manager(
fleet_config=fleet_config,
run_instances_overrides=run_instances_overrides,
create_fleet_overrides=create_fleet_overrides,
temp_jls_for_node_sharing=temp_jls_for_node_sharing,
)
else:
return NodeListScalingInstanceManager(
@@ -493,7 +491,6 @@ def __init__(
fleet_config: Dict[str, any] = None,
run_instances_overrides: dict = None,
create_fleet_overrides: dict = None,
temp_jls_for_node_sharing: bool = False,
):
super().__init__(
region=region,
@@ -510,7 +507,6 @@ def __init__(
create_fleet_overrides=create_fleet_overrides,
)
self.unused_launched_instances = {}
self.temp_jls_for_node_sharing = temp_jls_for_node_sharing

def _clear_unused_launched_instances(self):
"""Clear and reset unused launched instances list."""
@@ -634,12 +630,14 @@ def _scaling_for_jobs_single_node(
all_or_nothing_batch=all_or_nothing_batch,
)
else:
# Batch all single node no oversubscribe jobs in a single best-effort EC2 launch request
# Batch all single node jobs in a single best-effort EC2 launch request
# This to reduce scaling time and save launch API calls
single_nodes_no_oversubscribe = [job.nodes_resume[0] for job in job_list]
# Remove duplicated node entries (possible in oversubscribe case)
single_nodes = list(dict.fromkeys([job.nodes_resume[0] for job in job_list]))
self._add_instances_for_nodes(
Comment on lines +636 to 637

Contributor: Any reason to use list(dict.fromkeys(...)) instead of list(set(...))? We're expecting only a single node, right?

Contributor (Author): I wanted to preserve the order.

Contributor: I think we'll have only a single node. dict.fromkeys is fine; I just wanted to understand if there was another reason other than the order.
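
A standalone Python illustration of the ordering point discussed above (not part of the diff; the node names are made up):

# dict.fromkeys keeps the first-seen order of the keys (dicts preserve insertion
# order in Python 3.7+), while set() gives no ordering guarantee.
nodes = ["queue2-st-c5xlarge-1", "queue1-st-c5xlarge-1", "queue2-st-c5xlarge-1"]
print(list(dict.fromkeys(nodes)))  # ['queue2-st-c5xlarge-1', 'queue1-st-c5xlarge-1']
print(list(set(nodes)))            # same two elements, arbitrary order
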

node_list=single_nodes_no_oversubscribe,
node_list=single_nodes,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=False,
)
@@ -660,36 +658,24 @@ def _add_instances_for_resume_file(
self._clear_unused_launched_instances()

self._scaling_for_jobs_single_node(
job_list=slurm_resume_data.jobs_single_node_no_oversubscribe,
job_list=slurm_resume_data.jobs_single_node_no_oversubscribe
Contributor: At this stage, can SlurmResumeData contain a property jobs_single_node that already combines both "oversubscribe" and "no oversubscribe"?

Contributor (Author): Absolutely, I think we can drop the distinction between "oversubscribe" and "no oversubscribe", now that we are able to manage both types. I'm considering this for the next PR (a sketch follows below).

+ slurm_resume_data.jobs_single_node_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)

self._scaling_for_jobs_multi_node(
job_list=slurm_resume_data.jobs_multi_node_no_oversubscribe,
node_list=slurm_resume_data.multi_node_no_oversubscribe,
job_list=slurm_resume_data.jobs_multi_node_no_oversubscribe
+ slurm_resume_data.jobs_multi_node_oversubscribe,
Contributor: Same here, having a jobs_multi_node property in SlurmResumeData.

Contributor (Author): Yes, see the other comment.

node_list=slurm_resume_data.multi_node_no_oversubscribe + slurm_resume_data.multi_node_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)

if not self.temp_jls_for_node_sharing:
# node scaling for oversubscribe nodes
node_list = list(
dict.fromkeys(slurm_resume_data.single_node_oversubscribe + slurm_resume_data.multi_node_oversubscribe)
)
if node_list:
self._add_instances_for_nodes(
node_list=node_list,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)
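
As a reference for the follow-up discussed in the two review comments above, hypothetical convenience properties on SlurmResumeData could look like this (a sketch only; the property names and the underlying field names are taken from this diff, but the change is not part of this PR):

    @property
    def jobs_single_node(self):
        # Hypothetical: single-node jobs regardless of oversubscribe setting
        return self.jobs_single_node_no_oversubscribe + self.jobs_single_node_oversubscribe

    @property
    def jobs_multi_node(self):
        # Hypothetical: multi-node jobs regardless of oversubscribe setting
        return self.jobs_multi_node_no_oversubscribe + self.jobs_multi_node_oversubscribe
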

def _scaling_for_jobs_multi_node(
self,
job_list,
@@ -907,6 +893,12 @@ def _add_instances_for_nodes(
update_node_address=update_node_address,
)

def _reset_failed_nodes(self, nodeset):
"""Remove nodeset from failed nodes dict."""
if nodeset:
for error_code in self.failed_nodes:
self.failed_nodes[error_code] = self.failed_nodes.get(error_code, set()).difference(nodeset)

def best_effort_node_assignment(
self,
assign_node_batch_size,
Expand Down Expand Up @@ -935,6 +927,7 @@ def best_effort_node_assignment(
print_with_count(successful_launched_nodes),
)
self._update_dict(self.nodes_assigned_to_instances, nodes_resume_mapping)
self._reset_failed_nodes(set(successful_launched_nodes))
if len(successful_launched_nodes) < len(nodes_resume_list):
# set limited capacity on the failed to launch nodes
self._update_failed_nodes(set(failed_launch_nodes), "LimitedInstanceCapacity", override=False)
Expand Down Expand Up @@ -968,6 +961,7 @@ def all_or_nothing_node_assignment(
print_with_count(nodes_resume_list),
)
self._update_dict(self.nodes_assigned_to_instances, nodes_resume_mapping)
self._reset_failed_nodes(set(nodes_resume_list))
Comment on lines 963 to +964

Contributor: NIT: It seems that both best-effort and a successful all-or-nothing handle the successfully launched nodes roughly the same way. Maybe we can have shared behaviour for both cases?

def handle_successfully_launched_nodes:
    - Update the node mapping dictionary
    - Reset the failed nodes

Contributor (Author): Not really, they are different. Let's sync on this.
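
Purely as an illustration of the reviewer's suggestion (a hypothetical helper, not part of this PR; as the author notes, the two paths are not identical), the shared step might look like:

    def _record_launched_nodes(self, nodes_resume_mapping, launched_nodes):
        # Hypothetical shared step for both assignment paths: record the
        # node-to-instance assignment and clear earlier failures for the
        # nodes that did launch.
        self._update_dict(self.nodes_assigned_to_instances, nodes_resume_mapping)
        self._reset_failed_nodes(set(launched_nodes))
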

except InstanceToNodeAssignmentError:
# Failed to assign EC2 instances to nodes
# EC2 Instances already assigned, are going to be terminated by
30 changes: 13 additions & 17 deletions src/slurm_plugin/resume.py
@@ -47,7 +47,6 @@ class SlurmResumeConfig:
"fleet_config_file": "/etc/parallelcluster/slurm_plugin/fleet-config.json",
"all_or_nothing_batch": True,
"job_level_scaling": True,
"temp_jls_for_node_sharing": False,
}

def __init__(self, config_file_path):
@@ -96,9 +95,6 @@ def _get_config(self, config_file_path):
self.job_level_scaling = config.getboolean(
"slurm_resume", "job_level_scaling", fallback=self.DEFAULTS.get("job_level_scaling")
)
self.temp_jls_for_node_sharing = config.getboolean(
"slurm_resume", "temp_jls_for_node_sharing", fallback=self.DEFAULTS.get("temp_jls_for_node_sharing")
)
fleet_config_file = config.get(
"slurm_resume", "fleet_config_file", fallback=self.DEFAULTS.get("fleet_config_file")
)
@@ -157,18 +153,19 @@ def _handle_failed_nodes(node_list, reason="Failure when resuming nodes"):
To save time, should explicitly set nodes to DOWN in ResumeProgram so clustermgtd can maintain failed nodes.
Clustermgtd will be responsible for running full DOWN -> POWER_DOWN process.
"""
try:
log.info(
"Setting following failed nodes into DOWN state %s with reason: %s", print_with_count(node_list), reason
)
set_nodes_down(node_list, reason=reason)
except Exception as e:
log.error(
"Failed to place nodes %s into DOWN for reason %s with exception: %s",
print_with_count(node_list),
reason,
e,
)
if node_list:
Contributor: Out of curiosity, which code path results in us calling _handle_failed_nodes with an empty node list?

Contributor (Author): When we have reset the node failures with _reset_failed_nodes, it can happen that the error key is still there but there are no more nodes associated with that error.
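
A standalone illustration of the scenario described above (made-up data; it assumes the caller iterates the failed_nodes dict per error code, which is outside this diff):

# Made-up example: after _reset_failed_nodes, an error key can remain mapped to an
# empty node set, so callers can end up invoking _handle_failed_nodes([]) --
# the new "if node_list:" guard makes that a no-op instead of logging and acting on nothing.
failed_nodes = {"LimitedInstanceCapacity": {"queue1-st-c5xlarge-1"}}
launched = {"queue1-st-c5xlarge-1"}  # node eventually launched fine
failed_nodes["LimitedInstanceCapacity"] = failed_nodes["LimitedInstanceCapacity"].difference(launched)
print(failed_nodes)  # {'LimitedInstanceCapacity': set()} -- key survives, node list is empty
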

try:
log.info(
"Setting following failed nodes into DOWN state %s with reason: %s", print_with_count(node_list), reason
)
set_nodes_down(node_list, reason=reason)
except Exception as e:
log.error(
"Failed to place nodes %s into DOWN for reason %s with exception: %s",
print_with_count(node_list),
reason,
e,
)


def _resume(arg_nodes, resume_config, slurm_resume):
@@ -208,7 +205,6 @@ def _resume(arg_nodes, resume_config, slurm_resume):
run_instances_overrides=resume_config.run_instances_overrides,
create_fleet_overrides=resume_config.create_fleet_overrides,
job_level_scaling=resume_config.job_level_scaling,
temp_jls_for_node_sharing=resume_config.temp_jls_for_node_sharing,
)
instance_manager.add_instances(
slurm_resume=slurm_resume,
109 changes: 92 additions & 17 deletions tests/slurm_plugin/test_instance_manager.py
@@ -1498,7 +1498,9 @@ def test_add_instances(
"assign_node_batch_size",
"update_node_address",
"all_or_nothing_batch",
"expected_nodes_oversubscribe",
"expected_jobs_multi_node_oversubscribe",
"expected_multi_node_oversubscribe",
"expected_jobs_single_node_oversubscribe",
"expected_jobs_multi_node_no_oversubscribe",
"expected_multi_node_no_oversubscribe",
"expected_jobs_single_node_no_oversubscribe",
@@ -1557,12 +1559,27 @@ def test_add_instances(
30,
True,
False,
[
SlurmResumeJob(
job_id=140814,
nodes_alloc="queue1-st-c5xlarge-[1-4]",
nodes_resume="queue1-st-c5xlarge-[1-3]",
oversubscribe="YES",
),
SlurmResumeJob(
job_id=140818,
nodes_alloc="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11",
nodes_resume="queue1-st-c5xlarge-[1-3], queue4-st-c5xlarge-11",
oversubscribe="OK",
),
],
[
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
"queue1-st-c5xlarge-3",
"queue4-st-c5xlarge-11",
],
[],
[
SlurmResumeJob(
job_id=140815,
@@ -1608,6 +1625,14 @@ def test_add_instances(
25,
False,
False,
[
SlurmResumeJob(
job_id=140814,
nodes_alloc="queue1-st-c5xlarge-[1-4]",
nodes_resume="queue1-st-c5xlarge-[1-3]",
oversubscribe="FORCE",
),
],
[
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
@@ -1616,6 +1641,7 @@ def test_add_instances(
[],
[],
[],
[],
),
(
{
@@ -1637,6 +1663,8 @@ def test_add_instances(
[],
[],
[],
[],
[],
[
SlurmResumeJob(
job_id=140814,
@@ -1672,7 +1700,6 @@ def test_add_instances(
},
[
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
"queue2-st-c5xlarge-1",
"queue2-st-c5xlarge-2",
"queue3-st-c5xlarge-1",
@@ -1681,8 +1708,15 @@ def test_add_instances(
28,
True,
False,
[],
[],
[
"queue3-st-c5xlarge-1",
SlurmResumeJob(
job_id=140816,
nodes_alloc="queue3-st-c5xlarge-1",
nodes_resume="queue3-st-c5xlarge-1",
oversubscribe="YES",
),
],
[
SlurmResumeJob(
Expand Down Expand Up @@ -1712,7 +1746,9 @@ def test_add_instances_for_resume_file(
assign_node_batch_size,
update_node_address,
all_or_nothing_batch,
expected_nodes_oversubscribe,
expected_jobs_multi_node_oversubscribe,
expected_multi_node_oversubscribe,
expected_jobs_single_node_oversubscribe,
expected_jobs_multi_node_no_oversubscribe,
expected_multi_node_no_oversubscribe,
expected_jobs_single_node_no_oversubscribe,
@@ -1734,33 +1770,23 @@ def test_add_instances_for_resume_file(
)

instance_manager._scaling_for_jobs_single_node.assert_any_call(
job_list=expected_jobs_single_node_no_oversubscribe,
job_list=expected_jobs_single_node_no_oversubscribe + expected_jobs_single_node_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)
instance_manager._scaling_for_jobs_multi_node.assert_any_call(
job_list=expected_jobs_multi_node_no_oversubscribe,
node_list=expected_multi_node_no_oversubscribe,
job_list=expected_jobs_multi_node_no_oversubscribe + expected_jobs_multi_node_oversubscribe,
node_list=expected_multi_node_no_oversubscribe + expected_multi_node_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)
if expected_nodes_oversubscribe:
instance_manager._add_instances_for_nodes.assert_any_call(
node_list=expected_nodes_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=all_or_nothing_batch,
)
assert_that(instance_manager.unused_launched_instances).is_empty()
assert_that(instance_manager._scaling_for_jobs_single_node.call_count).is_equal_to(1)
assert_that(instance_manager._scaling_for_jobs_multi_node.call_count).is_equal_to(1)
if expected_nodes_oversubscribe:
assert_that(instance_manager._add_instances_for_nodes.call_count).is_equal_to(1)

@pytest.mark.parametrize(
"slurm_resume, node_list, expected_single_node_oversubscribe, expected_multi_node_oversubscribe, "
Expand Down Expand Up @@ -3350,6 +3376,7 @@ def test_scaling_for_jobs_single_node(
instance_manager._add_instances_for_nodes.assert_called_once_with(
node_list=expected_single_nodes_no_oversubscribe,
launch_batch_size=launch_batch_size,
assign_node_batch_size=assign_node_batch_size,
update_node_address=update_node_address,
all_or_nothing_batch=False,
)
@@ -4269,6 +4296,54 @@ def test_scaling_for_jobs_multi_node(

assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances)

@pytest.mark.parametrize(
"nodeset, mock_failed_nodes, expected_failed_nodes",
[
(
{},
{},
{},
),
(
{},
{
"Exception": {"queue2-dy-c5xlarge-1", "queue1-st-c5xlarge-2", "queue2-st-c5xlarge-1"},
"some_error_code": {"queue1-st-c52xlarge-1"},
},
{
"Exception": {"queue2-dy-c5xlarge-1", "queue1-st-c5xlarge-2", "queue2-st-c5xlarge-1"},
"some_error_code": {"queue1-st-c52xlarge-1"},
},
),
(
{"queue1-st-c5xlarge-2"},
{
"Exception": {"queue2-dy-c5xlarge-1", "queue1-st-c5xlarge-2", "queue2-st-c5xlarge-1"},
"some_error_code": {"queue1-st-c52xlarge-1"},
},
{
"Exception": {"queue2-dy-c5xlarge-1", "queue2-st-c5xlarge-1"},
"some_error_code": {"queue1-st-c52xlarge-1"},
},
),
(
{"queue2-dy-c5xlarge-1"},
{
"Exception": {"queue2-dy-c5xlarge-1", "queue1-st-c5xlarge-2", "queue2-st-c5xlarge-1"},
"some_error_code": {"queue2-dy-c5xlarge-1"},
},
{
"Exception": {"queue1-st-c5xlarge-2", "queue2-st-c5xlarge-1"},
"some_error_code": set(),
},
),
],
)
def test_reset_failed_nodes(self, instance_manager, nodeset, mock_failed_nodes, expected_failed_nodes):
instance_manager.failed_nodes = mock_failed_nodes
instance_manager._reset_failed_nodes(nodeset)
assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes)


class TestNodeListScalingInstanceManager:
@pytest.fixture
Expand Down
Loading