diff --git a/sisyphus/load_sharing_facility_engine.py b/sisyphus/load_sharing_facility_engine.py index a7d8032..2b63318 100644 --- a/sisyphus/load_sharing_facility_engine.py +++ b/sisyphus/load_sharing_facility_engine.py @@ -1,5 +1,6 @@ # Author: Wilfried Michel +from typing import Any import os import subprocess @@ -42,6 +43,11 @@ def __init__(self, default_rqmt, gateway=None, auto_clean_eqw=True): self.default_rqmt = default_rqmt self.auto_clean_eqw = auto_clean_eqw + def _system_call_timeout_warn_msg(self, command: Any) -> str: + if self.gateway: + return f"SSH command timeout: {command!s}" + return f"Command timeout: {command!s}" + def system_call(self, command, send_to_stdin=None): if self.gateway: system_command = ["ssh", "-x", self.gateway] + [" ".join(["cd", os.getcwd(), "&&"] + command)] @@ -177,14 +183,16 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, rangestring): + "\n" ) - try: - logging.info("bsub_call: %s" % bsub_call) - logging.info("command: %s" % command) - out, err, retval = self.system_call(bsub_call, command) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.submit_helper(call, logpath, rqmt, name, task_name, rangestring) + while True: + try: + logging.info("bsub_call: %s" % bsub_call) + logging.info("command: %s" % command) + out, err, retval = self.system_call(bsub_call, command) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break ref_output = ["Job", "is", "submitted", "to", "queue"] ref_output = [i.encode() for i in ref_output] @@ -238,12 +246,14 @@ def queue_state(self): # get bjobs output system_command = ["bjobs", "-w"] - try: - out, err, retval = self.system_call(system_command) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(system_command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.queue_state() + while True: + try: + out, err, retval = self.system_call(system_command) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(system_command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break task_infos = defaultdict(list) for line in out[1:]: diff --git a/sisyphus/simple_linux_utility_for_resource_management_engine.py b/sisyphus/simple_linux_utility_for_resource_management_engine.py index cf48ea5..4cdfab9 100644 --- a/sisyphus/simple_linux_utility_for_resource_management_engine.py +++ b/sisyphus/simple_linux_utility_for_resource_management_engine.py @@ -1,5 +1,6 @@ # Author: Wilfried Michel +from typing import Any from collections import defaultdict, namedtuple from enum import Enum import getpass # used to get username @@ -70,6 +71,11 @@ def __init__( self.memory_allocation_type = memory_allocation_type self.job_name_mapping = job_name_mapping + def _system_call_timeout_warn_msg(self, command: Any) -> str: + if self.gateway: + return f"SSH command timeout: {command!s}" + return f"Command timeout: {command!s}" + def system_call(self, command, send_to_stdin=None): """ :param list[str] command: qsub command @@ -223,12 +229,14 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, sbatch_call += ["-a", "%i-%i:%i" % (start_id, end_id, step_size)] command = '"' + " ".join(call) + '"' sbatch_call += ["--wrap=%s" % " ".join(call)] - try: - out, err, retval = self.system_call(sbatch_call) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size) + while True: + try: + out, err, retval = self.system_call(sbatch_call) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break ref_output = ["Submitted", "batch", "job"] ref_output = [i.encode() for i in ref_output] @@ -285,12 +293,14 @@ def queue_state(self): "-O", "arrayjobid,arraytaskid,state,name:1000", ] - try: - out, err, retval = self.system_call(system_command) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(system_command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.queue_state() + while True: + try: + out, err, retval = self.system_call(system_command) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(system_command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break task_infos = defaultdict(list) for line in out: diff --git a/sisyphus/son_of_grid_engine.py b/sisyphus/son_of_grid_engine.py index 0143b3e..acda4f0 100644 --- a/sisyphus/son_of_grid_engine.py +++ b/sisyphus/son_of_grid_engine.py @@ -1,5 +1,6 @@ # Author: Jan-Thorsten Peter +from typing import Any import os import subprocess @@ -73,6 +74,11 @@ def __init__(self, default_rqmt, gateway=None, auto_clean_eqw=True, ignore_jobs= self.ignore_jobs = ignore_jobs self.pe_name = pe_name + def _system_call_timeout_warn_msg(self, command: Any) -> str: + if self.gateway: + return f"SSH command timeout: {command!s}" + return f"Command timeout: {command!s}" + def system_call(self, command, send_to_stdin=None): """ :param list[str] command: qsub command @@ -240,12 +246,14 @@ def submit_helper(self, call, logpath, rqmt, name, task_name, start_id, end_id, qsub_call += ["-t", "%i-%i:%i" % (start_id, end_id, step_size)] command = " ".join(call) + "\n" - try: - out, err, retval = self.system_call(qsub_call, command) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.submit_helper(call, logpath, rqmt, name, task_name, start_id, end_id, step_size) + while True: + try: + out, err, retval = self.system_call(qsub_call, command) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break ref_output = ["Your", "job-array", '("%s")' % name, "has", "been", "submitted"] ref_output = [i.encode() for i in ref_output] @@ -305,12 +313,14 @@ def queue_state(self): # get qstat output system_command = ["qstat", "-xml", "-u", getpass.getuser()] - try: - out, err, retval = self.system_call(system_command) - except subprocess.TimeoutExpired: - logging.warning("SSH command timeout %s" % str(system_command)) - time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) - return self.queue_state() + while True: + try: + out, err, retval = self.system_call(system_command) + except subprocess.TimeoutExpired: + logging.warning(self._system_call_timeout_warn_msg(system_command)) + time.sleep(gs.WAIT_PERIOD_SSH_TIMEOUT) + continue + break xml_data = "".join(i.decode("utf8") for i in out)