Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[devel] Collect batch system info #222

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ def __init__(self, pilotParams):
try:
from Pilot.pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
@@ -56,7 +56,7 @@ def __init__(self, pilotParams):
except ImportError:
from pilotTools import (
CommandBase,
getFlavour,
getSubmitterInfo,
retrieveUrlTimeout,
safe_listdir,
sendMessage,
@@ -550,8 +550,7 @@ def execute(self):

VOs may want to replace/extend the _getBasicsCFG and _getSecurityCFG functions
"""

self.pp.flavour, self.pp.pilotReference = getFlavour(self.pp.ceName)
self.pp.flavour, self.pp.pilotReference, self.pp.batchSystemInfo = getSubmitterInfo(self.pp.ceName)

self._getBasicsCFG()
self._getSecurityCFG()
@@ -846,6 +845,17 @@ def execute(self):
"""Setup configuration parameters"""
self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour)

# Add batch system details to the configuration
# Can be used by the pilot/job later on, to interact with the batch system
self.cfg.append("-o /LocalSite/BatchSystem/Type=%s" % self.pp.batchSystemInfo.get("Type", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/JobID=%s" % self.pp.batchSystemInfo.get("JobID", "Unknown"))

batchSystemParams = self.pp.batchSystemInfo.get("Parameters", {})
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Queue=%s" % batchSystemParams.get("Queue", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/BinaryPath=%s" % batchSystemParams.get("BinaryPath", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/Host=%s" % batchSystemParams.get("Host", "Unknown"))
self.cfg.append("-o /LocalSite/BatchSystem/Parameters/InfoPath=%s" % batchSystemParams.get("InfoPath", "Unknown"))

self.cfg.append('-n "%s"' % self.pp.site)
self.cfg.append('-S "%s"' % self.pp.setup)

111 changes: 87 additions & 24 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
from datetime import datetime
from functools import partial, wraps
from threading import RLock
import warnings

############################
# python 2 -> 3 "hacks"
@@ -214,53 +215,103 @@ def listdir(directory):
return contents


def getFlavour(ceName):
def getSubmitterInfo(ceName):
fstagni marked this conversation as resolved.
Show resolved Hide resolved
"""Get information about the submitter of the pilot.

Check the environment variables to determine the type of batch system and CE used
to submit the pilot being used and return this information in a tuple.
"""

pilotReference = os.environ.get("DIRAC_PILOT_STAMP", "")
# Batch system taking care of the pilot
# Might be useful to extract the info to interact with it later on
batchSystemType = "Unknown"
batchSystemJobID = "Unknown"
batchSystemParameters = {
"BinaryPath": "Unknown",
"Host": "Unknown",
"InfoPath": "Unknown",
"Queue": "Unknown",
}
# Flavour of the pilot
# Inform whether the pilot was sent through SSH+batch system or a CE
flavour = "DIRAC"

# # Batch systems

# Take the reference from the Torque batch system
# Torque
if "PBS_JOBID" in os.environ:
flavour = "SSHTorque"
pilotReference = "sshtorque://" + ceName + "/" + os.environ["PBS_JOBID"].split(".")[0]
batchSystemType = "PBS"
batchSystemJobID = os.environ["PBS_JOBID"]
batchSystemParameters["BinaryPath"] = os.environ.get("PBS_O_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("PBS_O_QUEUE", "Unknown")

# Take the reference from the OAR batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshpbs://" + ceName + "/" + batchSystemJobID.split(".")[0]

# OAR
if "OAR_JOBID" in os.environ:
flavour = "SSHOAR"
pilotReference = "sshoar://" + ceName + "/" + os.environ["OAR_JOBID"]
batchSystemType = "OAR"
batchSystemJobID = os.environ["OAR_JOBID"]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batchSystemParameters["BinaryPath"] and batchSystemParameters["Queue"] for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no OARResourceUsage module + I don't know how to get the BinaryPath and the Queue from the environment variables in this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm maybe @atsareg knows what was this for. The feeling is that it could be dropped...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I have seen, the project is not very known but still exists and has active developers working on it.

flavour = "SSH%s" % batchSystemType
pilotReference = "sshoar://" + ceName + "/" + batchSystemJobID

# Grid Engine
if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ:
flavour = "SSHGE"
pilotReference = "sshge://" + ceName + "/" + os.environ["JOB_ID"]
# Generic JOB_ID
elif "JOB_ID" in os.environ:
flavour = "Generic"
pilotReference = "generic://" + ceName + "/" + os.environ["JOB_ID"]
aldbr marked this conversation as resolved.
Show resolved Hide resolved
if "SGE_TASK_ID" in os.environ:
batchSystemType = "SGE"
batchSystemJobID = os.environ["JOB_ID"]
batchSystemParameters["BinaryPath"] = os.environ.get("SGE_BINARY_PATH", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("QUEUE", "Unknown")

flavour = "SSH%s" % batchSystemType
pilotReference = "sshge://" + ceName + "/" + batchSystemJobID

# LSF
if "LSB_BATCH_JID" in os.environ:
flavour = "SSHLSF"
pilotReference = "sshlsf://" + ceName + "/" + os.environ["LSB_BATCH_JID"]
batchSystemType = "LSF"
batchSystemJobID = os.environ["LSB_BATCH_JID"]
batchSystemParameters["BinaryPath"] = os.environ.get("LSF_BINDIR", "Unknown")
batchSystemParameters["Host"] = os.environ.get("LSB_HOSTS", "Unknown")
batchSystemParameters["InfoPath"] = os.environ.get("LSF_ENVDIR", "Unknown")
batchSystemParameters["Queue"] = os.environ.get("LSB_QUEUE", "Unknown")

# SLURM batch system
flavour = "SSH%s" % batchSystemType
pilotReference = "sshlsf://" + ceName + "/" + batchSystemJobID

# SLURM
if "SLURM_JOBID" in os.environ:
flavour = "SSHSLURM"
pilotReference = "sshslurm://" + ceName + "/" + os.environ["SLURM_JOBID"]
batchSystemType = "SLURM"
batchSystemJobID = os.environ["SLURM_JOBID"]

flavour = "SSH%s" % batchSystemType
pilotReference = "sshslurm://" + ceName + "/" + batchSystemJobID

# Condor
if "CONDOR_JOBID" in os.environ:
flavour = "SSHCondor"
pilotReference = "sshcondor://" + ceName + "/" + os.environ["CONDOR_JOBID"]
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["CONDOR_JOBID"]
batchSystemParameters["InfoPath"] = os.environ.get("_CONDOR_JOB_AD", "Unknown")

# # CEs
flavour = "SSH%s" % batchSystemType
pilotReference = "sshcondor://" + ceName + "/" + batchSystemJobID

# # CEs/Batch Systems

# HTCondor
if "HTCONDOR_JOBID" in os.environ:
batchSystemType = "HTCondor"
batchSystemJobID = os.environ["HTCONDOR_JOBID"]

flavour = "HTCondorCE"
pilotReference = "htcondorce://" + ceName + "/" + os.environ["HTCONDOR_JOBID"]
pilotReference = "htcondorce://" + ceName + "/" + batchSystemJobID

# # Local/SSH

# Local submission to the host
if "LOCAL_JOBID" in os.environ:
flavour = "Local"
pilotReference = "local://" + ceName + "/" + os.environ["LOCAL_JOBID"]

# Direct SSH tunnel submission
if "SSHCE_JOBID" in os.environ:
@@ -274,6 +325,8 @@ def getFlavour(ceName):
"sshbatchhost://" + ceName + "/" + os.environ["SSH_NODE_HOST"] + "/" + os.environ["SSHBATCH_JOBID"]
)

# # CEs

# ARC
if "GRID_GLOBAL_JOBURL" in os.environ:
flavour = "ARC"
@@ -284,9 +337,18 @@ def getFlavour(ceName):
flavour = "VMDIRAC"
pilotReference = "vm://" + ceName + "/" + os.environ["JOB_ID"]

return flavour, pilotReference
return flavour, pilotReference, {"Type": batchSystemType, "JobID": batchSystemJobID, "Parameters": batchSystemParameters}


def getFlavour(ceName):
"""Old method to get the flavour of the pilot. Deprecated.

Please use getSubmitterInfo instead.
"""
warnings.warn("getFlavour() is deprecated. Please use getSubmitterInfo() instead.", category=DeprecationWarning, stacklevel=2)
flavour, pilotReference, _ = getSubmitterInfo(ceName)
return flavour, pilotReference

class ObjectLoader(object):
"""Simplified class for loading objects from a DIRAC installation.

@@ -834,6 +896,7 @@ def __init__(self):
self.stopOnApplicationFailure = True
self.stopAfterFailedMatches = 10
self.flavour = "DIRAC"
self.batchSystemInfo = {}
self.pilotReference = ""
self.releaseVersion = ""
self.releaseProject = ""