From dac28565f720c398eaa84b11f6b57b021d5407bf Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Wed, 15 Feb 2023 16:05:41 +0100 Subject: [PATCH] sweep: #6405 Pilot stamps available in the pilot execution environment --- src/DIRAC/Resources/Computing/ARCComputingElement.py | 1 + src/DIRAC/Resources/Computing/BatchSystems/Condor.py | 7 ++++--- src/DIRAC/Resources/Computing/BatchSystems/Host.py | 1 + src/DIRAC/Resources/Computing/BatchSystems/SLURM.py | 5 +++-- .../Resources/Computing/CloudComputingElement.py | 12 +++++++++--- .../Computing/HTCondorCEComputingElement.py | 10 ++++++---- .../Resources/Computing/SSHBatchComputingElement.py | 7 ++++++- src/DIRAC/Resources/Computing/SSHComputingElement.py | 4 ++++ src/DIRAC/Resources/Computing/cloudinit.template | 1 + .../test/Test_HTCondorCEComputingElement.py | 9 ++++++++- 10 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/DIRAC/Resources/Computing/ARCComputingElement.py b/src/DIRAC/Resources/Computing/ARCComputingElement.py index cba5e69b1f4..5069573ef11 100755 --- a/src/DIRAC/Resources/Computing/ARCComputingElement.py +++ b/src/DIRAC/Resources/Computing/ARCComputingElement.py @@ -216,6 +216,7 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None (inputFiles=({executable} "{executableFile}") {xrslInputAdditions}) (stdout="{diracStamp}.out") (stderr="{diracStamp}.err") +(environment=("DIRAC_PILOT_STAMP" "{diracStamp}")) (outputFiles={xrslOutputFiles}) (queue={queue}) {xrslMPAdditions} diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index 8dce42f3d90..21f6e58773a 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -82,6 +82,7 @@ def submitJob(self, **kwargs): nJobs = kwargs.get("NJobs") if not nJobs: nJobs = 1 + stamps = kwargs["JobStamps"] numberOfProcessors = kwargs.get("NumberOfProcessors") outputDir = kwargs["OutputDir"] executable = kwargs["Executable"] @@ -98,15 +99,15 @@ def submitJob(self, **kwargs): Output = $(Cluster).$(Process).out Error = $(Cluster).$(Process).err Log = test.log - Environment = CONDOR_JOBID=$(Cluster).$(Process) + Environment = "CONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" Getenv = False request_cpus = %s - Queue %s + Queue stamp in %s """ - % (executable, outputDir, numberOfProcessors, nJobs) + % (executable, outputDir, numberOfProcessors, ",".join(stamps)) ) jdlFile.flush() diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Host.py b/src/DIRAC/Resources/Computing/BatchSystems/Host.py index 65046045f29..cdd0a49396b 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Host.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Host.py @@ -94,6 +94,7 @@ def submitJob(self, **kwargs): args["Stamp"] = stamps[_i] envDict = os.environ envDict[jobidName] = stamps[_i] + envDict["DIRAC_PILOT_STAMP"] = stamps[_i] if nodeHost: envDict["SSH_NODE_HOST"] = nodeHost try: diff --git a/src/DIRAC/Resources/Computing/BatchSystems/SLURM.py b/src/DIRAC/Resources/Computing/BatchSystems/SLURM.py index a4d8c4c3973..70fe9be45d5 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/SLURM.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/SLURM.py @@ -32,6 +32,7 @@ def submitJob(self, **kwargs): if not nJobs: nJobs = 1 + stamps = kwargs["JobStamps"] outputDir = kwargs["OutputDir"] errorDir = kwargs["ErrorDir"] queue = kwargs["Queue"] @@ -56,13 +57,13 @@ def submitJob(self, **kwargs): self._generateSrunWrapper(executable) jobIDs = [] - for _i in range(nJobs): + for iJob in range(nJobs): jid = "" cmd = "%s; " % preamble if preamble else "" # By default, all the environment variables of the submitter node are propagated to the workers # It can create conflicts during the installation of the pilots # --export restricts the propagation to the PATH variable to get a clean environment in the workers - cmd += "sbatch --export=PATH " + cmd += "sbatch --export=PATH,DIRAC_PILOT_STAMP=%s " % stamps[iJob] cmd += "-o %s/%%j.out " % outputDir cmd += "-e %s/%%j.err " % errorDir cmd += "--partition=%s " % queue diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index 2bc30b474ec..ee19162938d 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -320,7 +320,7 @@ def _getSSHKeyID(self): """ return self.ceParameters.get("Instance_SSHKey", None) - def _getMetadata(self, executableFile): + def _getMetadata(self, executableFile, pilotStamp=""): """Builds metadata from configuration system, cloudinit template and dirac pilot job wrapper @@ -343,6 +343,8 @@ def _getMetadata(self, executableFile): filedef["content"] = self.proxy elif filedef["content"] == "EXECUTABLE_STR": filedef["content"] = exe_str + elif "STAMP_STR" in filedef["content"]: + filedef["content"] = filedef["content"].replace("STAMP_STR", pilotStamp) ext_packages = self.ceParameters.get("Context_ExtPackages", None) if ext_packages: packages = [x.strip() for x in ext_packages.split(",")] @@ -464,25 +466,29 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): if networks: instParams["networks"] = networks instParams["ex_keyname"] = self._getSSHKeyID() - instParams["ex_userdata"] = self._getMetadata(executableFile) instParams["ex_config_drive"] = True driver = self._getDriver() + stampDict = {} for _ in range(numberOfJobs): # generates an 8 character hex string instRandom = str(uuid.uuid4()).upper()[:8] instName = VM_NAME_PREFIX + instRandom instParams["name"] = instName + instParams["ex_userdata"] = self._getMetadata(executableFile, instRandom) try: node = driver.create_node(**instParams) except Exception as err: self.log.error("Failed to create_node", str(err)) continue instIDs.append(VM_ID_PREFIX + node.id) + stampDict[instName] = instRandom if not instIDs: return S_ERROR("Failed to submit any instances.") - return S_OK(instIDs) + result = S_OK(instIDs) + result["PilotStampDict"] = stampDict + return result def killJob(self, jobIDList): """Stops VM instances diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 93c70c515e6..b25b6f83a79 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -144,13 +144,14 @@ def __init__(self, ceUniqueID): self.remoteScheddOptions = "" ############################################################################# - def __writeSub(self, executable, nJobs, location, processors): + def __writeSub(self, executable, nJobs, location, processors, pilotStamps): """Create the Sub File for submission. :param str executable: name of the script to execute :param int nJobs: number of desired jobs :param str location: directory that should contain the result of the jobs :param int processors: number of CPU cores to allocate + :param list pilotStamps: list of pilot stamps (strings) """ self.log.debug("Working directory:", self.workingDirectory) @@ -189,7 +190,7 @@ def __writeSub(self, executable, nJobs, location, processors): output = $(Cluster).$(Process).out error = $(Cluster).$(Process).err log = $(Cluster).$(Process).log -environment = "HTCONDOR_JOBID=$(Cluster).$(Process)" +environment = "HTCONDOR_JOBID=$(Cluster).$(Process) DIRAC_PILOT_STAMP=$(stamp)" initialdir = %(initialDir)s grid_resource = condor %(ceName)s %(ceName)s:9619 transfer_output_files = "" @@ -200,7 +201,7 @@ def __writeSub(self, executable, nJobs, location, processors): %(extraString)s -Queue %(nJobs)s +Queue stamp in %(pilotStampList)s """ % dict( executable=executable, @@ -211,6 +212,7 @@ def __writeSub(self, executable, nJobs, location, processors): initialDir=os.path.join(self.workingDirectory, location), localScheddOptions=localScheddOptions, targetUniverse=targetUniverse, + pilotStampList=",".join(pilotStamps), ) subFile.write(sub) subFile.close() @@ -253,7 +255,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): # We randomize the location of the pilot output and log, because there are just too many of them location = logDir(self.ceName, commonJobStampPart) nProcessors = self.ceParameters.get("NumberOfProcessors", 1) - subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors) + subName = self.__writeSub(executableFile, numberOfJobs, location, nProcessors, jobStamps) cmd = ["condor_submit", "-terse", subName] # the options for submit to remote are different than the other remoteScheddOptions diff --git a/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py b/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py index b38b64b7ee4..0266bf4df7f 100644 --- a/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHBatchComputingElement.py @@ -94,6 +94,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): # Submit jobs now restJobs = numberOfJobs submittedJobs = [] + stampDict = {} for slots in range(maxSlots, 0, -1): if slots not in rankHosts: continue @@ -101,9 +102,11 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): result = self._submitJobToHost(submitFile, min(slots, restJobs), host) if not result["OK"]: continue + nJobs = len(result["Value"]) if nJobs > 0: submittedJobs.extend(result["Value"]) + stampDict.update(result.get("PilotStampDict", {})) restJobs = restJobs - nJobs if restJobs <= 0: break @@ -113,7 +116,9 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1): if proxy: os.remove(submitFile) - return S_OK(submittedJobs) + result = S_OK(submittedJobs) + result["PilotStampDict"] = stampDict + return result def killJob(self, jobIDs): """Kill specified jobs""" diff --git a/src/DIRAC/Resources/Computing/SSHComputingElement.py b/src/DIRAC/Resources/Computing/SSHComputingElement.py index 9441fc672f5..0bc1fa00705 100644 --- a/src/DIRAC/Resources/Computing/SSHComputingElement.py +++ b/src/DIRAC/Resources/Computing/SSHComputingElement.py @@ -618,6 +618,10 @@ def _submitJobToHost(self, executableFile, numberOfJobs, host=None): return S_ERROR("No jobs IDs returned") result = S_OK(jobIDs) + stampDict = {} + for iJob, jobID in enumerate(jobIDs): + stampDict[jobID] = jobStamps[iJob] + result["PilotStampDict"] = stampDict self.submittedJobs += len(batchIDs) return result diff --git a/src/DIRAC/Resources/Computing/cloudinit.template b/src/DIRAC/Resources/Computing/cloudinit.template index 3b8f489da9f..b2d1d1e0496 100644 --- a/src/DIRAC/Resources/Computing/cloudinit.template +++ b/src/DIRAC/Resources/Computing/cloudinit.template @@ -81,6 +81,7 @@ write_files: cd /mnt/dirac export X509_USER_PROXY=/mnt/proxy.pem export PILOT_UUID="cloud://$(cat /var/lib/cloud/data/instance-id)" + export DIRAC_PILOT_STAMP="STAMP_STR" bash /mnt/run_pilot.sh &> /mnt/dirac/startup.log yum_repos: diff --git a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py index 4b5706efaa4..f412624bbad 100644 --- a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py @@ -6,6 +6,7 @@ from DIRAC.Resources.Computing import HTCondorCEComputingElement as HTCE from DIRAC.Resources.Computing.BatchSystems import Condor +from DIRAC.Core.Utilities.File import makeGuid from DIRAC import S_OK MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement" @@ -121,7 +122,13 @@ def test__writeSub(mocker, localSchedd, optionsNotExpected, optionsExpected): mocker.patch(MODNAME + ".tempfile.mkstemp", return_value=("os", "pilotName")) mocker.patch(MODNAME + ".mkDir") - htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1) # pylint: disable=E1101 + jobStamps = [] + commonJobStampPart = makeGuid()[:3] + for _i in range(42): + jobStamp = commonJobStampPart + makeGuid()[:5] + jobStamps.append(jobStamp) + + htce._HTCondorCEComputingElement__writeSub("dirac-install", 42, "", 1, jobStamps) # pylint: disable=E1101 for option in optionsNotExpected: # the three [0] are: call_args_list[firstCall][ArgsArgumentsTuple][FirstArgsArgument] assert option not in subFileMock.write.call_args_list[0][0][0]