Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sweep:integration] Pilot stamps available in the pilot execution environment #6806

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/ARCComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
(inputFiles=({executable} "{executableFile}") {xrslInputAdditions})
(stdout="{diracStamp}.out")
(stderr="{diracStamp}.err")
(environment=("DIRAC_PILOT_STAMP" "{diracStamp}"))
(outputFiles={xrslOutputFiles})
(queue={queue})
{xrslMPAdditions}
Expand Down
7 changes: 4 additions & 3 deletions src/DIRAC/Resources/Computing/BatchSystems/Condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def submitJob(self, **kwargs):
nJobs = kwargs.get("NJobs")
if not nJobs:
nJobs = 1
stamps = kwargs["JobStamps"]
numberOfProcessors = kwargs.get("NumberOfProcessors")
outputDir = kwargs["OutputDir"]
executable = kwargs["Executable"]
Expand All @@ -98,15 +99,15 @@ def submitJob(self, **kwargs):
Output = $(Cluster).$(Process).out
Error = $(Cluster).$(Process).err
Log = test.log
Environment = CONDOR_JOBID=$(Cluster).$(Process)
Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
Getenv = False

request_cpus = %s

Queue %s
Queue stamp in %s

"""
% (executable, outputDir, numberOfProcessors, nJobs)
% (executable, outputDir, numberOfProcessors, ",".join(stamps))
)

jdlFile.flush()
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/BatchSystems/Host.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def submitJob(self, **kwargs):
args["Stamp"] = stamps[_i]
envDict = os.environ
envDict[jobidName] = stamps[_i]
envDict["DIRAC_PILOT_STAMP"] = stamps[_i]
if nodeHost:
envDict["SSH_NODE_HOST"] = nodeHost
try:
Expand Down
5 changes: 3 additions & 2 deletions src/DIRAC/Resources/Computing/BatchSystems/SLURM.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def submitJob(self, **kwargs):
if not nJobs:
nJobs = 1

stamps = kwargs["JobStamps"]
outputDir = kwargs["OutputDir"]
errorDir = kwargs["ErrorDir"]
queue = kwargs["Queue"]
Expand All @@ -56,13 +57,13 @@ def submitJob(self, **kwargs):
self._generateSrunWrapper(executable)

jobIDs = []
for _i in range(nJobs):
for iJob in range(nJobs):
jid = ""
cmd = "%s; " % preamble if preamble else ""
# By default, all the environment variables of the submitter node are propagated to the workers
# It can create conflicts during the installation of the pilots
# --export restricts the propagation to the PATH variable to get a clean environment in the workers
cmd += "sbatch --export=PATH "
cmd += "sbatch --export=PATH,DIRAC_PILOT_STAMP=%s " % stamps[iJob]
cmd += "-o %s/%%j.out " % outputDir
cmd += "-e %s/%%j.err " % errorDir
cmd += "--partition=%s " % queue
Expand Down
12 changes: 9 additions & 3 deletions src/DIRAC/Resources/Computing/CloudComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def _getSSHKeyID(self):
"""
return self.ceParameters.get("Instance_SSHKey", None)

def _getMetadata(self, executableFile):
def _getMetadata(self, executableFile, pilotStamp=""):
"""Builds metadata from configuration system, cloudinit template
and dirac pilot job wrapper

Expand All @@ -343,6 +343,8 @@ def _getMetadata(self, executableFile):
filedef["content"] = self.proxy
elif filedef["content"] == "EXECUTABLE_STR":
filedef["content"] = exe_str
elif "STAMP_STR" in filedef["content"]:
filedef["content"] = filedef["content"].replace("STAMP_STR", pilotStamp)
ext_packages = self.ceParameters.get("Context_ExtPackages", None)
if ext_packages:
packages = [x.strip() for x in ext_packages.split(",")]
Expand Down Expand Up @@ -464,25 +466,29 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
if networks:
instParams["networks"] = networks
instParams["ex_keyname"] = self._getSSHKeyID()
instParams["ex_userdata"] = self._getMetadata(executableFile)
instParams["ex_config_drive"] = True

driver = self._getDriver()
stampDict = {}

for _ in range(numberOfJobs):
# generates an 8 character hex string
instRandom = str(uuid.uuid4()).upper()[:8]
instName = VM_NAME_PREFIX + instRandom
instParams["name"] = instName
instParams["ex_userdata"] = self._getMetadata(executableFile, instRandom)
try:
node = driver.create_node(**instParams)
except Exception as err:
self.log.error("Failed to create_node", str(err))
continue
instIDs.append(VM_ID_PREFIX + node.id)
stampDict[instName] = instRandom
if not instIDs:
return S_ERROR("Failed to submit any instances.")
return S_OK(instIDs)
result = S_OK(instIDs)
result["PilotStampDict"] = stampDict
return result

def killJob(self, jobIDList):
"""Stops VM instances
Expand Down
10 changes: 6 additions & 4 deletions src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,14 @@ def __init__(self, ceUniqueID):
self.remoteScheddOptions = ""

#############################################################################
def __writeSub(self, executable, nJobs, location, processors):
def __writeSub(self, executable, nJobs, location, processors, pilotStamps):
"""Create the Sub File for submission.

:param str executable: name of the script to execute
:param int nJobs: number of desired jobs
:param str location: directory that should contain the result of the jobs
:param int processors: number of CPU cores to allocate
:param list pilotStamps: list of pilot stamps (strings)
"""

self.log.debug("Working directory:", self.workingDirectory)
Expand Down Expand Up @@ -189,7 +190,7 @@ def __writeSub(self, executable, nJobs, location, processors):
output = $(Cluster).$(Process).out
error = $(Cluster).$(Process).err
log = $(Cluster).$(Process).log
environment = "HTCONDOR_JOBID=$(Cluster).$(Process)"
environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)"
initialdir = %(initialDir)s
grid_resource = condor %(ceName)s %(ceName)s:9619
transfer_output_files = ""
Expand All @@ -200,7 +201,7 @@ def __writeSub(self, executable, nJobs, location, processors):

%(extraString)s

Queue %(nJobs)s
Queue stamp in %(pilotStampList)s

""" % dict(
executable=executable,
Expand All @@ -211,6 +212,7 @@ def __writeSub(self, executable, nJobs, location, processors):
initialDir=os.path.join(self.workingDirectory, location),
localScheddOptions=localScheddOptions,
targetUniverse=targetUniverse,
pilotStampList=",".join(pilotStamps),
)
subFile.write(sub)
subFile.close()
Expand Down Expand Up @@ -253,7 +255,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
# We randomize the location of the pilot output and log, because there are just too many of them
location = logDir(self.ceName, commonJobStampPart)
nProcessors = self.ceParameters.get("NumberOfProcessors", 1)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors)
subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps)

cmd = ["condor_submit", "-terse", subName]
# the options for submit to remote are different than the other remoteScheddOptions
Expand Down
7 changes: 6 additions & 1 deletion src/DIRAC/Resources/Computing/SSHBatchComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,19 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
# Submit jobs now
restJobs = numberOfJobs
submittedJobs = []
stampDict = {}
for slots in range(maxSlots, 0, -1):
if slots not in rankHosts:
continue
for host in rankHosts[slots]:
result = self._submitJobToHost(submitFile, min(slots, restJobs), host)
if not result["OK"]:
continue

nJobs = len(result["Value"])
if nJobs > 0:
submittedJobs.extend(result["Value"])
stampDict.update(result.get("PilotStampDict", {}))
restJobs = restJobs - nJobs
if restJobs <= 0:
break
Expand All @@ -113,7 +116,9 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
if proxy:
os.remove(submitFile)

return S_OK(submittedJobs)
result = S_OK(submittedJobs)
result["PilotStampDict"] = stampDict
return result

def killJob(self, jobIDs):
"""Kill specified jobs"""
Expand Down
4 changes: 4 additions & 0 deletions src/DIRAC/Resources/Computing/SSHComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,10 @@ def _submitJobToHost(self, executableFile, numberOfJobs, host=None):
return S_ERROR("No jobs IDs returned")

result = S_OK(jobIDs)
stampDict = {}
for iJob, jobID in enumerate(jobIDs):
stampDict[jobID] = jobStamps[iJob]
result["PilotStampDict"] = stampDict
self.submittedJobs += len(batchIDs)

return result
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/Resources/Computing/cloudinit.template
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ write_files:
cd /mnt/dirac
export X509_USER_PROXY=/mnt/proxy.pem
export PILOT_UUID="cloud://$(cat /var/lib/cloud/data/instance-id)"
export DIRAC_PILOT_STAMP="STAMP_STR"
bash /mnt/run_pilot.sh &> /mnt/dirac/startup.log

yum_repos:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from DIRAC.Resources.Computing import HTCondorCEComputingElement as HTCE
from DIRAC.Resources.Computing.BatchSystems import Condor
from DIRAC.Core.Utilities.File import makeGuid
from DIRAC import S_OK

MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement"
Expand Down Expand Up @@ -121,7 +122,13 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected):
mocker.patch(MODNAME + ".tempfile.mkstemp", return_value=("os", "pilotName"))
mocker.patch(MODNAME + ".mkDir")

htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1) # pylint: disable=E1101
jobStamps = []
commonJobStampPart = makeGuid()[:3]
for _i in range(42):
jobStamp = commonJobStampPart + makeGuid()[:5]
jobStamps.append(jobStamp)

htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101
for option in optionsNotExpected:
# the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument]
assert option not in subFileMock.write.call_args_list[0][0][0]
Expand Down