Skip to content

Commit

Permalink
sweep: DIRACGrid#6405 Pilot stamps available in the pilot execution e…
Browse files Browse the repository at this point in the history
…nvironment
  • Loading branch information
atsareg authored and fstagni committed May 25, 2023
1 parent 57ff9fa commit 9969ff1
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 14 deletions.
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/ARCComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
(stdout="{diracStamp}.out")
(stderr="{diracStamp}.err")
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}"))
(outputFiles={xrslOutputFiles})
(queue={queue})
{xrslMPAdditions}
Expand Down
7 changes: 4 additions & 3 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def submitJob(self, **kwargs):
nJobs = kwargs.get("NJobs")
if not nJobs:
nJobs = 1
stamps = kwargs["JobStamps"]
numberOfProcessors = kwargs.get("NumberOfProcessors")
outputDir = kwargs["OutputDir"]
executable = kwargs["Executable"]
Expand All @@ -98,15 +99,15 @@ def submitJob(self, **kwargs):
Output = $(Cluster).$(Process).out
Error = $(Cluster).$(Process).err
Log = test.log
Environment = CONDOR_JOBID=$(Cluster).$(Process)
Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
Getenv = False
request_cpus = %s
Queue %s
Queue stamp in %s
"""
% (executable, outputDir, numberOfProcessors, nJobs)
% (executable, outputDir, numberOfProcessors, ",".join(stamps))
)

jdlFile.flush()
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/BatchSystems/Host.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def submitJob(self, **kwargs):
args["Stamp"] = stamps[_i]
envDict = os.environ
envDict[jobidName] = stamps[_i]
envDict["DIRAC_PILOT_STAMP"] = stamps[_i]
if nodeHost:
envDict["SSH_NODE_HOST"] = nodeHost
try:
Expand Down
5 changes: 3 additions & 2 deletions src/DIRAC/Resources/Computing/BatchSystems/SLURM.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def submitJob(self, **kwargs):
if not nJobs:
nJobs = 1

stamps = kwargs["JobStamps"]
outputDir = kwargs["OutputDir"]
errorDir = kwargs["ErrorDir"]
queue = kwargs["Queue"]
Expand All @@ -56,13 +57,13 @@ def submitJob(self, **kwargs):
self._generateSrunWrapper(executable)

jobIDs = []
for _i in range(nJobs):
for iJob in range(nJobs):
jid = ""
cmd = "%s; " % preamble if preamble else ""
# By default, all the environment variables of the submitter node are propagated to the workers
# It can create conflicts during the installation of the pilots
# --export restricts the propagation to the PATH variable to get a clean environment in the workers
cmd += "sbatch --export=PATH "
cmd += "sbatch --export=PATH,DIRAC_PILOT_STAMP=%s " % stamps[iJob]
cmd += "-o %s/%%j.out " % outputDir
cmd += "-e %s/%%j.err " % errorDir
cmd += "--partition=%s " % queue
Expand Down
12 changes: 9 additions & 3 deletions src/DIRAC/Resources/Computing/CloudComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def _getSSHKeyID(self):
"""
return self.ceParameters.get("Instance_SSHKey", None)

def _getMetadata(self, executableFile):
def _getMetadata(self, executableFile, pilotStamp=""):
"""Builds metadata from configuration system, cloudinit template
and dirac pilot job wrapper
Expand All @@ -343,6 +343,8 @@ def _getMetadata(self, executableFile):
filedef["content"] = self.proxy
elif filedef["content"] == "EXECUTABLE_STR":
filedef["content"] = exe_str
elif "STAMP_STR" in filedef["content"]:
filedef["content"] = filedef["content"].replace("STAMP_STR", pilotStamp)
ext_packages = self.ceParameters.get("Context_ExtPackages", None)
if ext_packages:
packages = [x.strip() for x in ext_packages.split(",")]
Expand Down Expand Up @@ -464,25 +466,29 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
if networks:
instParams["networks"] = networks
instParams["ex_keyname"] = self._getSSHKeyID()
instParams["ex_userdata"] = self._getMetadata(executableFile)
instParams["ex_config_drive"] = True

driver = self._getDriver()
stampDict = {}

for _ in range(numberOfJobs):
# generates an 8 character hex string
instRandom = str(uuid.uuid4()).upper()[:8]
instName = VM_NAME_PREFIX + instRandom
instParams["name"] = instName
instParams["ex_userdata"] = self._getMetadata(executableFile, instRandom)
try:
node = driver.create_node(**instParams)
except Exception as err:
self.log.error("Failed to create_node", str(err))
continue
instIDs.append(VM_ID_PREFIX + node.id)
stampDict[instName] = instRandom
if not instIDs:
return S_ERROR("Failed to submit any instances.")
return S_OK(instIDs)
result = S_OK(instIDs)
result["PilotStampDict"] = stampDict
return result

def killJob(self, jobIDList):
"""Stops VM instances
Expand Down
10 changes: 6 additions & 4 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,14 @@ def __init__(self, ceUniqueID):
self.remoteScheddOptions = ""

#############################################################################
def __writeSub(self, executable, nJobs, location, processors):
def __writeSub(self, executable, nJobs, location, processors, pilotStamps):
"""Create the Sub File for submission.
:param str executable: name of the script to execute
:param int nJobs: number of desired jobs
:param str location: directory that should contain the result of the jobs
:param int processors: number of CPU cores to allocate
:param list pilotStamps: list of pilot stamps (strings)
"""

self.log.debug("Working directory:", self.workingDirectory)
Expand Down Expand Up @@ -189,7 +190,7 @@ def __writeSub(self, executable, nJobs, location, processors):
output = $(Cluster).$(Process).out
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
environment = "HTCONDOR_JOBID=$(Cluster).$(Process)"
environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
initialdir = %(initialDir)s
grid_resource = condor %(ceName)s %(ceName)s:9619
transfer_output_files = ""
Expand All @@ -200,7 +201,7 @@ def __writeSub(self, executable, nJobs, location, processors):
%(extraString)s
Queue %(nJobs)s
Queue stamp in %(pilotStampList)s
""" % dict(
executable=executable,
Expand All @@ -211,6 +212,7 @@ def __writeSub(self, executable, nJobs, location, processors):
initialDir=os.path.join(self.workingDirectory, location),
localScheddOptions=localScheddOptions,
targetUniverse=targetUniverse,
pilotStampList=",".join(pilotStamps),
)
subFile.write(sub)
subFile.close()
Expand Down Expand Up @@ -253,7 +255,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
# We randomize the location of the pilot output and log, because there are just too many of them
location = logDir(self.ceName, commonJobStampPart)
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps)

cmd = ["condor_submit", "-terse", subName]
# the options for submit to remote are different than the other remoteScheddOptions
Expand Down
7 changes: 6 additions & 1 deletion src/DIRAC/Resources/Computing/SSHBatchComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,19 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
# Submit jobs now
restJobs = numberOfJobs
submittedJobs = []
stampDict = {}
for slots in range(maxSlots, 0, -1):
if slots not in rankHosts:
continue
for host in rankHosts[slots]:
result = self._submitJobToHost(submitFile, min(slots, restJobs), host)
if not result["OK"]:
continue

nJobs = len(result["Value"])
if nJobs > 0:
submittedJobs.extend(result["Value"])
stampDict.update(result.get("PilotStampDict", {}))
restJobs = restJobs - nJobs
if restJobs <= 0:
break
Expand All @@ -113,7 +116,9 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
if proxy:
os.remove(submitFile)

return S_OK(submittedJobs)
result = S_OK(submittedJobs)
result["PilotStampDict"] = stampDict
return result

def killJob(self, jobIDs):
"""Kill specified jobs"""
Expand Down
4 changes: 4 additions & 0 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,10 @@ def _submitJobToHost(self, executableFile, numberOfJobs, host=None):
return S_ERROR("No jobs IDs returned")

result = S_OK(jobIDs)
stampDict = {}
for iJob, jobID in enumerate(jobIDs):
stampDict[jobID] = jobStamps[iJob]
result["PilotStampDict"] = stampDict
self.submittedJobs += len(batchIDs)

return result
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/cloudinit.template
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ write_files:
cd /mnt/dirac
export X509_USER_PROXY=/mnt/proxy.pem
export PILOT_UUID="cloud://$(cat /var/lib/cloud/data/instance-id)"
export DIRAC_PILOT_STAMP="STAMP_STR"
bash /mnt/run_pilot.sh &> /mnt/dirac/startup.log

yum_repos:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from DIRAC.Resources.Computing import HTCondorCEComputingElement as HTCE
from DIRAC.Resources.Computing.BatchSystems import Condor
from DIRAC.Core.Utilities.File import makeGuid
from DIRAC import S_OK

MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement"
Expand Down Expand Up @@ -121,7 +122,13 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected):
mocker.patch(MODNAME + ".tempfile.mkstemp", return_value=("os", "pilotName"))
mocker.patch(MODNAME + ".mkDir")

htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1) # pylint: disable=E1101
jobStamps = []
commonJobStampPart = makeGuid()[:3]
for _i in range(42):
jobStamp = commonJobStampPart + makeGuid()[:5]
jobStamps.append(jobStamp)

htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101
for option in optionsNotExpected:
# the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument]
assert option not in subFileMock.write.call_args_list[0][0][0]
Expand Down

0 comments on commit 9969ff1

Please sign in to comment.