diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index a84fd9fb..9f1b7fd5 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -47,7 +47,7 @@ def __init__(self, pilotParams): try: from Pilot.pilotTools import ( CommandBase, - getFlavour, + getSubmitterInfo, retrieveUrlTimeout, safe_listdir, sendMessage, @@ -56,7 +56,7 @@ def __init__(self, pilotParams): except ImportError: from pilotTools import ( CommandBase, - getFlavour, + getSubmitterInfo, retrieveUrlTimeout, safe_listdir, sendMessage, @@ -559,8 +559,7 @@ def execute(self): VOs may want to replace/extend the _getBasicsCFG and _getSecurityCFG functions """ - - self.pp.flavour, self.pp.pilotReference = getFlavour(self.pp.ceName) + self.pp.flavour, self.pp.pilotReference, self.pp.batchSystemInfo = getSubmitterInfo(self.pp.ceName) self._getBasicsCFG() self._getSecurityCFG() @@ -855,6 +854,17 @@ def execute(self): """Setup configuration parameters""" self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour) + # Add batch system details to the configuration + # Can be used by the pilot/job later on, to interact with the batch system + self.cfg.append("-o /LocalSite/BatchSystem/Type=%s" % self.pp.batchSystemInfo.get("Type", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/JobID=%s" % self.pp.batchSystemInfo.get("JobID", "Unknown")) + + batchSystemParams = self.pp.batchSystemInfo.get("Parameters", {}) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Queue=%s" % batchSystemParams.get("Queue", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/BinaryPath=%s" % batchSystemParams.get("BinaryPath", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Host=%s" % batchSystemParams.get("Host", "Unknown")) + self.cfg.append("-o /LocalSite/BatchSystem/Parameters/InfoPath=%s" % batchSystemParams.get("InfoPath", "Unknown")) + self.cfg.append('-n "%s"' % self.pp.site) self.cfg.append('-S "%s"' % self.pp.setup) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 08916781..dce65a86 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,6 +18,7 @@ from datetime import datetime from functools import partial, wraps from threading import RLock +import warnings ############################ # python 2 -> 3 "hacks" @@ -214,53 +215,103 @@ def listdir(directory): return contents -def getFlavour(ceName): +def getSubmitterInfo(ceName): + """Get information about the submitter of the pilot. + + Check the environment variables to determine the type of batch system and CE used + to submit the pilot being used and return this information in a tuple. + """ pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "") + # Batch system taking care of the pilot + # Might be useful to extract the info to interact with it later on + batchSystemType = "Unknown" + batchSystemJobID = "Unknown" + batchSystemParameters = { + "BinaryPath": "Unknown", + "Host": "Unknown", + "InfoPath": "Unknown", + "Queue": "Unknown", + } + # Flavour of the pilot + # Inform whether the pilot was sent through SSH+batch system or a CE flavour = "DIRAC" # # Batch systems - # Take the reference from the Torque batch system + # Torque if "PBS_JOBID" in os.environ: - flavour = "SSHTorque" - pilotReference = "sshtorque://" + ceName + "/" + os.environ["PBS_JOBID"].split(".")[0] + batchSystemType = "PBS" + batchSystemJobID = os.environ["PBS_JOBID"] + batchSystemParameters["BinaryPath"] = os.environ.get("PBS_O_PATH", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("PBS_O_QUEUE", "Unknown") - # Take the reference from the OAR batch system + flavour = "SSH%s" % batchSystemType + pilotReference = "sshpbs://" + ceName + "/" + batchSystemJobID.split(".")[0] + + # OAR if "OAR_JOBID" in os.environ: - flavour = "SSHOAR" - pilotReference = "sshoar://" + ceName + "/" + os.environ["OAR_JOBID"] + batchSystemType = "OAR" + batchSystemJobID = os.environ["OAR_JOBID"] + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshoar://" + ceName + "/" + batchSystemJobID # Grid Engine - if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ: - flavour = "SSHGE" - pilotReference = "sshge://" + ceName + "/" + os.environ["JOB_ID"] - # Generic JOB_ID - elif "JOB_ID" in os.environ: - flavour = "Generic" - pilotReference = "generic://" + ceName + "/" + os.environ["JOB_ID"] + if "SGE_TASK_ID" in os.environ: + batchSystemType = "SGE" + batchSystemJobID = os.environ["JOB_ID"] + batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown") + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshge://" + ceName + "/" + batchSystemJobID # LSF if "LSB_BATCH_JID" in os.environ: - flavour = "SSHLSF" - pilotReference = "sshlsf://" + ceName + "/" + os.environ["LSB_BATCH_JID"] + batchSystemType = "LSF" + batchSystemJobID = os.environ["LSB_BATCH_JID"] + batchSystemParameters["BinaryPath"] = os.environ.get("LSF_BINDIR", "Unknown") + batchSystemParameters["Host"] = os.environ.get("LSB_HOSTS", "Unknown") + batchSystemParameters["InfoPath"] = os.environ.get("LSF_ENVDIR", "Unknown") + batchSystemParameters["Queue"] = os.environ.get("LSB_QUEUE", "Unknown") - # SLURM batch system + flavour = "SSH%s" % batchSystemType + pilotReference = "sshlsf://" + ceName + "/" + batchSystemJobID + + # SLURM if "SLURM_JOBID" in os.environ: - flavour = "SSHSLURM" - pilotReference = "sshslurm://" + ceName + "/" + os.environ["SLURM_JOBID"] + batchSystemType = "SLURM" + batchSystemJobID = os.environ["SLURM_JOBID"] + + flavour = "SSH%s" % batchSystemType + pilotReference = "sshslurm://" + ceName + "/" + batchSystemJobID # Condor if "CONDOR_JOBID" in os.environ: - flavour = "SSHCondor" - pilotReference = "sshcondor://" + ceName + "/" + os.environ["CONDOR_JOBID"] + batchSystemType = "HTCondor" + batchSystemJobID = os.environ["CONDOR_JOBID"] + batchSystemParameters["InfoPath"] = os.environ.get("_CONDOR_JOB_AD", "Unknown") - # # CEs + flavour = "SSH%s" % batchSystemType + pilotReference = "sshcondor://" + ceName + "/" + batchSystemJobID + + # # CEs/Batch Systems # HTCondor if "HTCONDOR_JOBID" in os.environ: + batchSystemType = "HTCondor" + batchSystemJobID = os.environ["HTCONDOR_JOBID"] + flavour = "HTCondorCE" - pilotReference = "htcondorce://" + ceName + "/" + os.environ["HTCONDOR_JOBID"] + pilotReference = "htcondorce://" + ceName + "/" + batchSystemJobID + + # # Local/SSH + + # Local submission to the host + if "LOCAL_JOBID" in os.environ: + flavour = "Local" + pilotReference = "local://" + ceName + "/" + os.environ["LOCAL_JOBID"] # Direct SSH tunnel submission if "SSHCE_JOBID" in os.environ: @@ -274,6 +325,8 @@ def getFlavour(ceName): "sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"] ) + # # CEs + # ARC if "GRID_GLOBAL_JOBURL" in os.environ: flavour = "ARC" @@ -284,9 +337,18 @@ def getFlavour(ceName): flavour = "VMDIRAC" pilotReference = "vm://" + ceName + "/" + os.environ["JOB_ID"] - return flavour, pilotReference + return flavour, pilotReference, {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters} +def getFlavour(ceName): + """Old method to get the flavour of the pilot. Deprecated. + + Please use getSubmitterInfo instead. + """ + warnings.warn("getFlavour() is deprecated. Please use getSubmitterInfo() instead.", category=DeprecationWarning, stacklevel=2) + flavour, pilotReference, _ = getSubmitterInfo(ceName) + return flavour, pilotReference + class ObjectLoader(object): """Simplified class for loading objects from a DIRAC installation. @@ -834,6 +896,7 @@ def __init__(self): self.stopOnApplicationFailure = True self.stopAfterFailedMatches = 10 self.flavour = "DIRAC" + self.batchSystemInfo = {} self.pilotReference = "" self.releaseVersion = "" self.releaseProject = ""