diff --git a/sisyphus/pbs_engine.py b/sisyphus/pbs_engine.py index 88b3b7d..fd3ead9 100644 --- a/sisyphus/pbs_engine.py +++ b/sisyphus/pbs_engine.py @@ -1,5 +1,3 @@ -# Author: Jan-Thorsten Peter - from typing import Any import os import subprocess @@ -143,44 +141,10 @@ def options(self, rqmt): mem = rqmt["mem"] l_out.append("mem=%s" % mem) - - # if "rss" in rqmt: - # try: - # rss = "%igb" % math.ceil(float(rqmt["rss"])) - # except ValueError: - # rss = rqmt["rss"] - # # rss = try_to_multiply(s['rss'], 1024*1024*1024) # convert to Gigabyte if possible - # out.append("mem=%s" % rss) - # else: - # out.append("mem=%s" % mem) - - #try: - # file_size = "%iG" % math.ceil(float(rqmt["file_size"])) - #except (ValueError, KeyError): - # # If a different default value is wanted it can be overwritten by adding - # # 'file_size' to the default_rqmt of this engine. - # file_size = rqmt.get("file_size", "50G") - - # out.append("-l") - # out.append("h_fsize=%s" % file_size) - l_out.append("ngpus=%s" % rqmt.get("gpu", 0)) - l_out.append("ncpus=%s" % rqmt.get("cpu", 1)) - out.append("-lselect=1:%s" % ":".join(l_out)) - # Try to convert time to float, calculate minutes from it - # and convert it back to an rounded string - # If it fails use string directly - #task_time = try_to_multiply(rqmt["time"], 60 * 60) # convert to seconds if possible - - #out.append("-l") - #out.append("h_rt=%s" % task_time) - - #if rqmt.get("multi_node_slots", None): - # out.extend(["-pe", self.pe_name, str(rqmt["multi_node_slots"])]) - qsub_args = rqmt.get("qsub_args", []) if isinstance(qsub_args, str): qsub_args = qsub_args.split() @@ -300,7 +264,7 @@ def queue_state(self): continue state = job["job_state"] name = job["Job_Name"] - task = 1 # TODO + task = 1 # TODO task_infos[(name, task)].append((job_id, state)) except Exception: logging.warning("Failed to parse squeue output: %s" % str(job)) @@ -308,7 +272,7 @@ def queue_state(self): self._task_info_cache = task_infos self._task_info_cache_last_update = time.time() return task_infos - + def output_path(self): """Return s list with all currently running tasks in this queue""" @@ -329,7 +293,7 @@ def output_path(self): import json job_dict = json.loads(b"\n".join(out)) - job_id = os.getenv("PBS_JOBID") + job_id = os.getenv("PBS_JOBID") job = job_dict["Jobs"][job_id] return job["Output_Path"].split(":")[-1] @@ -345,8 +309,6 @@ def task_state(self, task, task_id): name = escape_name(name) task_name = (name, task_id) queue_state = self.queue_state() - #print("search for %s" % str(task_name)) - #print("queue state: %s" % str(queue_state)) qs = queue_state[task_name] # task name should be uniq @@ -400,7 +362,7 @@ def init_worker(self, task): os.unlink(logpath) engine_logpath = self.output_path() - + try: if os.path.isfile(engine_logpath): os.link(engine_logpath, logpath)