From 1e6c69cef7a9f046084a0daab89cd58d43a6e550 Mon Sep 17 00:00:00 2001
From: Christine M Simpson
Date: Tue, 4 Apr 2023 16:27:59 -0700
Subject: [PATCH 01/54] updates to app_run

---
 balsam/platform/app_run/perlmutter.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/balsam/platform/app_run/perlmutter.py b/balsam/platform/app_run/perlmutter.py
index 82926395..12449065 100644
--- a/balsam/platform/app_run/perlmutter.py
+++ b/balsam/platform/app_run/perlmutter.py
@@ -8,6 +8,11 @@ class PerlmutterRun(SubprocessAppRun):
     https://slurm.schedmd.com/srun.html
     """

+    def _get_cpus_per_task(self) -> int:
+        cpu_per_rank = 64 // self._ranks_per_node
+        cpu_per_task = cpu_per_rank*2
+        return cpu_per_task
+
     def _build_cmdline(self) -> str:
         node_ids = [h for h in self._node_spec.hostnames]
         num_nodes = str(len(node_ids))
@@ -22,6 +27,11 @@ def _build_cmdline(self) -> str:
         else:
             gpu_args = []

+        launch_params = []
+        for k in self._launch_params.keys():
+            launch_params.append(k)
+            launch_params.append(str(self._launch_params[k]))
+
         args = [
             "srun",
             *network_args,
@@ -35,8 +45,8 @@ def _build_cmdline(self) -> str:
             "--nodes",
             num_nodes,
             "--cpus-per-task",
-            self.get_cpus_per_rank(),
-            "--mem=40G",
+            self._get_cpus_per_task(),
+            *launch_params,
             "--overlap",
             self._cmdline,
         ]

From ba4a23306c677e9c982e3a38b04abe44e8370ca6 Mon Sep 17 00:00:00 2001
From: Christine M Simpson
Date: Tue, 4 Apr 2023 18:10:28 -0700
Subject: [PATCH 02/54] updated app_run

---
 balsam/platform/app_run/perlmutter.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/balsam/platform/app_run/perlmutter.py b/balsam/platform/app_run/perlmutter.py
index 12449065..26d125a8 100644
--- a/balsam/platform/app_run/perlmutter.py
+++ b/balsam/platform/app_run/perlmutter.py
@@ -8,11 +8,6 @@ class PerlmutterRun(SubprocessAppRun):
     https://slurm.schedmd.com/srun.html
     """

-    def _get_cpus_per_task(self) -> int:
-        cpu_per_rank = 64 // self._ranks_per_node
-        cpu_per_task = cpu_per_rank*2
-        return cpu_per_task
-
     def _build_cmdline(self) -> str:
         node_ids = [h for h in self._node_spec.hostnames]
         num_nodes = str(len(node_ids))
@@ -29,7 +24,7 @@ def _build_cmdline(self) -> str:
         launch_params = []
         for k in self._launch_params.keys():
-            launch_params.append(k)
+            launch_params.append("--"+k)
             launch_params.append(str(self._launch_params[k]))
         args = [
             "srun",
             *network_args,
@@ -45,7 +40,7 @@ def _build_cmdline(self) -> str:
             "--nodes",
             num_nodes,
             "--cpus-per-task",
-            self._get_cpus_per_task(),
+            self._threads_per_rank,
             *launch_params,
             "--overlap",
             self._cmdline,

From 42ae6efc51337d1823f896056a5c77ae27b6754c Mon Sep 17 00:00:00 2001
From: Christine M Simpson
Date: Tue, 4 Apr 2023 18:18:10 -0700
Subject: [PATCH 03/54] removed white space

---
 balsam/platform/app_run/perlmutter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/balsam/platform/app_run/perlmutter.py b/balsam/platform/app_run/perlmutter.py
index 26d125a8..af5ec8b3 100644
--- a/balsam/platform/app_run/perlmutter.py
+++ b/balsam/platform/app_run/perlmutter.py
@@ -26,7 +26,7 @@ def _build_cmdline(self) -> str:
         for k in self._launch_params.keys():
             launch_params.append("--"+k)
             launch_params.append(str(self._launch_params[k]))
-
+
         args = [
             "srun",
             *network_args,

From 0e25cd449114ee8bc3fed26118386158eb22eb7f Mon Sep 17 00:00:00 2001
From: Christine M Simpson
Date: Thu, 6 Apr 2023 11:36:45 -0700
Subject: [PATCH 04/54] added spaces

---
 balsam/platform/app_run/perlmutter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git
a/balsam/platform/app_run/perlmutter.py b/balsam/platform/app_run/perlmutter.py index af5ec8b3..b4c91aed 100644 --- a/balsam/platform/app_run/perlmutter.py +++ b/balsam/platform/app_run/perlmutter.py @@ -24,7 +24,7 @@ def _build_cmdline(self) -> str: launch_params = [] for k in self._launch_params.keys(): - launch_params.append("--"+k) + launch_params.append("--" + k) launch_params.append(str(self._launch_params[k])) args = [ From 1a3071fb09a778a8ef2b95c05da512c34ba8e9fa Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Thu, 6 Apr 2023 21:09:52 -0500 Subject: [PATCH 05/54] Update deploy.txt Need dill 0.3.6 to use balsam with ipython and Jupyter notebooks --- requirements/deploy.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/deploy.txt b/requirements/deploy.txt index 434fef2b..bce28e09 100644 --- a/requirements/deploy.txt +++ b/requirements/deploy.txt @@ -27,6 +27,6 @@ psutil==5.9.2 globus-sdk==3.14.0 configobj==5.0.6 pyzmq==24.0.1 -dill==0.3.5.1 +dill==0.3.6 tblib==1.7.0 -e . From a389a2e1b72663759eafe6a2a8b300f38125ab6f Mon Sep 17 00:00:00 2001 From: Thomas Uram Date: Mon, 10 Apr 2023 20:34:39 +0000 Subject: [PATCH 06/54] Update to version a22 --- balsam/__init__.py | 2 +- balsam/_api/models.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/balsam/__init__.py b/balsam/__init__.py index 47b27195..b43cf51a 100644 --- a/balsam/__init__.py +++ b/balsam/__init__.py @@ -1,4 +1,4 @@ from balsam.util import config_root_logger -__version__ = "0.7.0.a21" +__version__ = "0.7.0.a22" config_root_logger() diff --git a/balsam/_api/models.py b/balsam/_api/models.py index 0c3fcdaa..3321a5d5 100644 --- a/balsam/_api/models.py +++ b/balsam/_api/models.py @@ -1,5 +1,5 @@ -# This file was auto-generated via /Users/turam/opt/miniconda3/bin/python balsam/schemas/api_generator.py -# [git rev ce4bdce] +# This file was auto-generated via /home/turam/miniconda3/bin/python balsam/schemas/api_generator.py +# [git rev 7634992] # Do *not* make changes to the API by changing this file! import datetime @@ -765,7 +765,7 @@ class BatchJob(balsam._api.bases.BatchJobBase): job_mode = Field[balsam.schemas.batchjob.JobMode]() optional_params = Field[typing.Dict[str, str]]() filter_tags = Field[typing.Dict[str, str]]() - partitions = Field[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]]() + partitions = Field[Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]]]() site_id = Field[int]() project = Field[str]() queue = Field[str]() @@ -786,7 +786,7 @@ def __init__( queue: str, optional_params: Optional[typing.Dict[str, str]] = None, filter_tags: Optional[typing.Dict[str, str]] = None, - partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None, + partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None, **kwargs: Any, ) -> None: """ @@ -918,7 +918,7 @@ def create( queue: str, optional_params: Optional[typing.Dict[str, str]] = None, filter_tags: Optional[typing.Dict[str, str]] = None, - partitions: Optional[typing.Optional[typing.List[balsam.schemas.batchjob.BatchJobPartition]]] = None, + partitions: Optional[typing.Union[typing.List[balsam.schemas.batchjob.BatchJobPartition], None]] = None, ) -> BatchJob: """ Create a new BatchJob object and save it to the API in one step. 
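For reference, a minimal standalone sketch (not part of the patches themselves) of the launch_params handling introduced in PATCH 01/54 and refined in PATCH 02/54: each key/value pair in a job's launch_params dict becomes a "--key value" pair appended to the srun command line, replacing the previously hard-coded "--mem=40G". The dict contents below are hypothetical examples, not defaults used by Balsam.

    # Hypothetical launch_params, as a Balsam job might carry them
    launch_params = {"gpu-bind": "closest", "cpu-freq": "high"}

    # Same loop as the PerlmutterRun._build_cmdline change above
    extra_args = []
    for k in launch_params.keys():
        extra_args.append("--" + k)               # e.g. "--gpu-bind"
        extra_args.append(str(launch_params[k]))  # e.g. "closest"

    print(" ".join(extra_args))
    # --gpu-bind closest --cpu-freq high
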
From 2db331499be8f0cf0a205da3c977f7d8a03c045e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 26 Apr 2023 17:05:41 -0500 Subject: [PATCH 07/54] attempt to handle missing PBS record bug --- balsam/platform/scheduler/__init__.py | 2 ++ balsam/platform/scheduler/pbs_sched.py | 10 +++++++++- balsam/platform/scheduler/scheduler.py | 4 ++++ balsam/site/service/scheduler.py | 6 +++++- 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/balsam/platform/scheduler/__init__.py b/balsam/platform/scheduler/__init__.py index 22dc7c45..e04e7f53 100644 --- a/balsam/platform/scheduler/__init__.py +++ b/balsam/platform/scheduler/__init__.py @@ -3,6 +3,7 @@ from .lsf_sched import LsfScheduler from .pbs_sched import PBSScheduler from .scheduler import ( + DelayedSubmitFail, SchedulerDeleteError, SchedulerError, SchedulerInterface, @@ -22,4 +23,5 @@ "SchedulerSubmitError", "SchedulerDeleteError", "SchedulerNonZeroReturnCode", + "DelayedSubmitFail", ] diff --git a/balsam/platform/scheduler/pbs_sched.py b/balsam/platform/scheduler/pbs_sched.py index 1f7fff5e..85f3f7d9 100644 --- a/balsam/platform/scheduler/pbs_sched.py +++ b/balsam/platform/scheduler/pbs_sched.py @@ -11,9 +11,11 @@ from balsam.util import parse_to_utc from .scheduler import ( + DelayedSubmitFail, SchedulerBackfillWindow, SchedulerJobLog, SchedulerJobStatus, + SchedulerNonZeroReturnCode, SubprocessSchedulerInterface, scheduler_subproc, ) @@ -320,7 +322,13 @@ def _parse_logs(scheduler_id: int, job_script_path: Optional[PathLike]) -> Sched args += ["-x", "-f", "-F", "json"] args += [str(scheduler_id)] logger.info(f"_parse_logs issuing qstat: {str(args)}") - stdout = scheduler_subproc(args) + try: + stdout = scheduler_subproc(args) + except SchedulerNonZeroReturnCode as e: + if "Unknown Job Id" in e: + logger.warning(f"Batch Job {scheduler_id} not found in PBS") + raise DelayedSubmitFail + return SchedulerJobLog() json_output = json.loads(stdout) # logger.info(f"_parse_logs json_output: {json_output}") if len(json_output["Jobs"]) == 0: diff --git a/balsam/platform/scheduler/scheduler.py b/balsam/platform/scheduler/scheduler.py index 9e7a1903..87b422bc 100644 --- a/balsam/platform/scheduler/scheduler.py +++ b/balsam/platform/scheduler/scheduler.py @@ -17,6 +17,10 @@ class SchedulerNonZeroReturnCode(SchedulerError): pass +class DelayedSubmitFail(SchedulerError): + pass + + class SchedulerSubmitError(SchedulerError): pass diff --git a/balsam/site/service/scheduler.py b/balsam/site/service/scheduler.py index b0fc7bb9..4c62e60f 100644 --- a/balsam/site/service/scheduler.py +++ b/balsam/site/service/scheduler.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Dict, List, Type from balsam.platform.scheduler import ( + DelayedSubmitFail, SchedulerDeleteError, SchedulerError, SchedulerNonZeroReturnCode, @@ -154,7 +155,10 @@ def run_cycle(self) -> None: job.state = BatchJobState.finished assert job.scheduler_id is not None assert job.status_info is not None - job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None)) + try: + job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None)) + except DelayedSubmitFail: + job.state = BatchJobState.submit_failed start_time = job_log.start_time end_time = job_log.end_time if start_time: From ac68b4f2c531467d99a8d05080a82e1760c63bde Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 27 Apr 2023 08:42:57 -0500 Subject: [PATCH 08/54] fixed type --- balsam/platform/scheduler/pbs_sched.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/balsam/platform/scheduler/pbs_sched.py b/balsam/platform/scheduler/pbs_sched.py index 85f3f7d9..6401342b 100644 --- a/balsam/platform/scheduler/pbs_sched.py +++ b/balsam/platform/scheduler/pbs_sched.py @@ -325,7 +325,7 @@ def _parse_logs(scheduler_id: int, job_script_path: Optional[PathLike]) -> Sched try: stdout = scheduler_subproc(args) except SchedulerNonZeroReturnCode as e: - if "Unknown Job Id" in e: + if "Unknown Job Id" in str(e): logger.warning(f"Batch Job {scheduler_id} not found in PBS") raise DelayedSubmitFail return SchedulerJobLog() From 760049d8eb216980cd46ffd2d0483c373913d0f2 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 27 Apr 2023 09:31:04 -0500 Subject: [PATCH 09/54] fixed exception handling --- balsam/site/service/scheduler.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/balsam/site/service/scheduler.py b/balsam/site/service/scheduler.py index 4c62e60f..ab43f970 100644 --- a/balsam/site/service/scheduler.py +++ b/balsam/site/service/scheduler.py @@ -157,14 +157,17 @@ def run_cycle(self) -> None: assert job.status_info is not None try: job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None)) + + start_time = job_log.start_time + end_time = job_log.end_time + if start_time: + job.start_time = start_time + if end_time: + job.end_time = end_time + except DelayedSubmitFail: job.state = BatchJobState.submit_failed - start_time = job_log.start_time - end_time = job_log.end_time - if start_time: - job.start_time = start_time - if end_time: - job.end_time = end_time + elif job.state != scheduler_jobs[job.scheduler_id].state: job.state = scheduler_jobs[job.scheduler_id].state logger.info(f"Job {job.id} (sched_id {job.scheduler_id}) advanced to state {job.state}") From 4945700f63a79186b99b3090cb8b94e54b4aa77a Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 28 Apr 2023 16:27:45 -0500 Subject: [PATCH 10/54] first attempt to sketch out cpu affinity bindings --- balsam/platform/app_run/polaris.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 72834f39..4da45bfd 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,3 +1,5 @@ +import os + from .app_run import SubprocessAppRun @@ -9,6 +11,18 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") + if ( + cpu_bind == "none" + and self._gpus_per_rank > 0 + and self.get_num_ranks() == 8 + and self.get_cpus_per_rank == 1 + ): + gpu_device = int(os.getenv("CUDA_VISIBLE_DEVICES")) + cpu_bind_list = ["list"] + start_cpu = 32 - 8 * (1 + gpu_device) + for i in range(8): + cpu_bind_list.append(":" + str(start_cpu + i)) + cpu_bind = "".join(cpu_bind_list) nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", From 994b00db38393649ab2cad021339ebb0c2a740d7 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 10 May 2023 14:00:29 -0500 Subject: [PATCH 11/54] added all option to balsam app rm --- balsam/cmdline/app.py | 76 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/balsam/cmdline/app.py b/balsam/cmdline/app.py index 1ed8bd21..957655ad 100644 --- a/balsam/cmdline/app.py +++ b/balsam/cmdline/app.py @@ -45,20 +45,72 @@ def ls(site_selector: str, verbose: bool) -> None: @app.command() 
-@click.option("-n", "--name", required=True) +@click.option("-n", "--name", "name_selector", default=None) @click.option("-s", "--site", "site_selector", default="") -def rm(site_selector: str, name: str) -> None: +@click.option("-a", "--all", is_flag=True, default=False) +def rm(site_selector: str, name_selector: str, all: bool) -> None: + """ + Remove Apps + + 1) Remove named app + + balsam app rm -n hello_world + + 1) Remove all apps across a site + + balsam app rm --all --site=123,my_site_folder + + 2) Filter apps by specific site IDs or Path fragments + + balsam app rm -n hello_world --site=123,my_site_folder + + """ client = ClientSettings.load_from_file().build_client() qs = client.App.objects.all() qs = filter_by_sites(qs, site_selector) - resolved_app = qs.get(name=name) - resolved_id = resolved_app.id - appstr = f"App(id={resolved_id}, name={resolved_app.name})" - job_count = client.Job.objects.filter(app_id=resolved_id).count() - if job_count == 0: - resolved_app.delete() - click.echo(f"Deleted {appstr}: there were no associated jobs.") - elif click.confirm(f"Really Delete {appstr}?? There are {job_count} Jobs that will be ERASED!"): - resolved_app.delete() - click.echo(f"Deleted App {resolved_id} ({name})") + if all and name_selector is not None: + raise click.BadParameter("Specify app name or --all, but not both") + elif not all and name_selector is None: + raise click.BadParameter("Specify app name with -n or specify --all") + else: + app_list = [] + + if all and site_selector == "": + raise click.BadParameter("balsam app rm --all requires that you specify --site to remove jobs") + elif all and site_selector != "": + click.echo("THIS WILL DELETE ALL APPS IN SITE! CAUTION!") + app_list = [a.name for a in list(qs)] + num_apps = 0 + num_jobs = 0 + elif name_selector is not None: + app_list = [name_selector] + + if len(app_list) > 0: + for name in app_list: + resolved_app = qs.get(name=name) + resolved_id = resolved_app.id + job_count = client.Job.objects.filter(app_id=resolved_id).count() + + if name_selector is not None: + appstr = f"App(id={resolved_id}, name={resolved_app.name}, site={resolved_app.site_id})" + if job_count == 0: + resolved_app.delete() + click.echo(f"Deleted {appstr}: there were no associated jobs.") + elif click.confirm(f"Really Delete {appstr}?? There are {job_count} Jobs that will be ERASED!"): + resolved_app.delete() + click.echo(f"Deleted App {resolved_id} ({name})") + else: + num_apps += 1 + num_jobs += job_count + + if all: + if click.confirm( + f"Really DELETE {num_apps} apps and {num_jobs} jobs from site {site_selector}?? They will be ERASED!" 
+ ): + for name in app_list: + resolved_app = qs.get(name=name) + resolved_app.delete() + click.echo(f"Deleted {num_apps} apps and {num_jobs} jobs from site {site_selector}") + else: + click.echo("Found no apps to Delete") From 3ca0ec28b59efc5c0dfa0ec3e838d4dc1ad54dd4 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 10 May 2023 15:36:10 -0500 Subject: [PATCH 12/54] updates to balsam queue ls --- balsam/cmdline/scheduler.py | 102 +++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 37 deletions(-) diff --git a/balsam/cmdline/scheduler.py b/balsam/cmdline/scheduler.py index 4bf14296..6e9ea80e 100644 --- a/balsam/cmdline/scheduler.py +++ b/balsam/cmdline/scheduler.py @@ -84,9 +84,12 @@ def submit( @queue.command() +@click.option("-n", "--num", default=3, type=int) @click.option("-h", "--history", is_flag=True, default=False) +@click.option("-v", "--verbose", is_flag=True, default=False) @click.option("--site", "site_selector", default="") -def ls(history: bool, site_selector: str) -> None: +@click.option("--id", "scheduler_id", type=int, default=None) +def ls(history: bool, verbose: bool, num: int, site_selector: str, scheduler_id: int) -> None: """ List BatchJobs @@ -97,49 +100,74 @@ def ls(history: bool, site_selector: str) -> None: 2) View historical BatchJobs at all sites balsam queue ls --history --site all + + 3) View verbose record for BatchJob with scheduler id + + balsam queue ls --id 12345 -v + + 4) View the last n BatchJobs + + balsam queue ls --num n + """ client = load_client() BatchJob = client.BatchJob qs = filter_by_sites(BatchJob.objects.all(), site_selector) + + active_only = False if not history: - qs = qs.filter(state=["pending_submission", "queued", "running", "pending_deletion"]) + active_only = True + qs_filter = qs.filter(state=["pending_submission", "queued", "running", "pending_deletion"]) + if len(qs_filter) > 0 or num == 0: + qs = qs_filter + + if scheduler_id is not None: + qs = qs.filter(scheduler_id=scheduler_id) jobs = [j.display_dict() for j in qs] - sites = {site.id: site for site in client.Site.objects.all()} - for job in jobs: - site = sites[job["site_id"]] - path_str = site.path.as_posix() - if len(path_str) > 27: - path_str = "..." + path_str[-27:] - job["site"] = f"{site.name}" - - fields = [ - "id", - "site", - "scheduler_id", - "state", - "filter_tags", - "project", - "queue", - "num_nodes", - "wall_time_min", - "job_mode", - ] - rows = [[str(j[field]) for field in fields] for j in jobs] - - col_widths = [len(f) for f in fields] - for row in rows: - for col_idx, width in enumerate(col_widths): - col_widths[col_idx] = max(width, len(row[col_idx])) - - for i, field in enumerate(fields): - fields[i] = field.rjust(col_widths[i] + 1) - - print(*fields) - for row in rows: - for i, col in enumerate(row): - row[i] = col.rjust(col_widths[i] + 1) - print(*row) + if active_only and num > 0: + click.echo(f"No active Batch Jobs. Displaying records for last {num} Batch Jobs") + jobs = jobs[-num:] + + if verbose: + for j in jobs: + click.echo(j) + else: + sites = {site.id: site for site in client.Site.objects.all()} + for job in jobs: + site = sites[job["site_id"]] + path_str = site.path.as_posix() + if len(path_str) > 27: + path_str = "..." 
+ path_str[-27:] + job["site"] = f"{site.name}" + + fields = [ + "id", + "site", + "scheduler_id", + "state", + "filter_tags", + "project", + "queue", + "num_nodes", + "wall_time_min", + "job_mode", + ] + rows = [[str(j[field]) for field in fields] for j in jobs] + + col_widths = [len(f) for f in fields] + for row in rows: + for col_idx, width in enumerate(col_widths): + col_widths[col_idx] = max(width, len(row[col_idx])) + + for i, field in enumerate(fields): + fields[i] = field.rjust(col_widths[i] + 1) + + print(*fields) + for row in rows: + for i, col in enumerate(row): + row[i] = col.rjust(col_widths[i] + 1) + print(*row) @queue.command() From 62f0e69f28b2c1fe9f6104a04a6f084883a9617e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 10 May 2023 15:52:55 -0500 Subject: [PATCH 13/54] more updates to balsam queue ls --- balsam/cmdline/scheduler.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/balsam/cmdline/scheduler.py b/balsam/cmdline/scheduler.py index 6e9ea80e..6aca8e48 100644 --- a/balsam/cmdline/scheduler.py +++ b/balsam/cmdline/scheduler.py @@ -114,18 +114,16 @@ def ls(history: bool, verbose: bool, num: int, site_selector: str, scheduler_id: BatchJob = client.BatchJob qs = filter_by_sites(BatchJob.objects.all(), site_selector) - active_only = False if not history: - active_only = True qs_filter = qs.filter(state=["pending_submission", "queued", "running", "pending_deletion"]) - if len(qs_filter) > 0 or num == 0: + if (len(qs_filter) > 0 and scheduler_id is None) or num == 0: qs = qs_filter if scheduler_id is not None: qs = qs.filter(scheduler_id=scheduler_id) jobs = [j.display_dict() for j in qs] - if active_only and num > 0: + if not history and num > 0 and scheduler_id is None: click.echo(f"No active Batch Jobs. 
Displaying records for last {num} Batch Jobs") jobs = jobs[-num:] From 2016f51ceeb21668b0da2f44466fe3f7e6a23ad6 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 10 May 2023 16:15:05 -0500 Subject: [PATCH 14/54] updates to balsam job modify --- balsam/cmdline/job.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/balsam/cmdline/job.py b/balsam/cmdline/job.py index eb2943d5..ac8bffd1 100644 --- a/balsam/cmdline/job.py +++ b/balsam/cmdline/job.py @@ -315,8 +315,9 @@ def ls( @job.command() @click.option("-i", "--id", "job_ids", multiple=True, type=int) +@click.option("-t", "--tag", "tags", multiple=True, type=str, callback=validate_tags) @click.option("-s", "--state", "state", type=str) -def modify(job_ids: List[int], state: JobState) -> None: +def modify(job_ids: List[int], tags: List[str], state: JobState) -> None: """ Modify Jobs @@ -328,6 +329,8 @@ def modify(job_ids: List[int], state: JobState) -> None: jobs = client.Job.objects.all() if job_ids: jobs = jobs.filter(id=job_ids) + elif tags: + jobs = jobs.filter(tags=tags) else: raise click.BadParameter("Provide either list of Job ids or tags to delete") count = jobs.count() From 6112efe9a48a705daf0c7c0d91e84248679da830 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 10 May 2023 19:16:03 -0500 Subject: [PATCH 15/54] added forced site deletion --- balsam/cmdline/site.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/balsam/cmdline/site.py b/balsam/cmdline/site.py index f7ffbffe..4ba39ffe 100644 --- a/balsam/cmdline/site.py +++ b/balsam/cmdline/site.py @@ -1,3 +1,4 @@ +import os import shutil import socket import sys @@ -128,23 +129,40 @@ def mv(src: Union[Path, str], dest: Union[Path, str]) -> None: @site.command() -@click.argument("path", type=click.Path(exists=True, file_okay=False)) -def rm(path: Union[str, Path]) -> None: +# @click.argument("path", type=click.Path(exists=True, file_okay=False)) +@click.argument("path", type=click.Path()) +@click.option("-f", "--force", is_flag=True, default=False) +def rm(path: str, force: bool) -> None: """ Remove a balsam site balsam site rm /path/to/site """ - cf = SiteConfig(path) - client = cf.client - site = client.Site.objects.get(id=cf.site_id) - jobcount = client.Job.objects.filter(site_id=site.id).count() - warning = f"This will wipe out {jobcount} jobs inside!" if jobcount else "" - - if click.confirm(f"Do you really want to destroy {Path(path).name}? {warning}"): - site.delete() - shutil.rmtree(path) - click.echo(f"Deleted site {path}") + if not force: + if os.path.exists(path): + cf = SiteConfig(path) + client = cf.client + site = client.Site.objects.get(id=cf.site_id) + jobcount = client.Job.objects.filter(site_id=site.id).count() + warning = f"This will wipe out {jobcount} jobs inside!" if jobcount else "" + + if click.confirm(f"Do you really want to destroy {Path(path).name}? 
{warning}"): + site.delete() + shutil.rmtree(path) + click.echo(f"Deleted site {path}") + else: + raise click.BadParameter("Path doesn't exist") + else: + client = ClientSettings.load_from_file().build_client() + qs = client.Site.objects.all() + qs = qs.filter(path=path) + if len(qs) > 1: + raise click.BadParameter(f"Path found in {len(qs)} sites") + else: + site_id = qs[0].id + site = client.Site.objects.get(id=site_id) + site.delete() + click.echo("Forced site deletion; check for path to clean up") @site.command() From 5c85221db688af073946807a6a95324e67adf07e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 19 May 2023 18:09:04 -0500 Subject: [PATCH 16/54] updates to polaris app run --- balsam/platform/app_run/polaris.py | 56 +++++++++++++++---- .../compute_node/alcf_polaris_node.py | 2 + 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 4da45bfd..8eb806c0 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,7 +1,10 @@ +import logging import os from .app_run import SubprocessAppRun +logger = logging.getLogger(__name__) + class PolarisRun(SubprocessAppRun): """ @@ -10,19 +13,35 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] + cpu_bind = self._launch_params.get("cpu_bind", "none") - if ( - cpu_bind == "none" - and self._gpus_per_rank > 0 - and self.get_num_ranks() == 8 - and self.get_cpus_per_rank == 1 - ): - gpu_device = int(os.getenv("CUDA_VISIBLE_DEVICES")) - cpu_bind_list = ["list"] - start_cpu = 32 - 8 * (1 + gpu_device) - for i in range(8): - cpu_bind_list.append(":" + str(start_cpu + i)) + if cpu_bind == "none" and self._gpus_per_rank > 0: + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") + gpu_ids.reverse() + cpu_ids = self._node_spec.cpu_ids[0] + + cpu_bind_list = ["verbose,list"] + for gid in gpu_ids: + start_cpu = 32 - int(gid) * 8 - self.get_cpus_per_rank() + cpu_bind_list.append(":") + for icpu in range(self.get_cpus_per_rank()): + if icpu > 0: + cpu_bind_list.append(",") + cpu_bind_list.append(str(start_cpu + icpu)) + + # start_cpu = 32 - 8 * (1 + gpu_device) + # for i in range(8): + # cpu_bind_list.append(":" + str(start_cpu + i)) cpu_bind = "".join(cpu_bind_list) + logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + + launch_params = [] + for k in self._launch_params.keys(): + if k != "cpu_bind": + launch_params.append("--" + k) + launch_params.append(str(self._launch_params[k])) + nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", @@ -36,6 +55,21 @@ def _build_cmdline(self) -> str: cpu_bind, "-d", self._threads_per_rank, + *launch_params, self._cmdline, ] return " ".join(str(arg) for arg in args) + + # Overide default because sunspot does not use CUDA + def _set_envs(self) -> None: + + envs = os.environ.copy() + envs.update(self._envs) + # Check the assigned GPU ID list from the first compute node: + gpu_ids = self._node_spec.gpu_ids[0] + + if gpu_ids: + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) + self._envs = envs diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index b5283c3b..af5925fb 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ 
b/balsam/platform/compute_node/alcf_polaris_node.py @@ -12,6 +12,8 @@ class PolarisNode(ComputeNode): # turam: confirm number of cpus cpu_ids = list(range(64)) + # cms21: recommended cpu affinity for polaris nodes is in reverse order to gpu ids + cpu_ids.reverse() gpu_ids: List[IntStr] = list(range(4)) @classmethod From 9f1b1ca717bb400c22e9d32c73cd39e2f7a96d8d Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 19 May 2023 18:27:51 -0500 Subject: [PATCH 17/54] updates to polaris app run --- balsam/platform/app_run/polaris.py | 1 - 1 file changed, 1 deletion(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 8eb806c0..6b5afdec 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -18,7 +18,6 @@ def _build_cmdline(self) -> str: if cpu_bind == "none" and self._gpus_per_rank > 0: gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] gpu_ids = gpu_device.split(",") - gpu_ids.reverse() cpu_ids = self._node_spec.cpu_ids[0] cpu_bind_list = ["verbose,list"] From 81e3782461c4fc679bec03adab05dcf6569a7f01 Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Mon, 22 May 2023 12:09:35 -0500 Subject: [PATCH 18/54] Reduce filter chunk size Reduce filter chunk size to reduce the length of URL querying server for events. Fix to PR #246 --- balsam/_api/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balsam/_api/manager.py b/balsam/_api/manager.py index 2a5e2941..4975dd24 100644 --- a/balsam/_api/manager.py +++ b/balsam/_api/manager.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from balsam.client import RESTClient -FILTER_CHUNK_SIZE = 512 +FILTER_CHUNK_SIZE = 500 logger = logging.getLogger(__name__) T = TypeVar("T", bound=BalsamModel) From c811f374163ec4c6e164a9fe0c6e670790712e68 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 24 May 2023 18:49:37 -0500 Subject: [PATCH 19/54] attempt to fix cpu affinity in Polaris app_run --- balsam/platform/app_run/polaris.py | 44 ++++++++++++------- .../compute_node/alcf_polaris_node.py | 9 ++-- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 6b5afdec..f04b6277 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,6 +1,8 @@ import logging import os +from balsam.platform.compute_node.alcf_polaris_node import PolarisNode + from .app_run import SubprocessAppRun logger = logging.getLogger(__name__) @@ -14,26 +16,31 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] + # cms21: currently this is broken for multinode jobs + cpu_bind = self._launch_params.get("cpu_bind", "none") if cpu_bind == "none" and self._gpus_per_rank > 0: - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") - cpu_ids = self._node_spec.cpu_ids[0] + polaris_node = PolarisNode() + # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + # gpu_ids = gpu_device.split(",") + # cpu_ids = self._node_spec.cpu_ids[0] + cpu_ids = polaris_node.cpu_ids + gpu_ids = polaris_node.gpu_ids + cpus_per_rank = self.get_cpus_per_rank() + cpu_ids_ns = self._node_spec.cpu_ids cpu_bind_list = ["verbose,list"] - for gid in gpu_ids: - start_cpu = 32 - int(gid) * 8 - self.get_cpus_per_rank() + for irank in range(self._ranks_per_node): cpu_bind_list.append(":") - for icpu in range(self.get_cpus_per_rank()): - if icpu > 0: + for i in 
range(cpus_per_rank): + if i > 0: cpu_bind_list.append(",") - cpu_bind_list.append(str(start_cpu + icpu)) - - # start_cpu = 32 - 8 * (1 + gpu_device) - # for i in range(8): - # cpu_bind_list.append(":" + str(start_cpu + i)) + cid = str(cpu_ids[i + cpus_per_rank * irank]) + cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + logger.info( + f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} cpu_ids_ns={cpu_ids_ns} gpu_ids={gpu_ids}" + ) launch_params = [] for k in self._launch_params.keys(): @@ -65,9 +72,16 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) # Check the assigned GPU ID list from the first compute node: - gpu_ids = self._node_spec.gpu_ids[0] + gpu_ids = self._node_spec.gpu_ids + cpu_ids = self._node_spec.cpu_ids + logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") + if gpu_ids[0] and len(self._node_spec.node_ids) == 1: + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + if not gpu_ids[0] and len(self._node_spec.node_ids) > 1 and self._gpus_per_rank > 0: + polaris_node = PolarisNode() + gpu_ids = polaris_node.gpu_ids - if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index af5925fb..c2788e6d 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -10,12 +10,13 @@ class PolarisNode(ComputeNode): - # turam: confirm number of cpus - cpu_ids = list(range(64)) - # cms21: recommended cpu affinity for polaris nodes is in reverse order to gpu ids - cpu_ids.reverse() + + cpu_ids = list(range(32)) gpu_ids: List[IntStr] = list(range(4)) + # cms21: optimal gpu/cpu binding on Polaris nodes goes in reverse order + gpu_ids.reverse() + @classmethod def get_job_nodelist(cls) -> List["PolarisNode"]: """ From 0936c686177f1c67c0e1980da60ab3b507bd604e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 03:16:41 -0500 Subject: [PATCH 20/54] added polaris gpu affinity script --- balsam/platform/app_run/app_run.py | 11 ++++ balsam/platform/app_run/polaris.py | 84 +++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 14 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index ff9f2cf7..5d5af973 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,6 +8,7 @@ import psutil # type: ignore +from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -72,6 +73,16 @@ def get_cpus_per_rank(self) -> int: cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) return cpu_per_rank + def get_gpus_per_node_for_job(self) -> int: + gpus_per_node = self._gpus_per_rank * self._ranks_per_node + compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + total_gpus_per_node = len(compute_node.gpu_ids) + if gpus_per_node > total_gpus_per_node: + logger.warning( + f"You have too many gpus per node! 
Physical gpus={total_gpus_per_node} gpus_per_rank={self._gpus_per_rank} ranks_per_node={self._ranks_per_node}" + ) + return min(gpus_per_node, total_gpus_per_node) + @abstractmethod def start(self) -> None: pass diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index f04b6277..1a0bfae7 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,5 +1,6 @@ import logging import os +import stat from balsam.platform.compute_node.alcf_polaris_node import PolarisNode @@ -19,13 +20,66 @@ def _build_cmdline(self) -> str: # cms21: currently this is broken for multinode jobs cpu_bind = self._launch_params.get("cpu_bind", "none") + gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: - polaris_node = PolarisNode() - # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - # gpu_ids = gpu_device.split(",") - # cpu_ids = self._node_spec.cpu_ids[0] - cpu_ids = polaris_node.cpu_ids - gpu_ids = polaris_node.gpu_ids + if len(self._node_spec.node_ids) == 1 or self._ranks_per_node == 1: + cpu_ids = self._node_spec.cpu_ids[0] + gpu_ids = self._node_spec.gpu_ids[0] + else: + gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split( + "," + ) # These should be distributed across local ranks + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = polaris_node.cpu_ids + node_gpu_ids = polaris_node.gpu_ids + gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") + with open(gpu_affinity_script, "w") as f: + f.write( + f"""#!/bin/bash -l + gpu_ids=( "{" ".join(gpu_ids)}" ) + num_gpus={len(node_gpu_ids)} + gpus_per_rank={self._gpus_per_rank} + ngpu=0 + gpu_string=""\n + """ + ) + f.write( + """while [ $ngpu -lt $gpus_per_rank ] + do + igpu=$(((${PMI_LOCAL_RANK} * ${gpus_per_rank}) + ${ngpu} % ${num_gpus})) + gpu=${gpu_ids[$igpu]} + ##gpu=$((${num_gpus} - 1 - ${ngpu} - (${PMI_LOCAL_RANK} * ${gpus_per_rank}) % ${num_gpus})) + sep="" + if [ $ngpu -gt 0 ] + then + sep="," + fi + gpu_string=$gpu_string$sep$gpu + ngpu=$((${igpu} + 1)) + done + export CUDA_VISIBLE_DEVICES=$gpu_string + echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= $gpu_string” + exec "$@" + """ + ) + st = os.stat(gpu_affinity_script) + os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) + + # gpu_ids = polaris_node.gpu_ids + # num_gpus = len(gpu_ids) + # gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") + # with open(gpu_affinity_script,"w") as f: + # f.write(f"""#!/bin/bash -l + # num_gpus={num_gpus} + # gpus_per_rank={self._gpus_per_rank}\n"""+ + # """gpu=$((${num_gpus} - 1 - ${PMI_LOCAL_RANK} % ${num_gpus}))\n + # export CUDA_VISIBLE_DEVICES=$gpu\n + # echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= ${gpu}”\n + # exec "$@"\n + # """) + # st = os.stat(gpu_affinity_script) + # os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) + cpus_per_rank = self.get_cpus_per_rank() cpu_ids_ns = self._node_spec.cpu_ids @@ -62,6 +116,7 @@ def _build_cmdline(self) -> str: "-d", self._threads_per_rank, *launch_params, + gpu_affinity_script, self._cmdline, ] return " ".join(str(arg) for arg in args) @@ -72,17 +127,18 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) # Check the assigned GPU ID list from the first compute node: - gpu_ids = self._node_spec.gpu_ids - cpu_ids = self._node_spec.cpu_ids + gpu_ids = self._node_spec.gpu_ids[0] + cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - 
if gpu_ids[0] and len(self._node_spec.node_ids) == 1: + if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - if not gpu_ids[0] and len(self._node_spec.node_ids) > 1 and self._gpus_per_rank > 0: - polaris_node = PolarisNode() - gpu_ids = polaris_node.gpu_ids + else: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + if self._gpus_per_rank > 0: + gpu_ids = polaris_node.gpu_ids[0 : self.get_gpus_per_node_for_job()] + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From 08e09766a6fa1ee912e209b76f56e36479ac9424 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 10:20:40 -0500 Subject: [PATCH 21/54] fixes to the affinity script --- balsam/platform/app_run/polaris.py | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 1a0bfae7..c30f4536 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -22,7 +22,7 @@ def _build_cmdline(self) -> str: cpu_bind = self._launch_params.get("cpu_bind", "none") gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: - if len(self._node_spec.node_ids) == 1 or self._ranks_per_node == 1: + if len(self._node_spec.node_ids) == 1: cpu_ids = self._node_spec.cpu_ids[0] gpu_ids = self._node_spec.gpu_ids[0] else: @@ -31,12 +31,15 @@ def _build_cmdline(self) -> str: ) # These should be distributed across local ranks polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids + + if len(self._node_spec.node_ids) > 1 or self._ranks_per_node > 1: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) node_gpu_ids = polaris_node.gpu_ids gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") with open(gpu_affinity_script, "w") as f: f.write( f"""#!/bin/bash -l - gpu_ids=( "{" ".join(gpu_ids)}" ) + gpu_ids=( {" ".join(gpu_ids)} ) num_gpus={len(node_gpu_ids)} gpus_per_rank={self._gpus_per_rank} ngpu=0 @@ -65,21 +68,6 @@ def _build_cmdline(self) -> str: st = os.stat(gpu_affinity_script) os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - # gpu_ids = polaris_node.gpu_ids - # num_gpus = len(gpu_ids) - # gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") - # with open(gpu_affinity_script,"w") as f: - # f.write(f"""#!/bin/bash -l - # num_gpus={num_gpus} - # gpus_per_rank={self._gpus_per_rank}\n"""+ - # """gpu=$((${num_gpus} - 1 - ${PMI_LOCAL_RANK} % ${num_gpus}))\n - # export CUDA_VISIBLE_DEVICES=$gpu\n - # echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= ${gpu}”\n - # exec "$@"\n - # """) - # st = os.stat(gpu_affinity_script) - # os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - cpus_per_rank = self.get_cpus_per_rank() cpu_ids_ns = self._node_spec.cpu_ids From e61c12cbe79afad2e8ebbcb7aa492ab558aeb329 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 10:49:10 -0500 Subject: [PATCH 22/54] some style changes --- balsam/platform/app_run/polaris.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 
c30f4536..0f258033 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -2,7 +2,7 @@ import os import stat -from balsam.platform.compute_node.alcf_polaris_node import PolarisNode +from balsam.platform.compute_node import PolarisNode from .app_run import SubprocessAppRun @@ -17,8 +17,6 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] - # cms21: currently this is broken for multinode jobs - cpu_bind = self._launch_params.get("cpu_bind", "none") gpu_affinity_script = "" if cpu_bind == "none" and self._gpus_per_rank > 0: From 7bcbc52c353a06977ee880c6cd02a69d21396519 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 25 May 2023 15:39:26 -0500 Subject: [PATCH 23/54] reverting affinity script addition, put in different branch --- balsam/platform/app_run/polaris.py | 62 ++++-------------------------- 1 file changed, 8 insertions(+), 54 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 0f258033..05b506c1 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -1,6 +1,5 @@ import logging import os -import stat from balsam.platform.compute_node import PolarisNode @@ -18,56 +17,15 @@ def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") - gpu_affinity_script = "" - if cpu_bind == "none" and self._gpus_per_rank > 0: + if cpu_bind == "none" and self._gpus_per_rank > 0 and self._ranks_per_node == 1: + gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split(",") if len(self._node_spec.node_ids) == 1: cpu_ids = self._node_spec.cpu_ids[0] - gpu_ids = self._node_spec.gpu_ids[0] else: - gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split( - "," - ) # These should be distributed across local ranks polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids - if len(self._node_spec.node_ids) > 1 or self._ranks_per_node > 1: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - node_gpu_ids = polaris_node.gpu_ids - gpu_affinity_script = self._cwd.joinpath("set_affinity_gpu_polaris.sh") - with open(gpu_affinity_script, "w") as f: - f.write( - f"""#!/bin/bash -l - gpu_ids=( {" ".join(gpu_ids)} ) - num_gpus={len(node_gpu_ids)} - gpus_per_rank={self._gpus_per_rank} - ngpu=0 - gpu_string=""\n - """ - ) - f.write( - """while [ $ngpu -lt $gpus_per_rank ] - do - igpu=$(((${PMI_LOCAL_RANK} * ${gpus_per_rank}) + ${ngpu} % ${num_gpus})) - gpu=${gpu_ids[$igpu]} - ##gpu=$((${num_gpus} - 1 - ${ngpu} - (${PMI_LOCAL_RANK} * ${gpus_per_rank}) % ${num_gpus})) - sep="" - if [ $ngpu -gt 0 ] - then - sep="," - fi - gpu_string=$gpu_string$sep$gpu - ngpu=$((${igpu} + 1)) - done - export CUDA_VISIBLE_DEVICES=$gpu_string - echo “RANK= ${PMI_RANK} LOCAL_RANK= ${PMI_LOCAL_RANK} gpu= $gpu_string” - exec "$@" - """ - ) - st = os.stat(gpu_affinity_script) - os.chmod(gpu_affinity_script, st.st_mode | stat.S_IEXEC) - cpus_per_rank = self.get_cpus_per_rank() - cpu_ids_ns = self._node_spec.cpu_ids cpu_bind_list = ["verbose,list"] for irank in range(self._ranks_per_node): @@ -78,9 +36,7 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - logger.info( - f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} cpu_ids_ns={cpu_ids_ns} gpu_ids={gpu_ids}" - ) + logger.info(f"Polaris 
app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): @@ -102,7 +58,6 @@ def _build_cmdline(self) -> str: "-d", self._threads_per_rank, *launch_params, - gpu_affinity_script, self._cmdline, ] return " ".join(str(arg) for arg in args) @@ -116,15 +71,14 @@ def _set_envs(self) -> None: gpu_ids = self._node_spec.gpu_ids[0] cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - if gpu_ids: + if gpu_ids and self._ranks_per_node == 1: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - else: + if not gpu_ids and self._ranks_per_node == 1 and self._gpus_per_rank > 0: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - if self._gpus_per_rank > 0: - gpu_ids = polaris_node.gpu_ids[0 : self.get_gpus_per_node_for_job()] - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) + gpu_ids = polaris_node.gpu_ids[0 : self._gpus_per_rank] + envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From b0973cf47b60852b84ef77ac5556087152554b0f Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 10:39:22 -0500 Subject: [PATCH 24/54] removed helper function --- balsam/platform/app_run/app_run.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 5d5af973..ff9f2cf7 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,7 +8,6 @@ import psutil # type: ignore -from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -73,16 +72,6 @@ def get_cpus_per_rank(self) -> int: cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) return cpu_per_rank - def get_gpus_per_node_for_job(self) -> int: - gpus_per_node = self._gpus_per_rank * self._ranks_per_node - compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - total_gpus_per_node = len(compute_node.gpu_ids) - if gpus_per_node > total_gpus_per_node: - logger.warning( - f"You have too many gpus per node! 
Physical gpus={total_gpus_per_node} gpus_per_rank={self._gpus_per_rank} ranks_per_node={self._ranks_per_node}" - ) - return min(gpus_per_node, total_gpus_per_node) - @abstractmethod def start(self) -> None: pass From 77f8941307b102c088ccda0f1bc2a65d5f24ce0b Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 13:54:22 -0500 Subject: [PATCH 25/54] Updates to polaris cmdline implementation after dev discussion; includes notes --- balsam/platform/app_run/polaris.py | 47 +++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 05b506c1..6ca5b397 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -17,9 +17,20 @@ def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] cpu_bind = self._launch_params.get("cpu_bind", "none") - if cpu_bind == "none" and self._gpus_per_rank > 0 and self._ranks_per_node == 1: - gpu_ids = self._envs["CUDA_VISIBLE_DEVICES"].split(",") - if len(self._node_spec.node_ids) == 1: + + # If the user does not set a cpu_bind option and gpus are being used, + # this code sets cpu-bind to be optimal for the gpus being used. + # This does not handle the case where the application is using less than + # 8 cpus per gpu. This code will not skip the appropriate number of cpus + # in the rank binding assignments. + if cpu_bind == "none" and self._gpus_per_rank > 0: + + # Here we grab the cpu_ids assigned to the job in the NodeSpec object + # If this is not set in NodeSpec (it is only set for single node jobs), + # then we take the cpu_id list from the Polaris ComputeNode subclass, + # assuming the job will have use of all the cpus in nodes assigned to it. + cpu_ids_ns = self._node_spec.cpu_ids[0] + if cpu_ids_ns: cpu_ids = self._node_spec.cpu_ids[0] else: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) @@ -27,6 +38,8 @@ def _build_cmdline(self) -> str: cpus_per_rank = self.get_cpus_per_rank() + # PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind + # in ascending cpu order is what we want here. cpu_bind_list = ["verbose,list"] for irank in range(self._ranks_per_node): cpu_bind_list.append(":") @@ -36,6 +49,8 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] @@ -67,18 +82,28 @@ def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) - # Check the assigned GPU ID list from the first compute node: + + # Here we grab the gpus assigned to the job from NodeSpec. NodeSpec only + # sets this for single node jobs. For multinode jobs, gpu_ids below will + # be an empty list of lists (e.g. [[], []]). 
The ordering of the gpu_ids + # is reversed in PolarisNode and therefore the reverse ordering of + # cpus to gpus should be reflected here gpu_ids = self._node_spec.gpu_ids[0] cpu_ids = self._node_spec.cpu_ids[0] logger.info(f"Polaris set_envs: gpu_ids={gpu_ids} cpu_ids={cpu_ids}") - if gpu_ids and self._ranks_per_node == 1: - envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - if not gpu_ids and self._ranks_per_node == 1 and self._gpus_per_rank > 0: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - gpu_ids = polaris_node.gpu_ids[0 : self._gpus_per_rank] + + # Here we set CUDA_VISIBLE_DEVICES for single node jobs only. We assume + # for multinode jobs that the job has access to all gpus, and + # CUDA_VISIBLE_DEVICES is set by the user, for example by local rank with an + # gpu_affinity.sh script that wraps around the user application in the + # ApplicationDefinition. + # One special case: if your job has one node, 2 ranks, and 1 gpu per rank, the + # code here will set CUDA_VISIBLE_DEVICES to "3,2" or "1,0". A user provided + # gpu_affinity.sh script should take this assigment and use it to reset + # CUDA_VISIBLE_DEVICES for each local rank. The user script should NOT + # round-robin the setting CUDA_VISIBLE_DEVICES starting from 3. + if gpu_ids: envs["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" envs["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_ids)) - envs["OMP_NUM_THREADS"] = str(self._threads_per_rank) self._envs = envs From 2efaa8ed82c87487188a1dac01ee0d08aafb5451 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 17:23:17 -0500 Subject: [PATCH 26/54] remove turam path from polaris job-template.sh --- balsam/config/defaults/alcf_polaris/job-template.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/balsam/config/defaults/alcf_polaris/job-template.sh b/balsam/config/defaults/alcf_polaris/job-template.sh index 8dae69c2..dd090dee 100644 --- a/balsam/config/defaults/alcf_polaris/job-template.sh +++ b/balsam/config/defaults/alcf_polaris/job-template.sh @@ -8,8 +8,6 @@ export http_proxy="http://proxy:3128" export https_proxy="http://proxy:3128" -export PYTHONPATH=/home/turam/dev/polaris/balsam:$PYTHONPATH - #remove export PMI_NO_FORK=1 export BALSAM_SITE_PATH={{balsam_site_path}} cd $BALSAM_SITE_PATH From 1281a794650a3311bfb671e7656153ca60bfda10 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 26 May 2023 17:24:04 -0500 Subject: [PATCH 27/54] more updates to polaris cmdline --- balsam/platform/app_run/polaris.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 6ca5b397..46062f2d 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -49,8 +49,11 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") + if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): + gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + gpu_ids = gpu_device.split(",") + else: + gpu_ids = [] logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] @@ -71,7 +74,7 @@ def _build_cmdline(self) -> str: "--cpu-bind", cpu_bind, "-d", - self._threads_per_rank, + self.get_cpus_per_rank(), *launch_params, self._cmdline, ] From 
1b64cdb798f7804b652782149d0b4d07b5449089 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 21:56:52 -0500 Subject: [PATCH 28/54] changes to make depth paramter for Polaris app_run consistent with docs --- balsam/platform/app_run/app_run.py | 23 +++++++++++++++++++---- balsam/platform/app_run/polaris.py | 18 +++++++++++++++++- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index ff9f2cf7..7713b974 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -8,6 +8,7 @@ import psutil # type: ignore +from balsam.platform.compute_node import ComputeNode from balsam.site.launcher import NodeSpec logger = logging.getLogger(__name__) @@ -67,10 +68,24 @@ def get_num_ranks(self) -> int: return self._ranks_per_node * len(self._node_spec.node_ids) def get_cpus_per_rank(self) -> int: - cpu_per_rank = len(self._node_spec.cpu_ids[0]) // self._ranks_per_node - if not cpu_per_rank: - cpu_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) - return cpu_per_rank + + # Get the list of cpus assigned to the job. If it is a single node job, that is stored in + # the NodeSpec object. If it is a multinode job, the cpu_ids assigned to NodeSpec is empty, + # so we will assume all cpus on a compute node are available to the job. The list of cpus is + # just the list of cpus on the node in that case. + cpu_ids = self._node_spec.cpu_ids[0] + if not cpu_ids: + compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = compute_node.cpu_ids + + cpus_per_node = len(cpu_ids) + cpus_per_rank = cpus_per_node // self._ranks_per_node + + # If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), set it to a minimum of + # 1 cpu per rank or the number of cores per rank from the threading settings + if not cpus_per_rank: + cpus_per_rank = max(1, int(self._threads_per_rank // self._threads_per_core)) + return cpus_per_rank @abstractmethod def start(self) -> None: diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 46062f2d..5f59cd78 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -32,6 +32,8 @@ def _build_cmdline(self) -> str: cpu_ids_ns = self._node_spec.cpu_ids[0] if cpu_ids_ns: cpu_ids = self._node_spec.cpu_ids[0] + if self._threads_per_core == 2: + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) else: polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) cpu_ids = polaris_node.cpu_ids @@ -48,6 +50,13 @@ def _build_cmdline(self) -> str: cpu_bind_list.append(",") cid = str(cpu_ids[i + cpus_per_rank * irank]) cpu_bind_list.append(cid) + # If the job is using 2 hardware threads per core, we need to add those threads to the list + # The additional threads should go in the same ascending order (threads 0 and 32 are on the + # same physical core, threads 31 and 63 are on the same physical core) + if self._threads_per_core == 2: + cpu_bind_list.append(",") + cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) + cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] @@ -62,6 +71,13 @@ def _build_cmdline(self) -> str: launch_params.append("--" + k) launch_params.append(str(self._launch_params[k])) + # The value of -d depends on the setting of cpu_bind. 
If cpu-bind=core, -d is the number of + # physical cores per rank, otherwise it is the number of hardware threads per rank + # https://docs.alcf.anl.gov/running-jobs/example-job-scripts/ + depth = self._threads_per_rank + if "core" in cpu_bind: + depth = self.get_cpus_per_rank() + nid_str = ",".join(map(str, node_ids)) args = [ "mpiexec", @@ -74,7 +90,7 @@ def _build_cmdline(self) -> str: "--cpu-bind", cpu_bind, "-d", - self.get_cpus_per_rank(), + depth, *launch_params, self._cmdline, ] From 937947ecfa7f483bb8ee789a6ba4c003f6bd7e28 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 22:41:17 -0500 Subject: [PATCH 29/54] Removed blank lines --- balsam/platform/app_run/app_run.py | 1 - balsam/platform/app_run/polaris.py | 2 -- balsam/platform/compute_node/alcf_polaris_node.py | 1 - 3 files changed, 4 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 7713b974..2aa06e39 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -68,7 +68,6 @@ def get_num_ranks(self) -> int: return self._ranks_per_node * len(self._node_spec.node_ids) def get_cpus_per_rank(self) -> int: - # Get the list of cpus assigned to the job. If it is a single node job, that is stored in # the NodeSpec object. If it is a multinode job, the cpu_ids assigned to NodeSpec is empty, # so we will assume all cpus on a compute node are available to the job. The list of cpus is diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 5f59cd78..761878a5 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -24,7 +24,6 @@ def _build_cmdline(self) -> str: # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. 
if cpu_bind == "none" and self._gpus_per_rank > 0: - # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, @@ -98,7 +97,6 @@ def _build_cmdline(self) -> str: # Overide default because sunspot does not use CUDA def _set_envs(self) -> None: - envs = os.environ.copy() envs.update(self._envs) diff --git a/balsam/platform/compute_node/alcf_polaris_node.py b/balsam/platform/compute_node/alcf_polaris_node.py index c2788e6d..208490a1 100644 --- a/balsam/platform/compute_node/alcf_polaris_node.py +++ b/balsam/platform/compute_node/alcf_polaris_node.py @@ -10,7 +10,6 @@ class PolarisNode(ComputeNode): - cpu_ids = list(range(32)) gpu_ids: List[IntStr] = list(range(4)) From 8d6f5f00f1fd2f14c703998e8955c7074640f478 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 31 May 2023 22:51:30 -0500 Subject: [PATCH 30/54] lint fixes --- balsam/_api/model.py | 2 +- balsam/config/config.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/balsam/_api/model.py b/balsam/_api/model.py index 05d4831a..38ae2c48 100644 --- a/balsam/_api/model.py +++ b/balsam/_api/model.py @@ -186,7 +186,7 @@ def __repr__(self) -> str: def __str__(self) -> str: d = self.display_dict() - return yaml.dump(d, sort_keys=False, indent=4) # type: ignore + return yaml.dump(d, sort_keys=False, indent=4) def __eq__(self, other: Any) -> bool: if not isinstance(other, BalsamModel): diff --git a/balsam/config/config.py b/balsam/config/config.py index 5766afc5..00d95c69 100644 --- a/balsam/config/config.py +++ b/balsam/config/config.py @@ -235,13 +235,10 @@ def save(self, path: Union[str, Path]) -> None: fp.write(self.dump_yaml()) def dump_yaml(self) -> str: - return cast( - str, - yaml.dump( - json.loads(self.json()), - sort_keys=False, - indent=4, - ), + return yaml.dump( + json.loads(self.json()), + sort_keys=False, + indent=4, ) @classmethod From c57beb787ba78471dbf73950cb78434aaf0fc0a4 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 1 Jun 2023 11:56:46 -0500 Subject: [PATCH 31/54] fix type error --- balsam/platform/app_run/app_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index 2aa06e39..c4efb45b 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -75,7 +75,7 @@ def get_cpus_per_rank(self) -> int: cpu_ids = self._node_spec.cpu_ids[0] if not cpu_ids: compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - cpu_ids = compute_node.cpu_ids + cpu_ids = list(compute_node.cpu_ids) cpus_per_node = len(cpu_ids) cpus_per_rank = cpus_per_node // self._ranks_per_node From 0691ed3d88c2170c4f355808e9c311d95145aecf Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Thu, 1 Jun 2023 12:00:49 -0500 Subject: [PATCH 32/54] fix type error --- balsam/platform/app_run/app_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balsam/platform/app_run/app_run.py b/balsam/platform/app_run/app_run.py index c4efb45b..3c25c4f6 100644 --- a/balsam/platform/app_run/app_run.py +++ b/balsam/platform/app_run/app_run.py @@ -73,11 +73,11 @@ def get_cpus_per_rank(self) -> int: # so we will assume all cpus on a compute node are available to the job. The list of cpus is # just the list of cpus on the node in that case. 
cpu_ids = self._node_spec.cpu_ids[0] + cpus_per_node = len(cpu_ids) if not cpu_ids: compute_node = ComputeNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - cpu_ids = list(compute_node.cpu_ids) + cpus_per_node = len(compute_node.cpu_ids) - cpus_per_node = len(cpu_ids) cpus_per_rank = cpus_per_node // self._ranks_per_node # If ranks are oversubscribed to cpus (ranks_per_node > cpus_per_node), set it to a minimum of From 22a4718cacc5fadbed04340e127d5f8ec510d91a Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 6 Jun 2023 14:22:22 -0500 Subject: [PATCH 33/54] split node names for machines that use full node address --- balsam/site/launcher/_serial_mode_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/balsam/site/launcher/_serial_mode_worker.py b/balsam/site/launcher/_serial_mode_worker.py index b861d63a..5ed46e14 100644 --- a/balsam/site/launcher/_serial_mode_worker.py +++ b/balsam/site/launcher/_serial_mode_worker.py @@ -239,7 +239,7 @@ def worker_main( SigHandler() site_config.enable_logging("serial_mode", filename=log_filename + f".{hostname}") - if hostname == master_host: + if hostname == master_host.split(".")[0]: logger.info(f"Launching master subprocess on {hostname}") master_proc = launch_master_subprocess() else: @@ -247,7 +247,8 @@ def worker_main( launch_settings = site_config.settings.launcher node_cls = launch_settings.compute_node - nodes = [node for node in node_cls.get_job_nodelist() if node.hostname == hostname] + logger.debug(f"node.hostname={node_cls.get_job_nodelist()[0].hostname} and hostname={hostname}") + nodes = [node for node in node_cls.get_job_nodelist() if node.hostname.split(".")[0] == hostname] node_manager = NodeManager(nodes, allow_node_packing=True) worker = Worker( app_run=launch_settings.local_app_launcher, From 9f70b10ee350627a7ae805a1de3b6ebbeb894dda Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 6 Jun 2023 16:41:22 -0500 Subject: [PATCH 34/54] lint fixes --- balsam/_api/model.py | 2 +- balsam/config/config.py | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/balsam/_api/model.py b/balsam/_api/model.py index 05d4831a..38ae2c48 100644 --- a/balsam/_api/model.py +++ b/balsam/_api/model.py @@ -186,7 +186,7 @@ def __repr__(self) -> str: def __str__(self) -> str: d = self.display_dict() - return yaml.dump(d, sort_keys=False, indent=4) # type: ignore + return yaml.dump(d, sort_keys=False, indent=4) def __eq__(self, other: Any) -> bool: if not isinstance(other, BalsamModel): diff --git a/balsam/config/config.py b/balsam/config/config.py index 5766afc5..00d95c69 100644 --- a/balsam/config/config.py +++ b/balsam/config/config.py @@ -235,13 +235,10 @@ def save(self, path: Union[str, Path]) -> None: fp.write(self.dump_yaml()) def dump_yaml(self) -> str: - return cast( - str, - yaml.dump( - json.loads(self.json()), - sort_keys=False, - indent=4, - ), + return yaml.dump( + json.loads(self.json()), + sort_keys=False, + indent=4, ) @classmethod From 92b528d24ae41e674f4613d3ddc2b8bd5577918f Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 7 Jun 2023 17:10:32 -0500 Subject: [PATCH 35/54] some changes to balsam queue ls --- balsam/cmdline/scheduler.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/balsam/cmdline/scheduler.py b/balsam/cmdline/scheduler.py index 6aca8e48..12e9cccf 100644 --- a/balsam/cmdline/scheduler.py +++ b/balsam/cmdline/scheduler.py @@ -84,11 +84,11 @@ def submit( @queue.command() -@click.option("-n", "--num", 
default=3, type=int) +@click.option("-n", "--num", default=0, type=int) @click.option("-h", "--history", is_flag=True, default=False) @click.option("-v", "--verbose", is_flag=True, default=False) @click.option("--site", "site_selector", default="") -@click.option("--id", "scheduler_id", type=int, default=None) +@click.option("--scheduler_id", "scheduler_id", type=int, default=None) def ls(history: bool, verbose: bool, num: int, site_selector: str, scheduler_id: int) -> None: """ List BatchJobs @@ -103,7 +103,7 @@ def ls(history: bool, verbose: bool, num: int, site_selector: str, scheduler_id: 3) View verbose record for BatchJob with scheduler id - balsam queue ls --id 12345 -v + balsam queue ls --scheduler_id 12345 -v 4) View the last n BatchJobs @@ -114,17 +114,17 @@ def ls(history: bool, verbose: bool, num: int, site_selector: str, scheduler_id: BatchJob = client.BatchJob qs = filter_by_sites(BatchJob.objects.all(), site_selector) - if not history: - qs_filter = qs.filter(state=["pending_submission", "queued", "running", "pending_deletion"]) - if (len(qs_filter) > 0 and scheduler_id is None) or num == 0: - qs = qs_filter + if not history and scheduler_id is None and num == 0: + qs = qs.filter(state=["pending_submission", "queued", "running", "pending_deletion"]) + if len(qs) == 0: + click.echo("No active batch jobs. Use --history option to list completed batch jobs.") if scheduler_id is not None: qs = qs.filter(scheduler_id=scheduler_id) jobs = [j.display_dict() for j in qs] if not history and num > 0 and scheduler_id is None: - click.echo(f"No active Batch Jobs. Displaying records for last {num} Batch Jobs") + click.echo(f"Displaying records for last {num} Batch Jobs") jobs = jobs[-num:] if verbose: From 367541149ebab6cb0a926645e55e415258f8adda Mon Sep 17 00:00:00 2001 From: Bas van der Vlies Date: Fri, 9 Jun 2023 12:01:00 +0200 Subject: [PATCH 36/54] Problems with docker compose and the latest python:3-slim version We get all kind of errors, See: * https://github.com/argonne-lcf/balsam/issues/343 so pinned it on `FROM python:3.10-slim` --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a916ee3a..c32c9f57 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3-slim +FROM python:3.10-slim WORKDIR /balsam From ad0e661b68a9bfd9f5c37ee537c75dac48043bc5 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 13 Jun 2023 16:28:22 -0500 Subject: [PATCH 37/54] made change to accept a user setting cpu_bind to none --- balsam/platform/app_run/polaris.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 761878a5..d18efb12 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -16,14 +16,14 @@ class PolarisRun(SubprocessAppRun): def _build_cmdline(self) -> str: node_ids = [h for h in self._node_spec.hostnames] - cpu_bind = self._launch_params.get("cpu_bind", "none") - - # If the user does not set a cpu_bind option and gpus are being used, + # If the user does not set a cpu_bind option, # this code sets cpu-bind to be optimal for the gpus being used. # This does not handle the case where the application is using less than # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. 
- if cpu_bind == "none" and self._gpus_per_rank > 0: + if "cpu_bind" in self._launch_params.keys(): + cpu_bind = self._launch_params.get("cpu_bind", "none") + else: # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, @@ -57,12 +57,12 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): - gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - gpu_ids = gpu_device.split(",") - else: - gpu_ids = [] - logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") + # if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): + # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] + # gpu_ids = gpu_device.split(",") + # else: + # gpu_ids = [] + # logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): From ee582b30fdb605b87e7214dfe395a0f2cf98dfc0 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 14 Jul 2023 13:50:38 -0500 Subject: [PATCH 38/54] updated theta launch params --- balsam/platform/app_run/theta.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/balsam/platform/app_run/theta.py b/balsam/platform/app_run/theta.py index 9576e5f4..614a7411 100644 --- a/balsam/platform/app_run/theta.py +++ b/balsam/platform/app_run/theta.py @@ -14,6 +14,13 @@ def _pre_popen(self) -> None: def _build_cmdline(self) -> str: node_ids = [nid for nid in self._node_spec.node_ids] nid_str = ",".join(map(str, node_ids)) + + launch_params = [] + for k in self._launch_params.keys(): + if k != "cpu_affinity": + launch_params.append(k) + launch_params.append(str(self._launch_params[k])) + cpu_affinity = self._launch_params.get("cpu_affinity", "none") if cpu_affinity not in ["none", "depth"]: cpu_affinity = "none" @@ -31,6 +38,7 @@ def _build_cmdline(self) -> str: self._threads_per_rank, "-j", self._threads_per_core, + *launch_params, self._cmdline, ] return " ".join(str(arg) for arg in args) From ea919c6145e11c3326c3c963ead1f0114494e514 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 14 Jul 2023 14:34:14 -0500 Subject: [PATCH 39/54] removed white space --- balsam/platform/app_run/theta.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/balsam/platform/app_run/theta.py b/balsam/platform/app_run/theta.py index 614a7411..e79b3167 100644 --- a/balsam/platform/app_run/theta.py +++ b/balsam/platform/app_run/theta.py @@ -14,13 +14,13 @@ def _pre_popen(self) -> None: def _build_cmdline(self) -> str: node_ids = [nid for nid in self._node_spec.node_ids] nid_str = ",".join(map(str, node_ids)) - + launch_params = [] for k in self._launch_params.keys(): if k != "cpu_affinity": launch_params.append(k) launch_params.append(str(self._launch_params[k])) - + cpu_affinity = self._launch_params.get("cpu_affinity", "none") if cpu_affinity not in ["none", "depth"]: cpu_affinity = "none" From a56bcdc1ea63666a112c8e0edf20cf96068ea44e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Wed, 19 Jul 2023 15:32:52 -0500 Subject: [PATCH 40/54] allow session to sort jobs according to parameter passed in optional params --- balsam/server/models/crud/sessions.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/balsam/server/models/crud/sessions.py 
b/balsam/server/models/crud/sessions.py index ff40985a..40c53713 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -192,6 +192,18 @@ def acquire( .limit(spec.max_num_jobs) .with_for_update(of=models.Job.__table__, skip_locked=True) ) + if "sort_walltime_first" in models.BatchJob.optional_params.keys(): + if models.BatchJob.optional_params["sort_walltime_first"]: + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.node_packing_count.desc(), + models.Job.num_nodes.asc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore From 58a3b4476b7abe5d81e9a9fbb3638e51a80a90c8 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 21 Jul 2023 11:06:26 -0500 Subject: [PATCH 41/54] change to SessionAcquire --- balsam/_api/bases.py | 1 + balsam/schemas/session.py | 1 + balsam/server/models/crud/sessions.py | 41 +++++++++++---------- balsam/site/job_source.py | 4 ++ balsam/site/launcher/_mpi_mode.py | 1 + balsam/site/launcher/_serial_mode_master.py | 1 + 6 files changed, 29 insertions(+), 20 deletions(-) diff --git a/balsam/_api/bases.py b/balsam/_api/bases.py index e7560bde..ccda194e 100644 --- a/balsam/_api/bases.py +++ b/balsam/_api/bases.py @@ -370,6 +370,7 @@ def acquire_jobs( max_nodes_per_job: Optional[int] = None, max_aggregate_nodes: Optional[float] = None, serial_only: bool = False, + sort_by: Optional[str] = None, filter_tags: Optional[Dict[str, str]] = None, states: Set[JobState] = RUNNABLE_STATES, app_ids: Optional[Set[int]] = None, diff --git a/balsam/schemas/session.py b/balsam/schemas/session.py index e978c7f4..31fb0298 100644 --- a/balsam/schemas/session.py +++ b/balsam/schemas/session.py @@ -29,6 +29,7 @@ class SessionAcquire(BaseModel): max_nodes_per_job: Optional[int] max_aggregate_nodes: Optional[float] serial_only: bool = False + sort_by: Optional[str] = None filter_tags: Dict[str, str] states: Set[JobState] = RUNNABLE_STATES app_ids: Set[int] = set() diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 40c53713..ddedce53 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -182,28 +182,29 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), + if spec.sort_by == "wall_time": + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.node_packing_count.desc(), + models.Job.num_nodes.asc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) - if "sort_walltime_first" in models.BatchJob.optional_params.keys(): - if models.BatchJob.optional_params["sort_walltime_first"]: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.wall_time_min.desc(), - models.Job.node_packing_count.desc(), - models.Job.num_nodes.asc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) + else: + 
lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) + locked_ids = db.execute(lock_ids_q).scalars().all() subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore diff --git a/balsam/site/job_source.py b/balsam/site/job_source.py index fe4a6bef..9fa878da 100644 --- a/balsam/site/job_source.py +++ b/balsam/site/job_source.py @@ -72,6 +72,7 @@ def __init__( filter_tags: Optional[Dict[str, str]] = None, states: Set[str] = {"PREPROCESSED", "RESTART_READY"}, serial_only: bool = False, + sort_by: Optional[str] = None, max_wall_time_min: Optional[int] = None, max_nodes_per_job: Optional[int] = None, max_aggregate_nodes: Optional[float] = None, @@ -158,6 +159,7 @@ def _get_acquire_parameters(self, num_jobs: int) -> Dict[str, Any]: max_aggregate_nodes=self.max_aggregate_nodes, max_wall_time_min=request_time, serial_only=self.serial_only, + sort_by=self.sort_by, filter_tags=self.filter_tags, states=self.states, app_ids=self.app_ids, @@ -182,6 +184,7 @@ def __init__( filter_tags: Optional[Dict[str, str]] = None, states: Set[JobState] = {JobState.preprocessed, JobState.restart_ready}, serial_only: bool = False, + sort_by: Optional[str] = None, max_wall_time_min: Optional[int] = None, scheduler_id: Optional[int] = None, app_ids: Optional[Set[int]] = None, @@ -229,6 +232,7 @@ def get_jobs( max_aggregate_nodes=max_aggregate_nodes, max_wall_time_min=request_time, serial_only=self.serial_only, + sort_by=self.sort_by, filter_tags=self.filter_tags, states=self.states, app_ids=self.app_ids, diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index c3662984..6948fa59 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -275,6 +275,7 @@ def main( client=site_config.client, site_id=site_config.site_id, filter_tags=filter_tags_dict, + sort_by=site_config.settings.launcher.sort_by_quantity, max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, ) diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index 558fdc9b..29248d84 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -237,6 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, serial_only=True, + sort_by=site_config.settings.launcher.sort_by_quantity, max_nodes_per_job=1, ) status_updater = BulkStatusUpdater(site_config.client) From 4ef3e75abd3b3760e2c8495e31ea70df087990a7 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 21 Jul 2023 13:30:40 -0500 Subject: [PATCH 42/54] change to sort_by option --- balsam/server/models/crud/sessions.py | 4 ++-- balsam/site/launcher/_mpi_mode.py | 2 +- balsam/site/launcher/_serial_mode_master.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index ddedce53..80d27644 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -182,13 +182,13 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - if spec.sort_by == "wall_time": + if spec.sort_by == "long_large_first": lock_ids_q = ( 
job_q.with_only_columns([models.Job.id]) .order_by( models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), models.Job.node_packing_count.desc(), - models.Job.num_nodes.asc(), ) .limit(spec.max_num_jobs) .with_for_update(of=models.Job.__table__, skip_locked=True) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 6948fa59..1792fead 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -275,7 +275,7 @@ def main( client=site_config.client, site_id=site_config.site_id, filter_tags=filter_tags_dict, - sort_by=site_config.settings.launcher.sort_by_quantity, + sort_by=site_config.settings.launcher.sort_by, max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, ) diff --git a/balsam/site/launcher/_serial_mode_master.py b/balsam/site/launcher/_serial_mode_master.py index 29248d84..ccd7ccd1 100644 --- a/balsam/site/launcher/_serial_mode_master.py +++ b/balsam/site/launcher/_serial_mode_master.py @@ -237,7 +237,7 @@ def master_main(wall_time_min: int, master_port: int, log_filename: str, num_wor max_wall_time_min=wall_time_min, scheduler_id=scheduler_id, serial_only=True, - sort_by=site_config.settings.launcher.sort_by_quantity, + sort_by=site_config.settings.launcher.sort_by, max_nodes_per_job=1, ) status_updater = BulkStatusUpdater(site_config.client) From babca798bf8849658f2c359db83c879a748c9e54 Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Mon, 24 Jul 2023 11:19:01 -0500 Subject: [PATCH 43/54] Update config.py --- balsam/config/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/balsam/config/config.py b/balsam/config/config.py index 00d95c69..e4c8f066 100644 --- a/balsam/config/config.py +++ b/balsam/config/config.py @@ -193,6 +193,7 @@ class LauncherSettings(BaseSettings): local_app_launcher: Type[AppRun] = Field("balsam.platform.app_run.LocalAppRun") mpirun_allows_node_packing: bool = False serial_mode_prefetch_per_rank: int = 64 + sort_by: Optional[str] = None serial_mode_startup_params: Dict[str, str] = {"cpu_affinity": "none"} @validator("compute_node", pre=True, always=True) From 08d0986906f18964139fb8304e19f7f2cf0bb5f6 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 25 Jul 2023 18:48:51 +0000 Subject: [PATCH 44/54] updates --- balsam/_api/bases.py | 1 + balsam/server/models/crud/sessions.py | 36 +++++++++++++++++++-------- balsam/site/job_source.py | 2 ++ balsam/site/launcher/_mpi_mode.py | 3 +++ 4 files changed, 31 insertions(+), 11 deletions(-) diff --git a/balsam/_api/bases.py b/balsam/_api/bases.py index ccda194e..0ee4840e 100644 --- a/balsam/_api/bases.py +++ b/balsam/_api/bases.py @@ -386,6 +386,7 @@ def acquire_jobs( max_nodes_per_job=max_nodes_per_job, max_aggregate_nodes=max_aggregate_nodes, serial_only=serial_only, + sort_by=sort_by, filter_tags=filter_tags, states=states, app_ids=app_ids, diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 80d27644..d2226d9b 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -130,20 +130,34 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li return acquired_jobs -def _footprint_func() -> Any: +def _footprint_func(spec: schemas.SessionAcquire) -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - 
models.Job.wall_time_min.desc(), - models.Job.id.asc(), + if spec.sort_by == "long_large_first": + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), + ) ) + .label("aggregate_footprint") + ) + else: + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") ) - .label("aggregate_footprint") - ) def acquire( diff --git a/balsam/site/job_source.py b/balsam/site/job_source.py index 9fa878da..537969a3 100644 --- a/balsam/site/job_source.py +++ b/balsam/site/job_source.py @@ -91,6 +91,7 @@ def __init__( self.app_ids = set() if app_ids is None else app_ids self.states = states self.serial_only = serial_only + self.sort_by = sort_by self.max_wall_time_min = max_wall_time_min self.max_nodes_per_job = max_nodes_per_job self.max_aggregate_nodes = max_aggregate_nodes @@ -195,6 +196,7 @@ def __init__( self.app_ids = set() if app_ids is None else app_ids self.states = states self.serial_only = serial_only + self.sort_by = sort_by self.max_wall_time_min = max_wall_time_min self.start_time = time.time() diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 1792fead..7b430d40 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -127,6 +127,7 @@ def acquire_jobs(self) -> List["Job"]: def launch_runs(self) -> None: acquired = self.acquire_jobs() acquired.extend(self.job_stash) + logger.info(f"acquired jobs: {acquired}") self.job_stash = [] for job in acquired: assert job.id is not None @@ -271,6 +272,8 @@ def main( ) scheduler_id = node_cls.get_scheduler_id() + + job_source = SynchronousJobSource( client=site_config.client, site_id=site_config.site_id, From 172f900b6309161f022bf6cefce9990fd69dbe4e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 25 Jul 2023 17:08:56 -0500 Subject: [PATCH 45/54] made arg function --- balsam/server/models/crud/sessions.py | 72 ++++++++++----------------- 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index d2226d9b..e32d2bb0 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -130,34 +130,28 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li return acquired_jobs -def _footprint_func(spec: schemas.SessionAcquire) -> Any: - footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) +def _order_args(spec: schemas.SessionAcquire) -> Any: if spec.sort_by == "long_large_first": - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - models.Job.id.asc(), - ) - ) - .label("aggregate_footprint") + order_args = ( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), ) else: - return ( - func.sum(footprint) - .over( - order_by=( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - models.Job.id.asc(), - ) - ) - .label("aggregate_footprint") + order_args = ( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), 
) + return order_args + + +def _footprint_func(spec: schemas.SessionAcquire) -> Any: + footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) + order_args = _order_args(spec) + return func.sum(footprint).over(order_by=(*order_args)).label("aggregate_footprint") def acquire( @@ -196,28 +190,14 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - if spec.sort_by == "long_large_first": - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) - else: - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - ) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) + order_args = _order_args(spec) + + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by(*order_args) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() From 54ccb1b9440614c399176d38ceeb8fad1b42c026 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Mon, 31 Jul 2023 13:14:30 -0500 Subject: [PATCH 46/54] undo changes --- balsam/server/models/crud/sessions.py | 67 ++++++++++++++------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index e32d2bb0..5834b55f 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -129,30 +129,20 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li logger.debug(f"Acquired {len(acquired_jobs)} jobs") return acquired_jobs - -def _order_args(spec: schemas.SessionAcquire) -> Any: - if spec.sort_by == "long_large_first": - order_args = ( - models.Job.wall_time_min.desc(), - models.Job.num_nodes.desc(), - models.Job.node_packing_count.desc(), - models.Job.id.asc(), - ) - else: - order_args = ( - models.Job.num_nodes.asc(), - models.Job.node_packing_count.desc(), - models.Job.wall_time_min.desc(), - models.Job.id.asc(), - ) - return order_args - - -def _footprint_func(spec: schemas.SessionAcquire) -> Any: +def _footprint_func() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) - order_args = _order_args(spec) - return func.sum(footprint).over(order_by=(*order_args)).label("aggregate_footprint") - + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") + ) def acquire( db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire @@ -190,14 +180,29 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - order_args = _order_args(spec) - - lock_ids_q = ( - job_q.with_only_columns([models.Job.id]) - .order_by(*order_args) - .limit(spec.max_num_jobs) - .with_for_update(of=models.Job.__table__, skip_locked=True) - ) + + if spec.sort_by == "long_large_first": + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + 
models.Job.node_packing_count.desc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) + else: + lock_ids_q = ( + job_q.with_only_columns([models.Job.id]) + .order_by( + models.Job.num_nodes.asc(), + models.Job.node_packing_count.desc(), + models.Job.wall_time_min.desc(), + ) + .limit(spec.max_num_jobs) + .with_for_update(of=models.Job.__table__, skip_locked=True) + ) locked_ids = db.execute(lock_ids_q).scalars().all() From 14d39b3d7b3636f808f4cd854bfc94b89b116f78 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:13:26 -0500 Subject: [PATCH 47/54] added new window function for node footprint calc --- balsam/server/models/crud/sessions.py | 30 ++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 5834b55f..3aec679c 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -114,6 +114,7 @@ def create(db: Session, owner: schemas.UserOut, session: schemas.SessionCreate) def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> List[Dict[str, Any]]: acquired_jobs = [{str(key): value for key, value in job.items()} for job in db.execute(job_q).mappings()] acquired_ids = [job["id"] for job in acquired_jobs] + # logger.info(f"*** in _acquire_jobs acquired_ids={acquired_ids}") stmt = update(models.Job.__table__).where(models.Job.id.in_(acquired_ids)).values(session_id=session.id) @@ -129,7 +130,8 @@ def _acquire_jobs(db: orm.Session, job_q: Select, session: models.Session) -> Li logger.debug(f"Acquired {len(acquired_jobs)} jobs") return acquired_jobs -def _footprint_func() -> Any: + +def _footprint_func_nodes() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) return ( func.sum(footprint) @@ -144,6 +146,22 @@ def _footprint_func() -> Any: .label("aggregate_footprint") ) +def _footprint_func_walltime() -> Any: + footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) + return ( + func.sum(footprint) + .over( + order_by=( + models.Job.wall_time_min.desc(), + models.Job.num_nodes.desc(), + models.Job.node_packing_count.desc(), + models.Job.id.asc(), + ) + ) + .label("aggregate_footprint") + ) + + def acquire( db: Session, owner: schemas.UserOut, session_id: int, spec: schemas.SessionAcquire ) -> List[Dict[str, Any]]: @@ -180,7 +198,7 @@ def acquire( return _acquire_jobs(db, job_q, session) # MPI Mode Launcher will take this path: - + # logger.info(f"*** In session.acquire: spec.sort_by = {spec.sort_by}") if spec.sort_by == "long_large_first": lock_ids_q = ( job_q.with_only_columns([models.Job.id]) @@ -205,10 +223,16 @@ def acquire( ) locked_ids = db.execute(lock_ids_q).scalars().all() + # logger.info(f"*** locked_ids: {locked_ids}") + if spec.sort_by == "long_large_first": + subq = select(models.Job.__table__, _footprint_func_walltime()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore + else: + subq = select(models.Job.__table__, _footprint_func_nodes()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore - subq = select(models.Job.__table__, _footprint_func()).where(models.Job.id.in_(locked_ids)).subquery() # type: ignore + # logger.info(f"*** max_aggregate_nodes: {spec.max_aggregate_nodes}") cols = [c for c in subq.c if c.name not in ["aggregate_footprint", "session_id"]] job_q = select(cols).where(subq.c.aggregate_footprint <= 
spec.max_aggregate_nodes) + return _acquire_jobs(db, job_q, session) From 7ab9cf870d0b3cec3e9332e5245b18e572fc9562 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:37:41 -0500 Subject: [PATCH 48/54] lint fixes --- balsam/site/launcher/_mpi_mode.py | 1 - tests/server/test_sites.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 7b430d40..3e484dfd 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -273,7 +273,6 @@ def main( scheduler_id = node_cls.get_scheduler_id() - job_source = SynchronousJobSource( client=site_config.client, site_id=site_config.site_id, diff --git a/tests/server/test_sites.py b/tests/server/test_sites.py index 5541bc45..f97d217b 100644 --- a/tests/server/test_sites.py +++ b/tests/server/test_sites.py @@ -12,7 +12,7 @@ def test_create_site(auth_client): name="thetalogin3.alcf.anl.gov", path="/projects/myProject/balsam-site", ) - assert type(posted_site["id"]) == int + assert type(posted_site["id"]) is int site_list = auth_client.get("/sites/")["results"] assert isinstance(site_list, list) assert len(site_list) == 1 From 0e224987ac59fa688464aefe29b380f3f77d0c17 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:40:52 -0500 Subject: [PATCH 49/54] lint fixes --- tests/server/test_sites.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_sites.py b/tests/server/test_sites.py index f97d217b..e55b3ace 100644 --- a/tests/server/test_sites.py +++ b/tests/server/test_sites.py @@ -12,7 +12,7 @@ def test_create_site(auth_client): name="thetalogin3.alcf.anl.gov", path="/projects/myProject/balsam-site", ) - assert type(posted_site["id"]) is int + assert isinstance(posted_site["id"], int) site_list = auth_client.get("/sites/")["results"] assert isinstance(site_list, list) assert len(site_list) == 1 From 063a48b284465621d83f0b9ed35c2746a742e672 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 15:45:42 -0500 Subject: [PATCH 50/54] lint fixes --- tests/server/test_auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/server/test_auth.py b/tests/server/test_auth.py index d600c60e..a9aedad2 100644 --- a/tests/server/test_auth.py +++ b/tests/server/test_auth.py @@ -12,7 +12,7 @@ def test_unauth_user_cannot_view_sites(anon_client): def test_register(anon_client): login_credentials = {"username": f"user{uuid4()}", "password": "foo"} resp = anon_client.post("/" + urls.PASSWORD_REGISTER, **login_credentials) - assert type(resp["id"]) == int + assert isinstance(resp["id"], int) assert resp["username"] == login_credentials["username"] From ff9dd8cda83ee1275eb0fdd6560526c555ab335d Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Tue, 1 Aug 2023 16:21:15 -0500 Subject: [PATCH 51/54] lint fixes --- balsam/server/models/crud/sessions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/balsam/server/models/crud/sessions.py b/balsam/server/models/crud/sessions.py index 3aec679c..8862e6f3 100644 --- a/balsam/server/models/crud/sessions.py +++ b/balsam/server/models/crud/sessions.py @@ -146,6 +146,7 @@ def _footprint_func_nodes() -> Any: .label("aggregate_footprint") ) + def _footprint_func_walltime() -> Any: footprint = cast(models.Job.num_nodes, Float) / cast(models.Job.node_packing_count, Float) return ( From 6a10eb72b8d421cce25a6166b90abf029d49596e Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 4 Aug 2023 16:52:42 -0500 
Subject: [PATCH 52/54] polaris app_run cleanup --- balsam/platform/app_run/polaris.py | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index d18efb12..3bc86a65 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -22,26 +22,24 @@ def _build_cmdline(self) -> str: # 8 cpus per gpu. This code will not skip the appropriate number of cpus # in the rank binding assignments. if "cpu_bind" in self._launch_params.keys(): - cpu_bind = self._launch_params.get("cpu_bind", "none") + cpu_bind = self._launch_params.get("cpu_bind") + elif "--cpu-bind" in self._launch_params.keys(): + cpu_bind = self._launch_params.get("--cpu-bind") else: # Here we grab the cpu_ids assigned to the job in the NodeSpec object # If this is not set in NodeSpec (it is only set for single node jobs), # then we take the cpu_id list from the Polaris ComputeNode subclass, # assuming the job will have use of all the cpus in nodes assigned to it. - cpu_ids_ns = self._node_spec.cpu_ids[0] - if cpu_ids_ns: - cpu_ids = self._node_spec.cpu_ids[0] - if self._threads_per_core == 2: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) - else: - polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + cpu_ids = self._node_spec.cpu_ids[0] + polaris_node = PolarisNode(self._node_spec.node_ids[0], self._node_spec.hostnames[0]) + if not cpu_ids: cpu_ids = polaris_node.cpu_ids cpus_per_rank = self.get_cpus_per_rank() # PolarisNode reverses the order of the gpu_ids, so assigning the cpu-bind # in ascending cpu order is what we want here. - cpu_bind_list = ["verbose,list"] + cpu_bind_list = ["list"] for irank in range(self._ranks_per_node): cpu_bind_list.append(":") for i in range(cpus_per_rank): @@ -57,17 +55,10 @@ def _build_cmdline(self) -> str: cid = str(cpu_ids[i + cpus_per_rank * irank] + len(polaris_node.cpu_ids)) cpu_bind_list.append(cid) cpu_bind = "".join(cpu_bind_list) - # if "CUDA_VISIBLE_DEVICES" in self._envs.keys(): - # gpu_device = self._envs["CUDA_VISIBLE_DEVICES"] - # gpu_ids = gpu_device.split(",") - # else: - # gpu_ids = [] - # logger.info(f"Polaris app_run: cpu_bind={cpu_bind} cpu_ids={cpu_ids} gpu_ids={gpu_ids}") launch_params = [] for k in self._launch_params.keys(): - if k != "cpu_bind": - launch_params.append("--" + k) + if k != "cpu_bind" and k != "--cpu-bind": launch_params.append(str(self._launch_params[k])) # The value of -d depends on the setting of cpu_bind. 
If cpu-bind=core, -d is the number of @@ -95,7 +86,6 @@ def _build_cmdline(self) -> str: ] return " ".join(str(arg) for arg in args) - # Overide default because sunspot does not use CUDA def _set_envs(self) -> None: envs = os.environ.copy() envs.update(self._envs) From 020ae447d1ade936cc71f4e4e243c80fab08ae09 Mon Sep 17 00:00:00 2001 From: Christine Simpson Date: Fri, 4 Aug 2023 16:58:11 -0500 Subject: [PATCH 53/54] lint fix --- balsam/platform/app_run/polaris.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balsam/platform/app_run/polaris.py b/balsam/platform/app_run/polaris.py index 3bc86a65..20f6ea22 100644 --- a/balsam/platform/app_run/polaris.py +++ b/balsam/platform/app_run/polaris.py @@ -65,7 +65,7 @@ def _build_cmdline(self) -> str: # physical cores per rank, otherwise it is the number of hardware threads per rank # https://docs.alcf.anl.gov/running-jobs/example-job-scripts/ depth = self._threads_per_rank - if "core" in cpu_bind: + if "core" == cpu_bind: depth = self.get_cpus_per_rank() nid_str = ",".join(map(str, node_ids)) From 273c95ed407177193f92252ae49680c86c140744 Mon Sep 17 00:00:00 2001 From: Christine Simpson <48525133+cms21@users.noreply.github.com> Date: Tue, 8 Aug 2023 10:14:07 -0500 Subject: [PATCH 54/54] Removing unnecessary logging from _mpi_mode.py --- balsam/site/launcher/_mpi_mode.py | 1 - 1 file changed, 1 deletion(-) diff --git a/balsam/site/launcher/_mpi_mode.py b/balsam/site/launcher/_mpi_mode.py index 3e484dfd..05f6e957 100644 --- a/balsam/site/launcher/_mpi_mode.py +++ b/balsam/site/launcher/_mpi_mode.py @@ -127,7 +127,6 @@ def acquire_jobs(self) -> List["Job"]: def launch_runs(self) -> None: acquired = self.acquire_jobs() acquired.extend(self.job_stash) - logger.info(f"acquired jobs: {acquired}") self.job_stash = [] for job in acquired: assert job.id is not None
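[Illustrative note, not part of the patch series.] PATCHES 40-47 above change how a launcher session acquires jobs: when the site's launcher settings set sort_by to "long_large_first", the session query orders jobs by descending wall_time_min, num_nodes, and node_packing_count, and in MPI mode it keeps acquiring only while the running footprint sum(num_nodes / node_packing_count) stays within max_aggregate_nodes. The pure-Python sketch below mimics that ordering and cutoff with plain dicts standing in for Job rows; it is a simulation of the SQL window function in sessions.py, not Balsam server code.

from typing import Any, Dict, List

def acquire_order(jobs: List[Dict[str, Any]], max_aggregate_nodes: float, sort_by: str = "") -> List[Dict[str, Any]]:
    # Orderings mirror the order_by clauses in balsam/server/models/crud/sessions.py.
    if sort_by == "long_large_first":
        def key(j: Dict[str, Any]) -> Any:
            return (-j["wall_time_min"], -j["num_nodes"], -j["node_packing_count"])
    else:
        # Default MPI-mode ordering: fewest nodes first, most packable first, longest wall time first.
        def key(j: Dict[str, Any]) -> Any:
            return (j["num_nodes"], -j["node_packing_count"], -j["wall_time_min"])
    acquired: List[Dict[str, Any]] = []
    footprint = 0.0
    for job in sorted(jobs, key=key):
        # Running aggregate_footprint over the chosen ordering.
        footprint += job["num_nodes"] / job["node_packing_count"]
        if footprint > max_aggregate_nodes:
            break
        acquired.append(job)
    return acquired

jobs = [
    {"id": 1, "num_nodes": 1, "node_packing_count": 4, "wall_time_min": 60},
    {"id": 2, "num_nodes": 2, "node_packing_count": 1, "wall_time_min": 120},
    {"id": 3, "num_nodes": 1, "node_packing_count": 1, "wall_time_min": 30},
]
print(acquire_order(jobs, max_aggregate_nodes=2.5, sort_by="long_large_first"))

Because each job's footprint is positive, stopping at the first job that pushes the running sum past the limit is equivalent to the aggregate_footprint <= max_aggregate_nodes filter in the server query; the corresponding site-side switch is the sort_by field added to LauncherSettings in PATCH 43.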