Skip to content

Commit

Permalink
Merge pull request #195 from DIRACGrid/revert-181-vacuum_pilots
Browse files Browse the repository at this point in the history
Revert "master: add new command RegisterPilot"
  • Loading branch information
fstagni authored Jun 28, 2023
2 parents 5a527da + ad84bc9 commit 15a0874
Show file tree
Hide file tree
Showing 6 changed files with 789 additions and 177 deletions.
270 changes: 118 additions & 152 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def __init__(self, pilotParams):
from pipes import quote

try:
from Pilot.pilotTools import CommandBase, retrieveUrlTimeout, which
from Pilot.pilotTools import CommandBase, retrieveUrlTimeout
except ImportError:
from pilotTools import CommandBase, retrieveUrlTimeout, which
from pilotTools import CommandBase, retrieveUrlTimeout
############################


Expand Down Expand Up @@ -520,155 +520,6 @@ def _getSecurityCFG(self):
self.cfg.append("-o /DIRAC/Security/KeyFile=%s/hostkey.pem" % self.pp.certsLocation)


class RegisterPilot(CommandBase):
"""The Pilot self-announce its own presence"""

def __init__(self, pilotParams):
"""c'tor"""
super(RegisterPilot, self).__init__(pilotParams)

if not which("dirac-admin-add-pilot", self.pp.installEnv):
self.log.info("Skipping Pilot Command RegisterPilot, as executable dirac-admin-add-pilot does not exist")

# this variable contains the options that are passed to dirac-admin-add-pilot
self.cfg = []
self.pilotStamp = os.environ.get("DIRAC_PILOT_STAMP", '')

@logFinalizer
def execute(self):
"""Calls dirac-admin-add-pilot"""

self.__setFlavour()

if self.pp.useServerCertificate:
self.cfg.append("-o /DIRAC/Security/UseServerCertificate=yes")
extractDNCommand = "openssl x509 -in %s/hostcert.pem " % self.pp.certsLocation
extractDNCommand += "-noout -subject -nameopt compat | sed 's/subject=//'"
retCode, res = self.executeAndGetOutput(extractDNCommand, self.pp.installEnv)
pilotOwnerDN = res.strip().split("\n")[-1]
if retCode:
self.log.error("Could not get execute %s [ERROR %d]" % (extractDNCommand, retCode))

pilotOwnerGroup = "certificate_group"
else:
pilotOwnerDN = self.pp.userDN
pilotOwnerGroup = self.pp.userGroup

if self.pp.localConfigFile:
if LooseVersion(self.releaseVersion) >= self.cfgOptionDIRACVersion:
self.cfg.append("--cfg")
self.cfg.append(self.pp.localConfigFile) # this file is as input

checkCmd = "dirac-admin-add-pilot %s %s %s %s %s --status=Running %s -d" % (
self.pp.pilotReference,
pilotOwnerDN,
pilotOwnerGroup,
self.pp.flavour,
self.pilotStamp,
" ".join(self.cfg),
)
retCode, _ = self.executeAndGetOutput(checkCmd, self.pp.installEnv)
if retCode:
self.log.error("Could not get execute dirac-admin-add-pilot [ERROR %d]" % retCode)

def __setFlavour(self):

self.pp.pilotReference = os.environ.get("DIRAC_PILOT_STAMP", self.pp.pilotReference)

# # Batch systems

# Take the reference from the Torque batch system
if "PBS_JOBID" in os.environ:
self.pp.flavour = "SSHTorque"
self.pp.pilotReference = "sshtorque://" + self.pp.ceName + "/" + os.environ["PBS_JOBID"].split(".")[0]

# Take the reference from the OAR batch system
if "OAR_JOBID" in os.environ:
self.pp.flavour = "SSHOAR"
self.pp.pilotReference = "sshoar://" + self.pp.ceName + "/" + os.environ["OAR_JOBID"]

# Grid Engine
if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ:
self.pp.flavour = "SSHGE"
self.pp.pilotReference = "sshge://" + self.pp.ceName + "/" + os.environ["JOB_ID"]
# Generic JOB_ID
elif "JOB_ID" in os.environ:
self.pp.flavour = "Generic"
self.pp.pilotReference = "generic://" + self.pp.ceName + "/" + os.environ["JOB_ID"]

# LSF
if "LSB_BATCH_JID" in os.environ:
self.pp.flavour = "SSHLSF"
self.pp.pilotReference = "sshlsf://" + self.pp.ceName + "/" + os.environ["LSB_BATCH_JID"]

# SLURM batch system
if "SLURM_JOBID" in os.environ:
self.pp.flavour = "SSHSLURM"
self.pp.pilotReference = "sshslurm://" + self.pp.ceName + "/" + os.environ["SLURM_JOBID"]

# Condor
if "CONDOR_JOBID" in os.environ:
self.pp.flavour = "SSHCondor"
self.pp.pilotReference = "sshcondor://" + self.pp.ceName + "/" + os.environ["CONDOR_JOBID"]

# # CEs

# HTCondor
if "HTCONDOR_JOBID" in os.environ:
self.pp.flavour = "HTCondorCE"
self.pp.pilotReference = "htcondorce://" + self.pp.ceName + "/" + os.environ["HTCONDOR_JOBID"]

# This is the CREAM direct submission case
if "CREAM_JOBID" in os.environ:
self.pp.flavour = "CREAM"
self.pp.pilotReference = os.environ["CREAM_JOBID"]

if "OSG_WN_TMP" in os.environ:
self.pp.flavour = "OSG"

# GLOBUS Computing Elements
if "GLOBUS_GRAM_JOB_CONTACT" in os.environ:
self.pp.flavour = "GLOBUS"
self.pp.pilotReference = os.environ["GLOBUS_GRAM_JOB_CONTACT"]

# Direct SSH tunnel submission
if "SSHCE_JOBID" in os.environ:
self.pp.flavour = "SSH"
self.pp.pilotReference = "ssh://" + self.pp.ceName + "/" + os.environ["SSHCE_JOBID"]

# Batch host SSH tunnel submission (SSHBatch CE)
if "SSHBATCH_JOBID" in os.environ and "SSH_NODE_HOST" in os.environ:
self.pp.flavour = "SSHBATCH"
self.pp.pilotReference = (
"sshbatchhost://"
+ self.pp.ceName
+ "/"
+ os.environ["SSH_NODE_HOST"]
+ "/"
+ os.environ["SSHBATCH_JOBID"]
)

# ARC case
# JOBID does not provide the full url in recent versions of ARC
# JOBURL has been introduced recently and should be preferred when present
if "GRID_GLOBAL_JOBID" in os.environ:
self.pp.flavour = "ARC"
self.pp.pilotReference = os.environ["GRID_GLOBAL_JOBID"]

if "GRID_GLOBAL_JOBURL" in os.environ:
self.pp.flavour = "ARC"
self.pp.pilotReference = os.environ["GRID_GLOBAL_JOBURL"]

# # DIRAC specific

# VMDIRAC case
if "VMDIRAC_VERSION" in os.environ:
self.pp.flavour = "VMDIRAC"
self.pp.pilotReference = "vm://" + self.pp.ceName + "/" + os.environ["JOB_ID"]

self.log.debug("Flavour: %s; pilot reference: %s " % (self.pp.flavour, self.pp.pilotReference))


class CheckCECapabilities(CommandBase):
"""Used to get CE tags and other relevant parameters."""

Expand Down Expand Up @@ -871,6 +722,7 @@ def __init__(self, pilotParams):
@logFinalizer
def execute(self):
"""Setup configuration parameters"""
self.__setFlavour()
self.cfg.append("-o /LocalSite/GridMiddleware=%s" % self.pp.flavour)

self.cfg.append('-n "%s"' % self.pp.site)
Expand All @@ -886,7 +738,7 @@ def execute(self):
if o == "-o" or o == "--option":
self.cfg.append('-o "%s"' % v)

if self.pp.pilotReference:
if self.pp.pilotReference != "Unknown":
self.cfg.append("-o /LocalSite/PilotReference=%s" % self.pp.pilotReference)

if self.pp.useServerCertificate:
Expand All @@ -913,6 +765,120 @@ def execute(self):
self.log.error("Could not configure DIRAC [ERROR %d]" % retCode)
self.exitWithError(retCode)

def __setFlavour(self):

pilotRef = "Unknown"
self.pp.flavour = "Generic"

# If pilot reference is specified at submission, then set flavour to DIRAC
# unless overridden by presence of batch system environment variables
if self.pp.pilotReference:
self.pp.flavour = "DIRAC"
pilotRef = self.pp.pilotReference

# # Batch systems

# Take the reference from the Torque batch system
if "PBS_JOBID" in os.environ:
self.pp.flavour = "SSHTorque"
pilotRef = "sshtorque://" + self.pp.ceName + "/" + os.environ["PBS_JOBID"].split(".")[0]

# Take the reference from the OAR batch system
if "OAR_JOBID" in os.environ:
self.pp.flavour = "SSHOAR"
pilotRef = "sshoar://" + self.pp.ceName + "/" + os.environ["OAR_JOBID"]

# Grid Engine
if "JOB_ID" in os.environ and "SGE_TASK_ID" in os.environ:
self.pp.flavour = "SSHGE"
pilotRef = "sshge://" + self.pp.ceName + "/" + os.environ["JOB_ID"]
# Generic JOB_ID
elif "JOB_ID" in os.environ:
self.pp.flavour = "Generic"
pilotRef = "generic://" + self.pp.ceName + "/" + os.environ["JOB_ID"]

# LSF
if "LSB_BATCH_JID" in os.environ:
self.pp.flavour = "SSHLSF"
pilotRef = "sshlsf://" + self.pp.ceName + "/" + os.environ["LSB_BATCH_JID"]

# SLURM batch system
if "SLURM_JOBID" in os.environ:
self.pp.flavour = "SSHSLURM"
pilotRef = "sshslurm://" + self.pp.ceName + "/" + os.environ["SLURM_JOBID"]

# Condor
if "CONDOR_JOBID" in os.environ:
self.pp.flavour = "SSHCondor"
pilotRef = "sshcondor://" + self.pp.ceName + "/" + os.environ["CONDOR_JOBID"]

# # CEs

# HTCondor
if "HTCONDOR_JOBID" in os.environ:
self.pp.flavour = "HTCondorCE"
pilotRef = "htcondorce://" + self.pp.ceName + "/" + os.environ["HTCONDOR_JOBID"]

# This is the CREAM direct submission case
if "CREAM_JOBID" in os.environ:
self.pp.flavour = "CREAM"
pilotRef = os.environ["CREAM_JOBID"]

if "OSG_WN_TMP" in os.environ:
self.pp.flavour = "OSG"

# GLOBUS Computing Elements
if "GLOBUS_GRAM_JOB_CONTACT" in os.environ:
self.pp.flavour = "GLOBUS"
pilotRef = os.environ["GLOBUS_GRAM_JOB_CONTACT"]

# Direct SSH tunnel submission
if "SSHCE_JOBID" in os.environ:
self.pp.flavour = "SSH"
pilotRef = "ssh://" + self.pp.ceName + "/" + os.environ["SSHCE_JOBID"]

# Batch host SSH tunnel submission (SSHBatch CE)
if "SSHBATCH_JOBID" in os.environ and "SSH_NODE_HOST" in os.environ:
self.pp.flavour = "SSHBATCH"
pilotRef = (
"sshbatchhost://"
+ self.pp.ceName
+ "/"
+ os.environ["SSH_NODE_HOST"]
+ "/"
+ os.environ["SSHBATCH_JOBID"]
)

# ARC case
# JOBID does not provide the full url in recent versions of ARC
# JOBURL has been introduced recently and should be preferred when present
if "GRID_GLOBAL_JOBID" in os.environ:
self.pp.flavour = "ARC"
pilotRef = os.environ["GRID_GLOBAL_JOBID"]

if "GRID_GLOBAL_JOBURL" in os.environ:
self.pp.flavour = "ARC"
pilotRef = os.environ["GRID_GLOBAL_JOBURL"]

# # DIRAC specific

# VMDIRAC case
if "VMDIRAC_VERSION" in os.environ:
self.pp.flavour = "VMDIRAC"
pilotRef = "vm://" + self.pp.ceName + "/" + os.environ["JOB_ID"]

# Pilot reference is given explicitly in environment
if "PILOT_UUID" in os.environ:
pilotRef = os.environ["PILOT_UUID"]

# Pilot reference is specified at submission
if self.pp.pilotReference:
pilotRef = self.pp.pilotReference

self.log.debug("Flavour: %s; pilot reference: %s " % (self.pp.flavour, pilotRef))

self.pp.pilotReference = pilotRef


class ConfigureArchitecture(CommandBase):
"""This command simply calls dirac-platfom to determine the platform.
Expand Down
22 changes: 2 additions & 20 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,6 @@ def retrieveUrlTimeout(url, fileName, log, timeout=0):
raise x


def which(cmd, environDict=None):
"""
test if an executable exists, python2 compatible
(in python 3 one could simply use shutil.which(cmd)).
If testing for the existance of a DIRAC command,
this would only work for pilots installing python3 DIRAC clients
"""
if not environDict:
environDict=os.environ

for prefix in environDict["PATH"].split(os.pathsep):
filename = os.path.join(prefix, cmd)
executable = os.access(filename, os.X_OK)
if executable and os.path.isfile(filename):
return os.path.join(prefix, filename)
return False


class ObjectLoader(object):
"""Simplified class for loading objects from a DIRAC installation.
Expand Down Expand Up @@ -656,7 +637,6 @@ def __init__(self):
"CheckWorkerNode",
"InstallDIRAC",
"ConfigureBasics",
"RegisterPilot",
"CheckCECapabilities",
"CheckWNCapabilities",
"ConfigureSite",
Expand Down Expand Up @@ -832,6 +812,8 @@ def __initCommandLine2(self):
self.site = v
elif o == "-y" or o == "--CEType":
self.ceType = v
elif o == "-R" or o == "--reference":
self.pilotReference = v
elif o == "-k" or o == "--keepPP":
self.keepPythonPath = True
elif o in ("-C", "--configurationServer"):
Expand Down
Loading

0 comments on commit 15a0874

Please sign in to comment.