From 42c9ba4d109a2a59e07b17a0528233b5ae1d8f5d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?B=C3=A9renger=20Berthoul?=
Date: Wed, 4 Sep 2024 13:26:37 +0200
Subject: [PATCH] Shell scheduler: seems to be working

---
 pytest_parallel/mpi_reporter.py           | 32 +-------
 pytest_parallel/process_scheduler.py      |  4 +-
 pytest_parallel/shell_static_scheduler.py | 92 +++++++++--------------
 pytest_parallel/static_scheduler_utils.py | 24 ++++++
 4 files changed, 65 insertions(+), 87 deletions(-)
 create mode 100644 pytest_parallel/static_scheduler_utils.py

diff --git a/pytest_parallel/mpi_reporter.py b/pytest_parallel/mpi_reporter.py
index 3c0b3f8..0ead5f4 100644
--- a/pytest_parallel/mpi_reporter.py
+++ b/pytest_parallel/mpi_reporter.py
@@ -6,6 +6,7 @@
 from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index
 from .utils_mpi import number_of_working_processes, is_dyn_master_process
 from .gather_report import gather_report_on_local_rank_0
+from .static_scheduler_utils import group_items_by_parallel_steps
 
 
 def mark_skip(item):
@@ -134,31 +135,6 @@ def pytest_runtest_logreport(self, report):
         return True # ranks that don't participate in the tests don't have to report anything
 
 
-def group_items_by_parallel_steps(items, n_workers):
-    add_n_procs(items)
-    items.sort(key=lambda item: item.n_proc, reverse=True)
-
-    remaining_n_procs_by_step = []
-    items_by_step = []
-    items_to_skip = []
-    for item in items:
-        if item.n_proc > n_workers:
-            items_to_skip += [item]
-        else:
-            found_step = False
-            for idx, remaining_procs in enumerate(remaining_n_procs_by_step):
-                if item.n_proc <= remaining_procs:
-                    items_by_step[idx] += [item]
-                    remaining_n_procs_by_step[idx] -= item.n_proc
-                    found_step = True
-                    break
-            if not found_step:
-                items_by_step += [[item]]
-                remaining_n_procs_by_step += [n_workers - item.n_proc]
-
-    return items_by_step, items_to_skip
-
-
 def prepare_items_to_run(items, comm):
     i_rank = comm.Get_rank()
 
@@ -237,9 +213,9 @@ def pytest_runtestloop(self, session) -> bool:
 
         n_workers = self.global_comm.Get_size()
 
-        items_by_steps, items_to_skip = group_items_by_parallel_steps(
-            session.items, n_workers
-        )
+        add_n_procs(session.items)
+
+        items_by_steps, items_to_skip = group_items_by_parallel_steps(session.items, n_workers)
 
         items = items_to_run_on_this_proc(
             items_by_steps, items_to_skip, self.global_comm
diff --git a/pytest_parallel/process_scheduler.py b/pytest_parallel/process_scheduler.py
index ebc82df..f18ff2c 100644
--- a/pytest_parallel/process_scheduler.py
+++ b/pytest_parallel/process_scheduler.py
@@ -104,10 +104,10 @@ def submit_items(items_to_run, socket, main_invoke_params, ntasks, slurm_conf):
         cmds += cmd
     cmds += 'wait\n'
 
-    .pytest_parallel = f'{slurm_header}\n\n{cmds}'
+    job_cmds = f'{slurm_header}\n\n{cmds}'
     Path('.pytest_parallel').mkdir(exist_ok=True)
     with open('.pytest_parallel/job.sh','w') as f:
-        f.write(.pytest_parallel)
+        f.write(job_cmds)
 
     # submit SLURM job
     with open('.pytest_parallel/env_vars.sh','wb') as f:
diff --git a/pytest_parallel/shell_static_scheduler.py b/pytest_parallel/shell_static_scheduler.py
index ff1a16e..b6a2de5 100644
--- a/pytest_parallel/shell_static_scheduler.py
+++ b/pytest_parallel/shell_static_scheduler.py
@@ -8,7 +8,8 @@
 from . import socket_utils
 from .utils import get_n_proc_for_test, add_n_procs, run_item_test, mark_original_index
 from .algo import partition
-
+from .static_scheduler_utils import group_items_by_parallel_steps
+from mpi4py import MPI
 
 def mark_skip(item, ntasks):
     n_proc_test = get_n_proc_for_test(item)
@@ -61,8 +62,7 @@ def _get_my_ip_address():
 
     return my_ip
 
-
-def submit_items(items_to_run, socket, main_invoke_params, ntasks):
+def setup_socket(socket):
     # Find IP our address
     SCHEDULER_IP_ADDRESS = _get_my_ip_address()
 
@@ -70,64 +70,49 @@ def submit_items(items_to_run, socket, main_invoke_params, ntasks):
     socket.bind((SCHEDULER_IP_ADDRESS, 0)) # 0: let the OS choose an available port
     socket.listen()
     port = socket.getsockname()[1]
-
-    ## generate SLURM header options
-    #if slurm_conf['file'] is not None:
-    #    with open(slurm_conf['file']) as f:
-    #        slurm_header = f.read()
-    #    # Note:
-    #    #   ntasks is supposed to be <= to the number of the ntasks submitted to slurm
-    #    #   but since the header file can be arbitrary, we have no way to check at this point
-    #else:
-    #    slurm_header = '#!/bin/bash\n'
-    #    slurm_header += '\n'
-    #    slurm_header += '#SBATCH --job-name=pytest_parallel\n'
-    #    slurm_header += '#SBATCH --output=.pytest_parallel/slurm.%j.out\n'
-    #    slurm_header += '#SBATCH --error=.pytest_parallel/slurm.%j.err\n'
-    #    for opt in slurm_conf['options']:
-    #        slurm_header += f'#SBATCH {opt}\n'
-    #    slurm_header += f'#SBATCH --ntasks={ntasks}'
-
+    return SCHEDULER_IP_ADDRESS, port
+
+def mpi_command(current_proc, n_proc):
+    mpi_vendor = MPI.get_vendor()[0]
+    if mpi_vendor == 'Intel MPI':
+        cmd = f'I_MPI_PIN_PROCESSOR_LIST={current_proc}-{current_proc+n_proc-1} '
+        cmd += f'mpiexec -np {n_proc}'
+        return cmd
+    elif mpi_vendor == 'Open MPI':
+        cores = ','.join([str(i) for i in range(current_proc,current_proc+n_proc)])
+        return f'mpiexec --cpu-list {cores} -np {n_proc}'
+    else:
+        assert 0, f'Unknown MPI implementation "{mpi_vendor}"'
+
+def submit_items(items_to_run, SCHEDULER_IP_ADDRESS, port, main_invoke_params, ntasks, i_step, n_step):
     # sort item by comm size to launch bigger first (Note: in case SLURM prioritize first-received items)
     items = sorted(items_to_run, key=lambda item: item.n_proc, reverse=True)
 
-    # launch srun for each item
+    # launch `mpiexec` for each item
     worker_flags=f"--_worker --_scheduler_ip_address={SCHEDULER_IP_ADDRESS} --_scheduler_port={port}"
     cmds = []
+    current_proc = 0
     for item in items:
         test_idx = item.original_index
         test_out_file_base = f'.pytest_parallel/{remove_exotic_chars(item.nodeid)}'
-        cmd = f'mpiexec -np {item.n_proc}'
+        cmd = mpi_command(current_proc, item.n_proc)
         cmd += f' python3 -u -m pytest -s {worker_flags} {main_invoke_params} --_test_idx={test_idx} {item.config.rootpath}/{item.nodeid}'
         cmd += f' > {test_out_file_base}'
         cmds.append(cmd)
+        current_proc += item.n_proc
     script = " & \\\n".join(cmds)
 
     Path('.pytest_parallel').mkdir(exist_ok=True)
-    script_path = '.pytest_parallel/pytest_static_sched.sh'
+    script_path = f'.pytest_parallel/pytest_static_sched_{i_step}.sh'
    with open(script_path,'w') as f:
         f.write(script)
 
-    ## submit SLURM job
-    #with open('.pytest_parallel/env_vars.sh','wb') as f:
-    #    f.write(pytest._pytest_parallel_env_vars)
     current_permissions = stat.S_IMODE(os.lstat(script_path).st_mode)
     os.chmod(script_path, current_permissions | stat.S_IXUSR)
 
     p = subprocess.Popen([script_path], shell=True, stdout=subprocess.PIPE)
-    print('\nLaunching tests...')
-    #returncode = p.wait()
-    #assert returncode==0, f'Error launching tests with `{script_path}`'
-
-    #if slurm_conf['sub_command'] is None:
-    #    slurm_job_id = int(p.stdout.read())
-    #else:
-    #    slurm_job_id = parse_job_id_from_submission_output(p.stdout.read())
-
-    #print(f'SLURM job {slurm_job_id} has been submitted')
-    #return slurm_job_id
-    return 0
+    print(f'\nLaunching tests (step {i_step}/{n_step})...')
+    return p
 
 def receive_items(items, session, socket, n_item_to_recv):
     while n_item_to_recv>0:
@@ -183,12 +168,7 @@ def pytest_runtestloop(self, session) -> bool:
 
         ## add proc to items
         add_n_procs(session.items)
-        # isolate skips
-        print(f"{self.ntasks=}")
-        for i in session.items:
-            print(f"{i.n_proc=}")
-        has_enough_procs = lambda item: item.n_proc <= self.ntasks
-        items_to_run, items_to_skip = partition(session.items, has_enough_procs)
+        items_by_steps, items_to_skip = group_items_by_parallel_steps(session.items, self.ntasks)
 
         # run skipped
         for i, item in enumerate(items_to_skip):
@@ -198,20 +178,18 @@ def pytest_runtestloop(self, session) -> bool:
             run_item_test(item, nextitem, session)
 
         # schedule tests to run
-        n_item_to_receive = len(items_to_run)
-        if n_item_to_receive > 0:
-            self.slurm_job_id = submit_items(items_to_run, self.socket, self.main_invoke_params, self.ntasks)
-            if not self.detach: # The job steps are supposed to send their reports
-                receive_items(session.items, session, self.socket, n_item_to_receive)
+        SCHEDULER_IP_ADDRESS, port = setup_socket(self.socket)
+        n_step = len(items_by_steps)
+        for i_step, items in enumerate(items_by_steps):
+            n_item_to_receive = len(items)
+            sub_process = submit_items(items, SCHEDULER_IP_ADDRESS, port, self.main_invoke_params, self.ntasks, i_step, n_step)
+            if not self.detach: # The job steps are supposed to send their reports
+                receive_items(session.items, session, self.socket, n_item_to_receive)
+            returncode = sub_process.wait() # at this point, the sub-process should be done since items have been received
+            assert returncode==0, f'Error during step {i_step} of the shell scheduler'
 
         return True
 
-    #@pytest.hookimpl()
-    #def pytest_keyboard_interrupt(excinfo):
-    #    if excinfo.slurm_job_id is not None:
-    #        print(f'Calling `scancel {excinfo.slurm_job_id}`')
-    #        subprocess.run(['scancel',str(excinfo.slurm_job_id)])
-
     @pytest.hookimpl(hookwrapper=True)
     def pytest_runtest_makereport(self, item):
         """
diff --git a/pytest_parallel/static_scheduler_utils.py b/pytest_parallel/static_scheduler_utils.py
new file mode 100644
index 0000000..469a230
--- /dev/null
+++ b/pytest_parallel/static_scheduler_utils.py
@@ -0,0 +1,24 @@
+def group_items_by_parallel_steps(items, n_workers):
+    items.sort(key=lambda item: item.n_proc, reverse=True)
+
+    remaining_n_procs_by_step = []
+    items_by_step = []
+    items_to_skip = []
+    for item in items:
+        if item.n_proc > n_workers:
+            items_to_skip += [item]
+        else:
+            found_step = False
+            for idx, remaining_procs in enumerate(remaining_n_procs_by_step):
+                if item.n_proc <= remaining_procs:
+                    items_by_step[idx] += [item]
+                    remaining_n_procs_by_step[idx] -= item.n_proc
+                    found_step = True
+                    break
+            if not found_step:
+                items_by_step += [[item]]
+                remaining_n_procs_by_step += [n_workers - item.n_proc]
+
+    return items_by_step, items_to_skip
+
+
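Note (illustration, not part of the patch): the extracted group_items_by_parallel_steps helper greedily packs tests into sequential steps so that the summed n_proc of each step never exceeds the worker count, and returns tests that need more processes than available separately so they can be skipped. Below is a minimal sketch of the expected behaviour, assuming the patched pytest_parallel package is importable; FakeItem is a hypothetical stand-in for pytest items, which must already carry the n_proc attribute (normally set by add_n_procs).

    from pytest_parallel.static_scheduler_utils import group_items_by_parallel_steps

    class FakeItem:
        # Hypothetical stand-in for a pytest item; only n_proc matters here.
        def __init__(self, name, n_proc):
            self.name = name
            self.n_proc = n_proc
        def __repr__(self):
            return f'{self.name}(n_proc={self.n_proc})'

    items = [FakeItem('t0', 3), FakeItem('t1', 2), FakeItem('t2', 2), FakeItem('t3', 8)]

    # With 4 workers, 't3' (8 procs) cannot fit and is reported as to-skip;
    # the other tests are packed into steps whose total n_proc stays <= 4.
    items_by_step, items_to_skip = group_items_by_parallel_steps(items, n_workers=4)

    print(items_by_step)   # [[t0(n_proc=3)], [t1(n_proc=2), t2(n_proc=2)]]
    print(items_to_skip)   # [t3(n_proc=8)]

Each sublist of items_by_step is what one generated pytest_static_sched_<i_step>.sh script launches concurrently, which is why the shell scheduler waits for the step's sub-process before submitting the next one.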