diff --git a/docs/source/AdministratorGuide/Resources/supercomputers.rst b/docs/source/AdministratorGuide/Resources/supercomputers.rst index def98694c1d..4371bd8f22c 100644 --- a/docs/source/AdministratorGuide/Resources/supercomputers.rst +++ b/docs/source/AdministratorGuide/Resources/supercomputers.rst @@ -108,6 +108,9 @@ This case has not been addressed yet. No outbound connectivity ------------------------ +Submission management +--------------------- + Solutions seen in the previous section cannot work in an environment without external connectivity. The well-known Pilot-Job paradigm on which the DIRAC WMS is based does not apply in these circumstances: the Pilot-Jobs cannot fetch jobs from DIRAC. Thus, such supercomputers require slightly changes in the WMS: we reintroduced the push model. @@ -126,6 +129,12 @@ To leverage the Push model, one has to add the :mod:`~DIRAC.WorkloadManagementSy # Control the number of jobs handled on the machine MaxJobsToSubmit = 100 Module = PushJobAgent + # SubmissionPolicy can be "Application" or "JobWrapper" + # - Application (soon deprecated): the agent will submit a workflow to a PoolCE, the workflow is responsible for interacting with the remote site + # - JobWrapper (default): the agent will submit a JobWrapper directly to the remote site, it is responsible of the remote execution + SubmissionPolicy = + # The CVMFS location to be used for the job execution on the remote site + CVMFSLocation = "/cvmfs/dirac.egi.eu/dirac/pro" } One has also to authorize the machine hosting the :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` to process jobs via the ``Registry/Hosts/`` CS section:: @@ -155,25 +164,57 @@ One has to specify the concerned VO, the platform and the CPU Power in the targe Finally, one has to make sure that job scheduling parameters are correctly fine-tuned. Further details in the :ref:`JobScheduling section `. -:mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` inherits from :mod:`~DIRAC.WorkloadManagementSystem.Agent.JobAgent` and proposes a similar structure: it fetches a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service and submit it to a :mod:`~DIRAC.Resources.Computing.PoolComputingElement`. - - It provides an additional parameter in ``/LocalSite`` named ``RemoteExecution`` that can be used later in the process to identify computing resources with no external connectivity. - - There is no ``timeLeft`` attribute: it runs on the DIRAC side as an ``Agent``. - - ``MaxJobsToSubmit`` corresponds to the maximum number of jobs the agent can handle at the same time. - - To fetch a job, :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` sends the dictionary of the target CE to the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service. +The :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` class extends the functionality of the :mod:`~DIRAC.WorkloadManagementSystem.Agent.JobAgent` and operates specifically on a VO box. It follows a similar architecture by retrieving jobs from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service and submitting them to a :mod:`~DIRAC.Resources.Computing.ComputingElement`. Although `PushJobAgent` does not inherit directly from the :mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector`, it incorporates several comparable features, including: + +- Supervising specific Sites, Computing Elements (CEs), or Queues. +- Placing problematic queues on hold and retrying after a pre-defined number of cycles. 
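+
+As an illustration, the hold-and-retry behaviour mentioned above is controlled by the agent's ``FailedQueueCycleFactor`` option; a minimal sketch follows, with the default value taken from ``ConfigTemplate.cfg``::
+
+      # How many cycles to skip if a queue is not working
+      FailedQueueCycleFactor = 10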
+
+The :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` supports two distinct submission modes: **JobWrapper** and **Application**.
 
-:mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` does not inherit from :mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector` but embeds similar features:
- - It supervises specific Sites/CEs/Queues.
- - If there is an error with a queue, it puts it on hold and waits for a certain number of cycles before trying again.
+JobWrapper Mode (Default and Recommended)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Internally, the workflow modules originally in charge of executing the script/application (:mod:`~DIRAC.Workflow.Modules.Script`) check whether the workload should be
-sent to a remote location before acting.
-:mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` attempts to extract the value from the environment variable initialized by the :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper`.
-If the variable is not set, then the application is run locally via ``systemCall()``, else the application is submitted to a remote Computing Element such as ARC.
-:mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` wraps the script/application command in an executable, gets all the files of the working directory that correspond to input files and submits the executable along with the input files. It gets the status of the application submitted every 2 minutes until it is finished and finally gets the outputs.
+The **JobWrapper** mode is the default and recommended submission method, due to its reliability and efficiency. The workflow for this mode includes:
 
-What if the :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` is suddenly stopped while processing jobs? Jobs would be declared as ``Stalled``. Administrators would have to manually clean up input directories (by default, they should be located in ``/opt/dirac/runit/WorkloadManagement/PushJobAgent/``).
-Administrators may also have to kill processes related to the execution of the jobs: ``dirac-jobexec``.
+1. **Job Retrieval**: Fetch a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service.
+2. **Pre-processing**: Pre-process the job by fetching the input sandbox and any necessary data.
+3. **Template Generation**: Create a :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperOfflineTemplate` designed to execute the job’s payload.
+4. **Submission**: Submit the generated `JobWrapperOfflineTemplate` along with the inputs to the target Computing Element (CE).
+5. **Monitoring**: Continuously monitor the status of the submitted jobs until they are completed.
+6. **Output Retrieval**: Retrieve the outputs of the finished jobs from the target CE.
+7. **Post-processing**: Conduct any necessary post-processing of the outputs.
+
+.. warning:: The `JobWrapper` mode assumes that the job can execute without external connectivity. As an administrator, if any step of your job workflow requires external connectivity, it is crucial to review and adjust your logic accordingly. The :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator` can assist in this process. It enables you to define custom pre-processing and post-processing logic based on specific job and CE attributes. For more detailed information, refer to the :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator` documentation.
+
+.. 
image:: ../../_static/pja_jobwrapper_submission.png + :alt: PushJobAgent JobWrapper submission + :align: center + +Application Mode (Deprecated) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The **Application** mode is deprecated and slated for removal in future versions. It is considered less reliable due to higher memory consumption and sensitivity to CE-related issues, which restrict the number of jobs that can be processed concurrently. The workflow for this mode is as follows: + +1. **Job Retrieval**: Fetch a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service using the target CE attributes. +2. **Template Generation**: Generate a :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperTemplate`. +3. **Submission**: Submit the generated `JobWrapperTemplate` to a :mod:`~DIRAC.Resources.Computing.PoolComputingElement`. + - The submission includes an additional parameter in ``/LocalSite`` named ``RemoteExecution``, used to identify computing resources lacking external connectivity. + - The ``MaxJobsToSubmit`` setting defines the maximum number of jobs the agent can handle simultaneously. +4. **Execution**: The :mod:`~DIRAC.Resources.Computing.PoolComputingElement` executes the `JobWrapperTemplate` in a new process. +5. **Script Execution**: Within this context, the `JobWrapperTemplate` can only execute the :mod:`~DIRAC.WorkloadManagementSystem.scripts.dirac_jobexec` script in a new process. +6. **Workflow Module Processing**: Workflow modules responsible for script or application execution (:mod:`~DIRAC.Workflow.Modules.Script`) determine whether the payload needs to be offloaded to a remote location. +7. **Remote Execution**: :mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` checks for the environment variable initialized by the `JobWrapper`. + - If the variable is unset, the application runs locally via ``systemCall()``; otherwise, it is submitted to a remote CE such as ARC. + - `RemoteRunner` wraps the script or application command in an executable, gathers input files from the working directory, and submits these along with the executable to the remote CE. + - The status of the submitted application is monitored every 2 minutes until completion, after which the outputs are retrieved. + +.. warning:: If the `PushJobAgent` is interrupted while processing jobs, administrators must manually clean up input directories (usually located at ``/opt/dirac/runit/WorkloadManagement/PushJobAgent/``) and terminate any associated processes (e.g., ``dirac-jobexec``). + +.. 
image:: ../../_static/pja_application_submission.png + :alt: PushJobAgent Application submission + :align: center Multi-core/node allocations --------------------------- diff --git a/docs/source/_static/pja_application_submission.png b/docs/source/_static/pja_application_submission.png new file mode 100644 index 00000000000..488e715bb61 Binary files /dev/null and b/docs/source/_static/pja_application_submission.png differ diff --git a/docs/source/_static/pja_jobwrapper_submission.png b/docs/source/_static/pja_jobwrapper_submission.png new file mode 100644 index 00000000000..781ba9ac420 Binary files /dev/null and b/docs/source/_static/pja_jobwrapper_submission.png differ diff --git a/src/DIRAC/Core/Utilities/Proxy.py b/src/DIRAC/Core/Utilities/Proxy.py index c790803a4a4..ab8bbe158a1 100644 --- a/src/DIRAC/Core/Utilities/Proxy.py +++ b/src/DIRAC/Core/Utilities/Proxy.py @@ -34,6 +34,7 @@ def undecoratedFunction(foo='bar'): """ +import functools import os from DIRAC import gConfig, gLogger, S_ERROR, S_OK @@ -61,6 +62,7 @@ def executeWithUserProxy(fcn): :param bool executionLock: flag to execute with a lock for the time of user proxy application ( default False ) """ + @functools.wraps(fcn) def wrapped_fcn(*args, **kwargs): userName = kwargs.pop("proxyUserName", "") userDN = kwargs.pop("proxyUserDN", "") diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index 0ba9d3b68f0..4b8f99c3088 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -654,6 +654,10 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs= f"{jobReference} to CE {self.ceName}", ) + # Remove the bundled preamble + if self.preamble: + os.remove(executableFile) + if batchIDList: result = S_OK(batchIDList) result["PilotStampDict"] = stampDict diff --git a/src/DIRAC/Workflow/Modules/Script.py b/src/DIRAC/Workflow/Modules/Script.py index 9954ea2fe7e..651b942ef30 100644 --- a/src/DIRAC/Workflow/Modules/Script.py +++ b/src/DIRAC/Workflow/Modules/Script.py @@ -10,7 +10,6 @@ from DIRAC import gLogger, gConfig from DIRAC.Core.Utilities.Subprocess import systemCall -from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner from DIRAC.Workflow.Modules.ModuleBase import ModuleBase @@ -83,22 +82,13 @@ def _executeCommand(self): """execute the self.command (uses systemCall)""" failed = False - # Check whether the execution should be done remotely - if gConfig.getValue("/LocalSite/RemoteExecution", False): - remoteRunner = RemoteRunner( - gConfig.getValue("/LocalSite/Site"), - gConfig.getValue("/LocalSite/GridCE"), - gConfig.getValue("/LocalSite/CEQueue"), - ) - retVal = remoteRunner.execute(self.command) - else: - retVal = systemCall( - timeout=0, - cmdSeq=shlex.split(self.command), - env=self.environment, - callbackFunction=self.callbackFunction, - bufferLimit=self.bufferLimit, - ) + retVal = systemCall( + timeout=0, + cmdSeq=shlex.split(self.command), + env=self.environment, + callbackFunction=self.callbackFunction, + bufferLimit=self.bufferLimit, + ) if not retVal["OK"]: failed = True diff --git a/src/DIRAC/Workflow/Modules/UploadOutputs.py b/src/DIRAC/Workflow/Modules/UploadOutputs.py deleted file mode 100644 index a9d1dc199c1..00000000000 --- a/src/DIRAC/Workflow/Modules/UploadOutputs.py +++ /dev/null @@ -1,81 +0,0 @@ -# ##WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING # -# Under 
development # -# ##WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING###WARNING # - -""" Module to upload specified job output files according to the parameters - defined in the production workflow. -""" -from DIRAC import gLogger -from DIRAC.Workflow.Modules.ModuleBase import ModuleBase, GracefulTermination - - -class UploadOutputs(ModuleBase): - ############################################################################# - - def __init__(self): - """c'tor""" - self.log = gLogger.getSubLogger(self.__class__.__name__) - super().__init__(self.log) - - self.outputDataStep = "" - self.outputData = None - self.outputList = [] - self.defaultOutputSE = [] - self.outputSE = [] - self.outputPath = "" - - ############################################################################# - - def _resolveInputVariables(self): - """The module parameters are resolved here.""" - super()._resolveInputVariables() - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputData" in self.workflow_commons: - self.outputData = self.workflow_commons["OutputData"] - if isinstance(self.outputData, str): - self.outputData = [i.strip() for i in self.outputData.split(";")] - # if not present, we use the outputList, which is instead incrementally created based on the single step outputs - # This is more typical for production jobs, that can have many steps linked one after the other - elif "outputList" in self.workflow_commons: - self.outputList = self.workflow_commons["outputList"] - else: - raise GracefulTermination("Nothing to upload") - - # in case you want to put a mask on the steps - # TODO: add it to the DIRAC API - if "outputDataStep" in self.workflow_commons: - self.outputDataStep = self.workflow_commons["outputDataStep"] - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputSE" in self.workflow_commons: - specifiedSE = self.workflow_commons["OutputSE"] - if not isinstance(specifiedSE, list): - self.outputSE = [i.strip() for i in specifiedSE.split(";")] - else: - self.log.verbose(f"No OutputSE specified, using default value: {', '.join(self.defaultOutputSE)}") - - # this comes from Job().setOutputData(). Typical for user jobs - if "OutputPath" in self.workflow_commons: - self.outputPath = self.workflow_commons["OutputPath"] - - def _initialize(self): - """gets the files to upload, check if to upload""" - # lfnsList = self.__getOutputLFNs( self.outputData ) or outputList? - - if not self._checkWFAndStepStatus(): - raise GracefulTermination("No output data upload attempted") - - def __getOuputLFNs(self, outputList, *args): - """This is really VO-specific. - It should be replaced by each VO. Setting an LFN here just as an idea, and for testing purposes. 
- """ - lfnList = [] - for outputFile in outputList: - lfnList.append("/".join([str(x) for x in args]) + outputFile) - - return lfnList - - def _execute(self): - """uploads the files""" - pass diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py index 98d88b4a193..4e58214afac 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py @@ -9,19 +9,41 @@ """ +import hashlib +import json +import os +from pathlib import Path import random +import shutil import sys from collections import defaultdict +import time -from DIRAC import S_OK +from diraccfg import CFG + +from DIRAC import gConfig, S_OK, S_ERROR from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations -from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername +from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.Core.Utilities.Proxy import executeWithUserProxy +from DIRAC.Core.Utilities.Version import getVersion from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager -from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent -from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.Resources.Computing import ComputingElement +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus +from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( + getJobWrapper, + processJobOutputs, + rescheduleFailedJob, + resolveInputData, + transferInputSandbox, +) +from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved +from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent +from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved @@ -48,13 +70,47 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None): self.failedQueueCycleFactor = 10 self.failedQueues = defaultdict(int) + # Clients to interact with DIRAC services + self.jobMonitoring = None + self.resourcesModule = None + self.opsHelper = None + + # Choose the submission policy + # - Application (deprecated): the agent will submit a workflow to a PoolCE, the workflow is responsible for + # interacting with the remote site + # - JobWrapper: the agent will submit a JobWrapper directly to the remote site, it is responsible of the + # remote execution + self.submissionPolicy = "JobWrapper" + + # Options related to the "JobWrapper" submission policy + # The location of Dirac on the subset of CVMFS, such as: /cvmfs//dirac/ + # To avoid having to manually update the version, we recommend to use a symlink + # pointing to the latest release/pre-release + self.cvmfsLocation = "/cvmfs/dirac.egi.eu/dirac/pro" + + # cleanTask is used to clean the task in the remote site + self.cleanTask = True + # The results of the payload execution will be stored in this file + self.payloadResultFile = "payloadResult.json" + # The 
results of the checksums will be stored in this file + self.checkSumResultsFile = "checksums.json" + def initialize(self): """Sets default parameters and creates CE instance""" super().initialize() - result = self._initializeComputingElement("Pool") - if not result["OK"]: - return result + self.jobMonitoring = JobMonitoringClient() + + # Get the submission policy + # Initialized here because it cannot be dynamically modified during the execution + self.submissionPolicy = self.am_getOption("SubmissionPolicy", self.submissionPolicy) + if self.submissionPolicy not in ["Application", "JobWrapper"]: + return S_ERROR("SubmissionPolicy must be either Workflow or JobWrapper") + + if self.submissionPolicy == "Application": + result = self._initializeComputingElement("Pool") + if not result["OK"]: + return result # on-the fly imports ol = ObjectLoader() @@ -79,11 +135,18 @@ def beginExecution(self): return result self.pilotDN, _ = result["Value"] - # Maximum number of jobs that can be handled at the same time by the agent + # (only for Application submission) Maximum number of jobs that can be handled at the same time by the agent self.maxJobsToSubmit = self.am_getOption("MaxJobsToSubmit", self.maxJobsToSubmit) - self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit}) + if self.submissionPolicy == "Application": + self.computingElement.setParameters({"NumberOfProcessors": self.maxJobsToSubmit}) self.failedQueueCycleFactor = self.am_getOption("FailedQueueCycleFactor", self.failedQueueCycleFactor) + self.cleanTask = self.am_getOption("CleanTask", self.cleanTask) + + # (only for JobWrapper submission) Get the location of the CVMFS installation + if self.submissionPolicy == "JobWrapper": + self.cvmfsLocation = self.am_getOption("CVMFSLocation", self.cvmfsLocation) + self.log.info("CVMFS location:", self.cvmfsLocation) # Get target queues from the configuration siteNames = None @@ -111,14 +174,32 @@ def beginExecution(self): self.queueDict = result["Value"] - if self.firstPass: - if self.queueDict: - self.log.always("Agent will serve queues:") - for queue in self.queueDict: - self.log.always( - "Site: %s, CE: %s, Queue: %s" - % (self.queueDict[queue]["Site"], self.queueDict[queue]["CEName"], queue) - ) + # Get the version: + # If the webapp is running on the same machine, the version is the one of the webapp so we should ignore it + versions = getVersion()["Value"]["Extensions"] + for extension, version in versions.items(): + if extension not in ["WebAppDIRAC", "DIRACWebAppResources"]: + self.version = version + break + + if not self.version: + self.log.error("Cannot get the version of the agent") + return S_ERROR("Cannot get the version of the agent") + + for queue in self.queueDict: + ce = self.queueDict[queue]["CE"] + architecture = f"Linux-{ce.ceParameters.get('architecture', 'x86_64')}" + diracInstallLocation = os.path.join(self.cvmfsLocation, architecture) + self.queueDict[queue]["ParametersDict"]["DIRACInstallLocation"] = diracInstallLocation + + if self.firstPass: + self.log.always( + "Will serve Site: %s, CE: %s, Queue: %s" + % (self.queueDict[queue]["Site"], self.queueDict[queue]["CEName"], queue) + ) + # Add preamble based on the CVMFS location to the CE + ce.preamble = f"source {os.path.join(diracInstallLocation, 'diracosrc')}" + self.firstPass = False return S_OK() @@ -129,32 +210,40 @@ def execute(self): queueDictItems = list(self.queueDict.items()) random.shuffle(queueDictItems) - # Check that there is enough slots locally - result = 
self._checkCEAvailability(self.computingElement) - if not result["OK"] or result["Value"]: - return result + if self.submissionPolicy == "Application": + # Check that there is enough slots locally + result = self._checkCEAvailability(self.computingElement) + if not result["OK"] or result["Value"]: + return result + # Check errors that could have occurred during job submission and/or execution + # Status are handled internally, and therefore, not checked outside of the method + result = self._checkSubmittedJobs() + if not result["OK"]: + return result - # Check errors that could have occurred during job submission and/or execution - # Status are handled internally, and therefore, not checked outside of the method - result = self._checkSubmittedJobs() + # Get a pilot proxy + cpuTime = 86400 * 3 + self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime)) + pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup") + result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime) if not result["OK"]: return result + pilotProxy = result["Value"] for queueName, queueDictionary in queueDictItems: # Make sure there is no problem with the queue before trying to submit if not self._allowedToSubmit(queueName): continue - # Get a working proxy + # Get the CE instance ce = queueDictionary["CE"] - cpuTime = 86400 * 3 - self.log.verbose("Getting pilot proxy", "for %s/%s %d long" % (self.pilotDN, self.vo, cpuTime)) - pilotGroup = Operations(vo=self.vo).getValue("Pilot/GenericPilotGroup") - result = gProxyManager.getPilotProxyFromDIRACGroup(self.pilotDN, pilotGroup, cpuTime) - if not result["OK"]: - return result - proxy = result["Value"] - ce.setProxy(proxy) + ce.setProxy(pilotProxy) + + if self.submissionPolicy == "JobWrapper": + # Check errors that could have occurred during job submission and/or execution + result = self._checkSubmittedJobWrappers(ce, queueDictionary["ParametersDict"]["Site"]) + if not result["OK"]: + self.failedQueues[queueName] += 1 # Check that there is enough slots in the remote CE to match a job result = self._checkCEAvailability(ce) @@ -227,14 +316,6 @@ def execute(self): status=JobStatus.MATCHED, minorStatus="Job Received by Agent", sendFlag=False ) - # Setup proxy - result_setupProxy = self._setupProxy(owner, jobGroup) - if not result_setupProxy["OK"]: - result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) - self.failedQueues[queueName] += 1 - break - proxyChain = result_setupProxy.get("Value") - # Check software and install them if required self.jobs[jobID]["JobReport"].setJobStatus(minorStatus="Installing Software", sendFlag=False) software = self._checkInstallSoftware(params, ceDict) @@ -247,29 +328,56 @@ def execute(self): self.failedQueues[queueName] += 1 break + self.jobs[jobID]["JobReport"].setJobParameter(par_name="GridCE", par_value=ce.ceName, sendFlag=False) + self.jobs[jobID]["JobReport"].setJobParameter(par_name="CEQueue", par_value=queueName, sendFlag=False) + # Submit the job to the CE self.log.debug(f"Before self._submitJob() ({self.ceName}CE)") - resultSubmission = self._submitJob( - jobID=jobID, - jobParams=params, - resourceParams=ceDict, - optimizerParams=optimizerParams, - proxyChain=proxyChain, - processors=submissionParams["processors"], - wholeNode=submissionParams["wholeNode"], - maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], - mpTag=submissionParams["mpTag"], - ) - if not result["OK"]: - result = self._rescheduleFailedJob(jobID, 
resultSubmission["Message"]) - self.failedQueues[queueName] += 1 - break + if self.submissionPolicy == "Application": + # Setup proxy + result_setupProxy = self._setupProxy(owner, jobGroup) + if not result_setupProxy["OK"]: + result = self._rescheduleFailedJob(jobID, result_setupProxy["Message"]) + self.failedQueues[queueName] += 1 + break + proxyChain = result_setupProxy.get("Value") + + resultSubmission = self._submitJob( + jobID=jobID, + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + proxyChain=proxyChain, + processors=submissionParams["processors"], + wholeNode=submissionParams["wholeNode"], + maxNumberOfProcessors=submissionParams["maxNumberOfProcessors"], + mpTag=submissionParams["mpTag"], + ) + if not result["OK"]: + self._rescheduleFailedJob(jobID, resultSubmission["Message"]) + self.failedQueues[queueName] += 1 + break + else: + resultSubmission = self._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation=queueDictionary["ParametersDict"]["DIRACInstallLocation"], + jobParams=params, + resourceParams=ceDict, + optimizerParams=optimizerParams, + processors=submissionParams["processors"], + ) + if not result["OK"]: + self.failedQueues[queueName] += 1 + break + self.log.debug(f"After {self.ceName}CE submitJob()") - # Check that there is enough slots locally - result = self._checkCEAvailability(self.computingElement) - if not result["OK"] or result["Value"]: - return result + if self.submissionPolicy == "Application": + # Check that there is enough slots locally + result = self._checkCEAvailability(self.computingElement) + if not result["OK"] or result["Value"]: + return result # Check that there is enough slots in the remote CE to match a new job result = self._checkCEAvailability(ce) @@ -297,7 +405,7 @@ def _buildQueueDict(self, siteNames, ces, ceTypes): :return: dictionary of queue parameters """ - result = self.resourcesModule.getQueues(community="", siteList=siteNames, ceList=ces, ceTypeList=ceTypes) + result = getQueues(community="", siteList=siteNames, ceList=ces, ceTypeList=ceTypes) if not result["OK"]: return result @@ -345,8 +453,7 @@ def _setCEDict(self, ceDict): if project: ceDict["ReleaseProject"] = project - # Add a RemoteExecution entry, which can be used in the next stages - ceDict["RemoteExecution"] = True + ceDict["SubmissionPolicy"] = self.submissionPolicy def _checkMatchingIssues(self, jobRequest): """Check the source of the matching issue @@ -362,3 +469,407 @@ def _checkMatchingIssues(self, jobRequest): self.log.notice("Failed to get jobs", jobRequest["Message"]) return S_OK() + + ############################################################################# + + @executeWithUserProxy + def preProcessJob(self, job: JobWrapper): + """Preprocess the job before submission: should be executed with the user proxy associated with the payload + + :param JobWrapper job: job to preprocess + """ + if "InputSandbox" in job.jobArgs: + self.log.verbose("Getting the inputSandbox of job", job.jobID) + if not transferInputSandbox(job, job.jobArgs["InputSandbox"]): + return S_ERROR(f"Cannot get input sandbox of job {job.jobID}") + job.jobReport.commit() + + if "InputData" in job.jobArgs and job.jobArgs["InputData"]: + self.log.verbose("Getting the inputData of job", job.jobID) + if not resolveInputData(job): + return S_ERROR(f"Cannot get input data of job {job.jobID}") + job.jobReport.commit() + + # Preprocess the payload + try: + self.log.verbose("Pre-processing job", job.jobID) + result = job.preProcess() + if not result["OK"]: + 
self.log.error("JobWrapper failed the preprocessing phase for", f"{job.jobID}: {result['Message']}") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return S_ERROR(JobMinorStatus.JOB_WRAPPER_EXECUTION) + except Exception: + self.log.exception("JobWrapper failed the preprocessing phase for", job.jobID) + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return S_ERROR(f"JobWrapper failed the preprocessing phase for {job.jobID}") + + job.jobReport.commit() + return S_OK(result["Value"]) + + def _submitJobWrapper( + self, + jobID: str, + ce: ComputingElement, + diracInstallLocation: str, + jobParams: dict, + resourceParams: dict, + optimizerParams: dict, + processors: int, + ): + """Submit a JobWrapper to the remote site + + :param jobID: job ID + :param ce: ComputingElement instance + :param jobParams: job parameters + :param resourceParams: resource parameters + :param optimizerParams: optimizer parameters + :param owner: owner of the job + :param jobGroup: group of the job + :param processors: number of processors + + :return: S_OK + """ + jobReport = self.jobs[jobID]["JobReport"] + jobReport.commit() + + # Add the number of requested processors to the job environment + if "ExecutionEnvironment" in jobParams: + if isinstance(jobParams["ExecutionEnvironment"], str): + jobParams["ExecutionEnvironment"] = jobParams["ExecutionEnvironment"].split(";") + jobParams.setdefault("ExecutionEnvironment", []).append("DIRAC_JOB_PROCESSORS=%d" % processors) + + # Add necessary parameters to get the payload result and analyze its integrity + jobParams["PayloadResults"] = self.payloadResultFile + jobParams["Checksum"] = self.checkSumResultsFile + # The dirac-jobexec executable is available through CVMFS in the remote site + # So we need to change the path to the executable + if "Executable" in jobParams and jobParams["Executable"] == "dirac-jobexec": + jobParams["Executable"] = os.path.join(diracInstallLocation, "bin", "dirac-jobexec") + jobParams["Arguments"] += " --cfg=dirac.cfg" + + # Prepare the job for submission + self.log.verbose("Getting a JobWrapper") + arguments = {"Job": jobParams, "CE": resourceParams, "Optimizer": optimizerParams} + + job = getJobWrapper(int(jobID), arguments, jobReport) + if not job: + return S_ERROR(f"Cannot get a JobWrapper instance for job {jobID}") + + result = self.preProcessJob( # pylint: disable=unexpected-keyword-arg + job, proxyUserName=job.owner, proxyUserGroup=job.userGroup + ) + if not result["OK"]: + return result + payloadParams = result["Value"] + + # Dump the remote CFG config into the job directory: it is needed for the JobWrapperTemplate + cfgFilename = Path(job.jobIDPath) / "dirac.cfg" + gConfig.dumpRemoteCFGToFile(cfgFilename) + # ----------------------------------------------------------------------------------------------- + # Temporary hack: in v9.0, the DIRAC/Setup is meant to be removed from the configuration + # Until then, we need to set it manually + cfg = CFG().loadFromFile(cfgFilename) + cfg.setOption("DIRAC/Setup", gConfig.getOption("DIRAC/Setup", "")["Value"]) + cfg.writeToFile(cfgFilename) + # 
----------------------------------------------------------------------------------------------- + + # Generate a light JobWrapper executor script + jobDesc = { + "jobID": jobID, + "jobParams": jobParams, + "resourceParams": resourceParams, + "optimizerParams": optimizerParams, + "payloadParams": payloadParams, + "extraOptions": self.extraOptions, + } + result = createJobWrapper( + log=self.log, + logLevel=self.logLevel, + cfgPath=cfgFilename.name, + defaultWrapperLocation="DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py", + pythonPath="python", + rootLocation=".", + **jobDesc, + ) + if not result["OK"]: + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return result + jobWrapperRunner = result["Value"]["JobExecutablePath"] + jobWrapperCode = result["Value"]["JobWrapperPath"] + jobWrapperConfig = result["Value"]["JobWrapperConfigPath"] + + # Get inputs from the JobWrapper working directory and add the JobWrapper deps + jobInputs = os.listdir(job.jobIDPath) + inputs = [os.path.join(job.jobIDPath, input) for input in jobInputs] + inputs.append(jobWrapperCode) + inputs.append(jobWrapperConfig) + self.log.verbose("The executable will be sent along with the following inputs:", ",".join(inputs)) + + # Request the whole directory as output + outputs = ["/"] + + self.log.info("Submitting JobWrapper", f"{os.path.basename(jobWrapperRunner)} to {ce.ceName}CE") + + # Submit the job + result = ce.submitJob( + executableFile=jobWrapperRunner, + proxy=None, + inputs=inputs, + outputs=outputs, + ) + if not result["OK"]: + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return result + + taskID = result["Value"][0] + stamp = result["PilotStampDict"][taskID] + self.log.info("Job being submitted", f"(DIRAC JobID: {jobID}; Task ID: {taskID})") + + jobReport.setJobParameter(par_name="TaskID", par_value=taskID, sendFlag=False) + jobReport.setJobParameter(par_name="Stamp", par_value=stamp, sendFlag=False) + jobReport.commit() + + time.sleep(self.jobSubmissionDelay) + return S_OK() + + ############################################################################# + + def _checkOutputIntegrity(self, workingDirectory: str): + """Make sure that output files are not corrupted. 
+ + :param workingDirectory: path of the outputs + """ + checkSumOutput = os.path.join(workingDirectory, self.checkSumResultsFile) + if not os.path.exists(checkSumOutput): + return S_ERROR( + DErrno.EWMSRESC, f"Cannot guarantee the integrity of the outputs: {checkSumOutput} unavailable" + ) + + with open(checkSumOutput) as f: + checksums = json.load(f) + + # for each output file, compute the md5 checksum + for output, checksum in checksums.items(): + localOutput = os.path.join(workingDirectory, output) + if not os.path.exists(localOutput): + return S_ERROR(f"{localOutput} was expected but not found") + + with open(localOutput, "rb") as f: + digest = hashlib.file_digest(f, "sha256") + + if checksum != digest.hexdigest(): + return S_ERROR(f"{localOutput} is corrupted") + + return S_OK() + + @executeWithUserProxy + def postProcessJob(self, job: JobWrapper, payloadResults): + """Perform post-processing for a job: should be executed with the user proxy associated with the payload. + + :param job: JobWrapper instance + :param payloadResults: Payload results + """ + try: + result = job.postProcess(**payloadResults) + if not result["OK"]: + self.log.error("Failed to post process the job", f"{job.jobID}: {result['Message']}") + if result["Errno"] == DErrno.EWMSRESC: + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + return + + job.jobReport.setJobParameter("Error Message", result["Message"], sendFlag=False) + job.jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + shutil.rmtree(job.jobIDPath) + return + except Exception as exc: # pylint: disable=broad-except + self.log.exception("Job raised exception during post processing phase") + job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + job.jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + shutil.rmtree(job.jobIDPath) + return + + if "OutputSandbox" in job.jobArgs or "OutputData" in job.jobArgs: + self.log.verbose("Uploading the outputSandbox/Data of the job") + if not processJobOutputs(job): + shutil.rmtree(job.jobIDPath) + return + job.jobReport.commit() + + # Clean job wrapper locally + job.finalize() + + def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str): + """Check the status of the submitted tasks. + If the task is finished, get the output and post process the job. + Finally, remove from the submission dictionary. 
+ + :return: S_OK/S_ERROR + """ + # Get the list of running jobs + if not (result := self.jobMonitoring.getJobs({"Status": JobStatus.RUNNING, "Site": site}))["OK"]: + self.log.error("Failed to get the list of running jobs", result["Message"]) + return result + + jobs = result["Value"] + if not jobs: + self.log.info("No running jobs") + return S_OK() + + # Get their parameters + if not (result := self.jobMonitoring.getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]: + self.log.error("Failed to get the list of taskIDs", result["Message"]) + return result + + # Filter the jobs that are running on the current CE + taskIDs = {} + for jobID, values in result["Value"].items(): + if "GridCE" not in values or values["GridCE"] != ce.ceName: + continue + if "TaskID" not in values: + continue + if "Stamp" not in values: + continue + taskIDs[values["TaskID"]] = {"JobID": jobID, "Stamp": values["Stamp"]} + + if not taskIDs: + self.log.info("No running jobs on the current CE") + return S_OK() + + # Get the status of the jobs + if not (result := ce.getJobStatus(list(taskIDs.keys())))["OK"]: + self.log.error("Failed to get job status", result["Message"]) + return result + + statusDict = result["Value"] + for taskID, status in statusDict.items(): + # Get the jobID and the stamp + jobID = taskIDs[taskID]["JobID"] + stamp = taskIDs[taskID]["Stamp"] + + # Check if the job is still running + if status not in PilotStatus.PILOT_FINAL_STATES: + self.log.info("Job not finished", f"(JobID: {jobID}, DIRAC taskID: {taskID}; Status: {status})") + continue + self.log.info("Job execution finished", f"(JobID: {jobID}, DIRAC taskID: {taskID}; Status: {status})") + + # Get the JDL of the job + if not (result := self.jobMonitoring.getJobJDL(int(jobID), False))["OK"]: + self.log.error("Failed to get the JDL of job", f"{jobID}: {result['Message']}") + continue + + if not (result := self._getJDLParameters(result["Value"]))["OK"]: + self.log.error("Failed to extract the JDL parameters of job", f"{jobID}: {result['Message']}") + continue + + # Get the job and ce parameters + jobParams = result["Value"] + + result = self._getCEDict(ce) + if not (result := self._getCEDict(ce))["OK"]: + self.log.error("Failed to get the CE parameters of job", f"{jobID}: {result['Message']}") + continue + ceDict = result["Value"][0] + ceDict["NumberOfProcessors"] = ce.ceParameters.get("NumberOfProcessors") + + self.log.verbose(f"Restoring the JobWrapper of job {jobID}") + arguments = {"Job": jobParams, "CE": ceDict, "Optimizer": {}} + + # Get the job wrapper + jobReport = JobReport(jobID, f"{self.__class__.__name__}@{self.siteName}") + try: + job = JobWrapper(int(jobID), jobReport) + job.initialize(arguments) + except Exception: + self.log.exception("JobWrapper failed the initialization phase", jobID) + continue + + # Get the output of the job + self.log.info(f"Getting the outputs of taskID {taskID} for {jobID}") + if not (result := ce.getJobOutput(f"{taskID}:::{stamp}", job.jobIDPath))["OK"]: + self.log.error("Failed to get the output of taskID", f"{taskID}: {result['Message']}") + continue + + # Make sure the output is correct + self.log.info(f"Checking the integrity of the outputs of {taskID} for {jobID}") + if not (result := self._checkOutputIntegrity(job.jobIDPath))["OK"]: + self.log.error( + "Failed to check the integrity of the output of taskID", f"{taskID}: {result['Message']}" + ) + if result["Errno"] == DErrno.EWMSRESC: + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, 
minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + continue + self.log.info("The output has been retrieved and declared complete") + + with open(os.path.join(job.jobIDPath, self.payloadResultFile)) as f: + result = json.load(f) + + if not result["OK"]: + self.log.error("Failed to get the payload results of job", f"{jobID}: {result['Message']}") + self.log.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + shutil.rmtree(job.jobIDPath) + continue + + payloadResults = result["Value"] + self.postProcessJob( # pylint: disable=unexpected-keyword-arg + job, payloadResults, proxyUserName=job.owner, proxyUserGroup=job.userGroup + ) + + # Clean job in the remote resource + if self.cleanTask: + if not (result := ce.cleanJob(taskID))["OK"]: + self.log.warn("Failed to clean the output remotely", result["Message"]) + self.log.info(f"TaskID {taskID} has been remotely removed") + + return S_OK() + + def finalize(self): + """PushJob Agent finalization method""" + if self.submissionPolicy == "Application": + # wait for all jobs to be completed + res = self.computingElement.shutdown() + if not res["OK"]: + self.log.error("CE could not be properly shut down", res["Message"]) + + # Check the latest submitted jobs + while self.jobs: + result = self._checkSubmittedJobs() + if not result["OK"]: + self.log.error("Problem while trying to get status of the last submitted jobs") + break + time.sleep(int(self.am_getOption("PollingTime"))) + + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py index e1de80a21cd..43010883cf8 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py @@ -2,14 +2,21 @@ """ # imports +import os +from pathlib import Path +import shutil +from unittest.mock import Mock import pytest from collections import defaultdict # DIRAC Components from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.WorkloadManagementSystem.Agent.PushJobAgent import PushJobAgent +from DIRAC.WorkloadManagementSystem.Agent.test.Test_Agent_SiteDirector import config -from DIRAC import gLogger, S_ERROR +from DIRAC import gLogger, S_OK, S_ERROR +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport gLogger.setLevel("DEBUG") @@ -49,15 +56,25 @@ def test__allowedToSubmit(mocker, queue, failedQueues, failedQueueCycleFactor, e @pytest.mark.parametrize( "ceDict, pilotVersion, pilotProject, expected", [ - ({}, None, None, {"RemoteExecution": True}), - ({}, "8.0.0", None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "RemoteExecution": True}), - ({}, ["8.0.0", "7.3.7"], None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "RemoteExecution": True}), - ({}, None, "Project", {"ReleaseProject": "Project", "RemoteExecution": True}), + ({}, None, None, {"SubmissionPolicy": "JobWrapper"}), + ({}, "8.0.0", None, {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "SubmissionPolicy": "JobWrapper"}), + ( + {}, + ["8.0.0", 
"7.3.7"], + None, + {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "SubmissionPolicy": "JobWrapper"}, + ), + ({}, None, "Project", {"ReleaseProject": "Project", "SubmissionPolicy": "JobWrapper"}), ( {}, "8.0.0", "Project", - {"DIRACVersion": "8.0.0", "ReleaseVersion": "8.0.0", "ReleaseProject": "Project", "RemoteExecution": True}, + { + "DIRACVersion": "8.0.0", + "ReleaseVersion": "8.0.0", + "ReleaseProject": "Project", + "SubmissionPolicy": "JobWrapper", + }, ), ], ) @@ -104,3 +121,310 @@ def test__checkMatchingIssues(mocker, issueMessage, expectedResult): result = jobAgent._checkMatchingIssues(S_ERROR(issueMessage)) assert result["OK"] == expectedResult + + +def test_execute_application_localCENotAvailable(config, mocker): + """Test when local CE is not available: it should not check submitted jobs and return an error message""" + + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule._AgentModule__moduleProperties", + side_effect=lambda x, y=None: y, + create=True, + ) + + jobAgent = PushJobAgent("Test", "Test1") + jobAgent.submissionPolicy = "Application" + jobAgent.queueDict = jobAgent._buildQueueDict( + siteNames=["LCG.Site1.com", "LCG.Site2.site2"], ces=None, ceTypes=None + )["Value"] + + # Mock the CE availability + errorMessage = "CE Not Available" + checkCEAvailability = mocker.patch.object(jobAgent, "_checkCEAvailability", return_value=S_ERROR(errorMessage)) + + checkSubmittedJobs = mocker.patch.object(jobAgent, "_checkSubmittedJobs") + checkSubmittedJobWrappers = mocker.patch.object(jobAgent, "_checkSubmittedJobWrappers") + allowedToSubmit = mocker.patch.object(jobAgent, "_allowedToSubmit") + matchAJob = mocker.patch.object(jobAgent, "_matchAJob") + + # Initialize logger + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + # Initialize inner CE + jobAgent._initializeComputingElement("Pool") + + result = jobAgent.execute() + # Check the CE availability and submitted jobs + checkCEAvailability.assert_called() + checkSubmittedJobs.assert_not_called() + # Does not check if allowed to submit and does not match a job + allowedToSubmit.assert_not_called() + matchAJob.assert_not_called() + # This is not called because submission policy is Application + checkSubmittedJobWrappers.assert_not_called() + # Result should not be OK + assert not result["OK"], result + assert result["Message"] == errorMessage + + +@pytest.fixture +def jobID(): + jobID = "123" + Path(jobID).mkdir(parents=True, exist_ok=True) + + yield jobID + + shutil.rmtree(jobID) + + +def test_submitJobWrapper(mocker, jobID): + """Test JobAgent._submitJobWrapper()""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__") + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule._AgentModule__moduleProperties", + side_effect=lambda x, y=None: y, + create=True, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") + + # Initialize PJA + jobAgent = PushJobAgent("Test", "Test1") + jobAgent.submissionPolicy = "JobWrapper" + jobAgent.queueDict = jobAgent._buildQueueDict( + siteNames=["LCG.Site1.com", "LCG.Site2.site2"], ces=None, ceTypes=None + )["Value"] + jobAgent.log = gLogger + jobAgent.log.setLevel("DEBUG") + + jobAgent.jobs[jobID] = {"JobReport": JobReport(jobID)} + jobParams = {} + + # Current working directory: it should not change + cwd = os.getcwd() + + # Mock the JobWrapper + # Create a mock 
JobWrapper instance + job = Mock() + job.sendJobAccounting = Mock() + + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities.JobWrapper", return_value=job) + + rescheduleValue = "valueProvingRescheduling" + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities.rescheduleFailedJob", + return_value=rescheduleValue, + ) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PushJobAgent.rescheduleFailedJob", return_value=rescheduleValue) + + # 1. getJobWrapper returns an error + job.initialize = Mock(side_effect=Exception("Error initializing JobWrapper")) + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == f"Cannot get a JobWrapper instance for job {jobID}" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with( + status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION + ) + + # 2. getJobWrapper returns a JobWrapper instance but fails to process input sandbox + jobParams = {"InputSandbox": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(side_effect=Exception("Error transferring input sandbox")) + + job.owner = None + job.userGroup = None + job.jobArgs = jobParams + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert "Cannot get input sandbox of job" in result["Message"] + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with( + status=rescheduleValue, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX + ) + + # 3. getJobWrapper returns a JobWrapper instance but fails to process input data + jobParams = {"InputSandbox": True, "InputData": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(side_effect=Exception("Error resolving input data")) + + job.owner = None + job.userGroup = None + job.jobArgs = jobParams + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert "Cannot get input data of job" in result["Message"] + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + + # 4. 
getJobWrapper returns a JobWrapper instance but fails to pre-process payload + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(side_effect=S_ERROR("Error pre-processing payload")) + + job.owner = None + job.userGroup = None + job.jobArgs = jobParams + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=jobAgent.queueDict["ce1.site2.com_condor"]["CE"], + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert "JobWrapper failed the preprocessing phase for" in result["Message"] + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + + # 5. getJobWrapper returns a JobWrapper instance but fails to submit the job + mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.Utils.createJobWrapper", return_value=S_OK({})) + mocker.patch("DIRAC.gConfig.getOption", return_value=S_OK("Setup")) + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + + jobID = jobID + job.jobIDPath = Path(jobID) + + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(return_value=S_OK()) + + job.owner = None + job.userGroup = None + job.jobArgs = jobParams + + ce = Mock() + ce.submitJob = Mock(return_value=S_ERROR("Error submitting job")) + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert not result["OK"], result + assert result["Message"] == "Error submitting job" + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_called_with(status=rescheduleValue, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + + # 6. 
getJobWrapper returns a JobWrapper instance and submits it successfully + mocker.patch("DIRAC.WorkloadManagementSystem.Utilities.Utils.createJobWrapper", return_value=S_OK({})) + jobParams = {"InputSandbox": True, "InputData": True, "Payload": True} + job.initialize = Mock() + + jobID = jobID + job.jobIDPath = Path(jobID) + + job.sendJobAccounting.reset_mock() + job.transferInputSandbox = Mock(return_value=S_OK()) + job.resolveInputData = Mock(return_value=S_OK()) + job.preProcess = Mock(return_value=S_OK()) + + job.owner = None + job.userGroup = None + job.jobArgs = jobParams + + ce = Mock() + ce.submitJob = Mock(return_value={"OK": True, "Value": ["456"], "PilotStampDict": {"456": "abcdef"}}) + + result = jobAgent._submitJobWrapper( + jobID=jobID, + ce=ce, + diracInstallLocation="diracInstallLocation", + jobParams=jobParams, + resourceParams={}, + optimizerParams={}, + processors=1, + ) + + assert "PayloadResults" in jobParams + assert jobParams["PayloadResults"] == jobAgent.payloadResultFile + assert "Checksum" in jobParams + assert jobParams["Checksum"] == jobAgent.checkSumResultsFile + + assert result["OK"], result + + assert os.getcwd() == cwd + + job.sendJobAccounting.assert_not_called() + shutil.rmtree("job") diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index 27b87ddacd1..3f2a8d2a8d0 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -321,6 +321,12 @@ Agents MaxJobsToSubmit = 100 # How many cycels to skip if queue is not working FailedQueueCycleFactor = 10 + # How the agent manages the submission of the jobs + SubmissionPolicy = JobWrapper + # The CVMFS location to be used for the job execution on the remote site + CVMFSLocation = "/cvmfs/dirac.egi.eu/dirac/pro" + # Clean the task after the job is done + CleanTask = True } ##END ##BEGIN StatesAccountingAgent diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py new file mode 100644 index 00000000000..f2b7a4f0020 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobExecutionCoordinator.py @@ -0,0 +1,73 @@ +from DIRAC import S_OK + + +class JobExecutionCoordinator: + """ + Job Execution Coordinator Class. + + This class serves as the base class for job execution coordinators, providing + the necessary methods for pre-processing and post-processing jobs before and after + their execution. + + Communities who need to implement specific workflows for job pre-processing + and post-processing in their Dirac extension should inherit from this class and + override the relevant methods with their custom implementations. + + The `JobExecutionCoordinator` class is primarily used by the `JobWrapper` to manage + the execution of jobs, ensuring that all necessary preparations are made before the + job starts, and that any required cleanup or data handling is performed after the + job completes. + + **Example Usage in your Extension:** + + .. 
code-block:: python + + from DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator import ( + JobExecutionCoordinator as DIRACJobExecutionCoordinator + ) + + class JobExecutionCoordinator(DIRACJobExecutionCoordinator): + def preProcess(self, job): + # Custom pre-processing code here + pass + + def postProcess(self, job): + # Custom post-processing code here + pass + + In this example, `JobExecutionCoordinator` inherits from `DiracJobExecutionCoordinator` + and provides custom implementations for the `preProcess` and `postProcess` methods. + + **Methods to Override:** + + - `preProcess(job)` + - `postProcess(job)` + """ + + def __init__(self, jobArgs: dict, ceArgs: dict) -> None: + """ + Initialize the job execution coordinator. + + :param jobArgs: The job arguments + :param ceArgs: The environment arguments + """ + self.jobArgs = jobArgs + self.ceArgs = ceArgs + + def preProcess(self, command: str, exeEnv: dict): + """ + Pre-process a job before executing it. + This should handle tasks like downloading inputs, preparing commands, etc. + + :param job: The job to be pre-processed + """ + return S_OK({"command": command, "env": exeEnv}) + + def postProcess(self): + """ + Post-process a job after executing it. + This should handle tasks like uploading outputs, checking results, etc. + + :param job: The job to be post-processed + """ + return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py index b4eebaae229..b8dc79c9ea6 100755 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py @@ -10,10 +10,12 @@ :caption: JobWrapper options """ +import contextlib import datetime import glob import json import os +from pathlib import Path import re import shutil import stat @@ -47,7 +49,6 @@ from DIRAC.Resources.Catalog.FileCatalog import FileCatalog from DIRAC.Resources.Catalog.PoolXMLFile import getGUID from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus -from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient @@ -59,7 +60,7 @@ class JobWrapper: """The only user of the JobWrapper is the JobWrapperTemplate""" ############################################################################# - def __init__(self, jobID=None, jobReport=None): + def __init__(self, jobID: int | None = None, jobReport: JobReport | None = None): """Standard constructor""" self.initialTiming = os.times() self.section = os.path.join(getSystemSection("WorkloadManagement/JobWrapper"), "JobWrapper") @@ -85,7 +86,21 @@ def __init__(self, jobID=None, jobReport=None): self.log.showHeaders(True) # self.root is the path the Wrapper is running at - self.root = os.getcwd() + self.root = Path.cwd() + # `self.jobIDPath` represents the directory path where the job is being executed. + # By default, it is set to `self.root`, which corresponds to the current directory, + # since the `jobID` is not yet assigned. In this scenario, the job runs directly in the current directory. + # + # This default behavior is particularly useful when the JobWrapper is initialized without a `jobID`, + # such as when it is transferred to a remote computing resource for execution. 
In these cases, + # the JobWrapper on the remote resource is initialized without a `jobID` because the current directory + # already corresponds to the job's directory, which was set up on the resource where the JobWrapper + # was originally created. + # + # However, if a `jobID` is provided (normal use case), `self.jobIDPath` is updated to `self.root/jobID`. + # This indicates that the job will be executed in a specific subdirectory named after the job ID, + # rather than directly in the root directory. + self.jobIDPath = self.root result = getCurrentVersion() if result["OK"]: self.diracVersion = result["Value"] @@ -95,8 +110,8 @@ def __init__(self, jobID=None, jobReport=None): if self.maxPeekLines < 0: self.maxPeekLines = 0 self.defaultCPUTime = gConfig.getValue(self.section + "/DefaultCPUTime", 600) - self.defaultOutputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out") - self.defaultErrorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err") + self.outputFile = gConfig.getValue(self.section + "/DefaultOutputFile", "std.out") + self.errorFile = gConfig.getValue(self.section + "/DefaultErrorFile", "std.err") self.diskSE = gConfig.getValue(self.section + "/DiskSE", ["-disk", "-DST", "-USER"]) self.tapeSE = gConfig.getValue(self.section + "/TapeSE", ["-tape", "-RDST", "-RAW"]) self.failoverRequestDelay = gConfig.getValue(self.section + "/FailoverRequestDelay", 45) @@ -173,6 +188,8 @@ def __init__(self, jobID=None, jobReport=None): self.optArgs = {} self.ceArgs = {} + self.jobExecutionCoordinator = None + # Store the result of the payload execution self.executionResults = {} @@ -192,6 +209,10 @@ def initialize(self, arguments): self.jobGroup = self.jobArgs.get("JobGroup", self.jobGroup) self.jobName = self.jobArgs.get("JobName", self.jobName) self.jobType = self.jobArgs.get("JobType", self.jobType) + # Prepare outputs + self.errorFile = self.jobArgs.get("StdError", self.errorFile) + self.outputFile = self.jobArgs.get("StdOutput", self.outputFile) + dataParam = self.jobArgs.get("InputData", []) if dataParam and not isinstance(dataParam, list): dataParam = [dataParam] @@ -209,25 +230,35 @@ def initialize(self, arguments): # Prepare the working directory, cd to there, and copying eventual extra arguments in it if self.jobID: - if os.path.exists(str(self.jobID)): - shutil.rmtree(str(self.jobID)) - os.mkdir(str(self.jobID)) - os.chdir(str(self.jobID)) + self.jobIDPath = self.root / str(self.jobID) + if self.jobIDPath.exists(): + shutil.rmtree(self.jobIDPath) + self.jobIDPath.mkdir() + extraOpts = self.jobArgs.get("ExtraOptions", "") if extraOpts and "dirac-jobexec" in self.jobArgs.get("Executable", "").strip(): - if os.path.exists(f"{self.root}/{extraOpts}"): - shutil.copyfile(f"{self.root}/{extraOpts}", extraOpts) + src = self.root / extraOpts + if os.path.exists(src): + shutil.copyfile(src, self.jobIDPath / extraOpts) else: self.log.info("JobID is not defined, running in current directory") - with open("job.info", "w") as infoFile: + with open(self.jobIDPath / "job.info", "w") as infoFile: infoFile.write(self.__dictAsInfoString(self.jobArgs, "/Job")) self.log.debug("Environment used") self.log.debug("================") self.log.debug(json.dumps(dict(os.environ), indent=4)) + # Load the Job Execution Coordinator: can be overriden by a specific implementation + result = ObjectLoader().loadObject( + "WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator", "JobExecutionCoordinator" + ) + if not result["OK"]: + return result + self.jobExecutionCoordinator = 
result["Value"](jobArgs=self.jobArgs, ceArgs=self.ceArgs) + ############################################################################# def __setInitialJobParameters(self): """Sets some initial job parameters""" @@ -263,7 +294,7 @@ def __dictAsInfoString(self, dData, infoString="", currentBase=""): ############################################################################# def __prepareCommand(self): """Prepare the command to be executed.""" - if not "Executable" in self.jobArgs: + if "Executable" not in self.jobArgs: self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.APP_NOT_FOUND, sendFlag=True) return S_ERROR(f"Job {self.jobID} has no specified executable") @@ -302,9 +333,9 @@ def __prepareCommand(self): configOptions += "-o /LocalSite/CEQueue=%s " % self.ceArgs.get( "Queue", gConfig.getValue("/LocalSite/CEQueue", "") ) - configOptions += "-o /LocalSite/RemoteExecution=%s " % self.ceArgs.get( - "RemoteExecution", gConfig.getValue("/LocalSite/RemoteExecution", False) - ) + submissionPolicy = self.ceArgs.get("SubmissionPolicy", gConfig.getValue("/LocalSite/SubmissionPolicy", "")) + if submissionPolicy == "Application": + configOptions += "-o /LocalSite/RemoteExecution=True " command = executable if jobArguments: @@ -354,26 +385,18 @@ def preProcess(self): command = result["Value"] self.log.verbose(f"Execution command: {command}") - # Prepare outputs - errorFile = self.jobArgs.get("StdError", self.defaultErrorFile) - outputFile = self.jobArgs.get("StdOutput", self.defaultOutputFile) - result = self.__prepareEnvironment() if not result["OK"]: return result exeEnv = result["Value"] - return S_OK( - { - "command": command, - "error": errorFile, - "output": outputFile, - "env": exeEnv, - } - ) + if not (result := self.jobExecutionCoordinator.preProcess(command, exeEnv))["OK"]: + self.log.error("Failed to pre-process the job", result["Message"]) + + return result ############################################################################# - def process(self, command: str, output: str, error: str, env: dict): + def process(self, command: str, env: dict): """This method calls the payload.""" self.log.info(f"Job Wrapper is starting the processing phase for job {self.jobID}") @@ -393,59 +416,62 @@ def process(self, command: str, output: str, error: str, env: dict): jobMemory = int(self.jobArgs["Memory"]) * 1024.0 * 1024.0 spObject = Subprocess(timeout=False, bufferLimit=int(self.bufferLimit)) - exeThread = ExecutionThread(spObject, command, self.maxPeekLines, output, error, env, self.executionResults) - exeThread.start() - payloadPID = None - for seconds in range(5, 40, 5): - time.sleep(seconds) - payloadPID = spObject.getChildPID() - if payloadPID: - self.__setJobParam("PayloadPID", payloadPID) - break - if not payloadPID: - return S_ERROR("Payload process could not start after 140 seconds") - - watchdog = Watchdog( - pid=self.currentPID, - exeThread=exeThread, - spObject=spObject, - jobCPUTime=jobCPUTime, - memoryLimit=jobMemory, - processors=self.numberOfProcessors, - jobArgs=self.jobArgs, - ) + with contextlib.chdir(self.jobIDPath): + exeThread = ExecutionThread( + spObject, command, self.maxPeekLines, self.outputFile, self.errorFile, env, self.executionResults + ) + exeThread.start() + payloadPID = None + for seconds in range(5, 40, 5): + time.sleep(seconds) + payloadPID = spObject.getChildPID() + if payloadPID: + self.__setJobParam("PayloadPID", payloadPID) + break + if not payloadPID: + return S_ERROR("Payload process could not start after 140 seconds") + + watchdog = 
Watchdog( + pid=self.currentPID, + exeThread=exeThread, + spObject=spObject, + jobCPUTime=jobCPUTime, + memoryLimit=jobMemory, + processors=self.numberOfProcessors, + jobArgs=self.jobArgs, + ) - self.log.verbose("Initializing Watchdog instance") - watchdog.initialize() - self.log.verbose("Calibrating Watchdog instance") - watchdog.calibrate() - # Do not kill Test jobs by CPU time - if self.jobArgs.get("JobType", "") == "Test": - watchdog.testCPUConsumed = False - - if "DisableCPUCheck" in self.jobArgs: - watchdog.testCPUConsumed = False - - # Disable checks if remote execution: do not need it as pre/post processing occurs locally - if self.ceArgs.get("RemoteExecution", False): - watchdog.testWallClock = False - watchdog.testDiskSpace = False - watchdog.testLoadAvg = False - watchdog.testCPUConsumed = False - watchdog.testCPULimit = False - watchdog.testMemoryLimit = False - watchdog.testTimeLeft = False - - if exeThread.is_alive(): - self.log.info("Application thread is started in Job Wrapper") - watchdog.run() - else: - self.log.warn("Application thread stopped very quickly...") + self.log.verbose("Initializing Watchdog instance") + watchdog.initialize() + self.log.verbose("Calibrating Watchdog instance") + watchdog.calibrate() + # Do not kill Test jobs by CPU time + if self.jobArgs.get("JobType", "") == "Test": + watchdog.testCPUConsumed = False + + if "DisableCPUCheck" in self.jobArgs: + watchdog.testCPUConsumed = False + + # Disable checks if remote execution: do not need it as pre/post processing occurs locally + if self.ceArgs.get("RemoteExecution", False): + watchdog.testWallClock = False + watchdog.testDiskSpace = False + watchdog.testLoadAvg = False + watchdog.testCPUConsumed = False + watchdog.testCPULimit = False + watchdog.testMemoryLimit = False + watchdog.testTimeLeft = False + + if exeThread.is_alive(): + self.log.info("Application thread is started in Job Wrapper") + watchdog.run() + else: + self.log.warn("Application thread stopped very quickly...") - if exeThread.is_alive(): - self.log.warn("Watchdog exited before completion of execution thread") - while exeThread.is_alive(): - time.sleep(5) + if exeThread.is_alive(): + self.log.warn("Watchdog exited before completion of execution thread") + while exeThread.is_alive(): + time.sleep(5) payloadResult = { "payloadStatus": None, @@ -488,7 +514,7 @@ def process(self, command: str, output: str, error: str, env: dict): ############################################################################# def postProcess( self, - payloadStatus: int, + payloadStatus: int | None, payloadOutput: str, payloadExecutorError: str, cpuTimeConsumed: list, @@ -514,9 +540,9 @@ def postProcess( applicationErrorStatus = payloadStatus self.__setJobParam("ApplicationError", applicationErrorStatus, sendFlag=True) - if cpuTimeConsumed: - cpuString = " ".join([f"{x:.2f}" for x in cpuTimeConsumed]) - self.log.info("CPU time consumed in JobWrapper process", cpuString) + # This might happen if process() and postProcess() are called on different machines + if cpuTimeConsumed and not self.executionResults.get("CPU"): + self.executionResults["CPU"] = cpuTimeConsumed if watchdogError: self.__report(status=JobStatus.FAILED, minorStatus=watchdogError, sendFlag=True) @@ -536,7 +562,7 @@ def postProcess( self.log.verbose(f"Execution thread status = {payloadStatus}") self.log.info("Checking directory contents after execution:") - res = systemCall(5, ["ls", "-al"]) + res = systemCall(5, ["ls", "-al", str(self.jobIDPath)]) if not res["OK"]: self.log.error("Failed to 
list the current directory", res["Message"]) elif res["Value"][0]: @@ -560,7 +586,10 @@ def postProcess( self.failedFlag = False self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.APP_SUCCESS, sendFlag=True) - return S_OK() + if not (result := self.jobExecutionCoordinator.postProcess())["OK"]: + self.log.error("Failed to post-process the job", result["Message"]) + + return result ############################################################################# def execute(self): @@ -575,8 +604,6 @@ def execute(self): result = self.process( command=payloadParams["command"], - output=payloadParams["output"], - error=payloadParams["error"], env=payloadParams["env"], ) if not result["OK"]: @@ -596,7 +623,7 @@ def execute(self): return S_OK() ############################################################################# - def __sendFinalStdOut(self, payloadOutput): + def __sendFinalStdOut(self, payloadOutput: str): """After the Watchdog process has finished, this function sends a final report to be presented in the StdOut in the web page via the heartbeat mechanism. @@ -668,11 +695,11 @@ def resolveInputData(self): msg = "Job Wrapper cannot resolve local replicas of input data with null job input data parameter " self.log.error(msg) return S_ERROR(msg) - else: - if isinstance(inputData, str): - inputData = [inputData] - lfns = [fname.replace("LFN:", "") for fname in inputData] - self.log.verbose("Job input data requirement is \n%s" % ",\n".join(lfns)) + + if isinstance(inputData, str): + inputData = [inputData] + lfns = [fname.replace("LFN:", "") for fname in inputData] + self.log.verbose("Job input data requirement is \n%s" % ",\n".join(lfns)) # Does this site have local SEs? - not failing if it doesn't if "LocalSE" in self.ceArgs: @@ -858,6 +885,8 @@ def processJobOutputs(self): outputSandbox = [outputSandbox] if outputSandbox: self.log.verbose(f"OutputSandbox files are: {', '.join(outputSandbox)}") + outputSandboxFiles = [str(self.jobIDPath / output) for output in outputSandbox] + outputData = self.jobArgs.get("OutputData", []) if outputData and isinstance(outputData, str): outputData = outputData.split(";") @@ -865,16 +894,14 @@ def processJobOutputs(self): self.log.verbose(f"OutputData files are: {', '.join(outputData)}") # First resolve any wildcards for output files and work out if any files are missing - resolvedSandbox = self.__resolveOutputSandboxFiles(outputSandbox) - if not resolvedSandbox["OK"]: - self.log.warn("Output sandbox file resolution failed:") - self.log.warn(resolvedSandbox["Message"]) - self.__report(status=JobStatus.FAILED, minorStatus=JobMinorStatus.RESOLVING_OUTPUT_SANDBOX) - - fileList = resolvedSandbox["Value"]["Files"] - missingFiles = resolvedSandbox["Value"]["Missing"] + resolvedSandbox = self.__resolveOutputSandboxFiles(outputSandboxFiles) + + fileList = resolvedSandbox["Files"] + missingFiles = resolvedSandbox["Missing"] if missingFiles: - self.jobReport.setJobParameter("OutputSandboxMissingFiles", ", ".join(missingFiles), sendFlag=False) + self.jobReport.setJobParameter( + "OutputSandboxMissingFiles", ", ".join([Path(output).name for output in missingFiles]), sendFlag=False + ) if "Owner" not in self.jobArgs: msg = "Job has no owner specified" @@ -886,6 +913,7 @@ def processJobOutputs(self): self.__report(status=JobStatus.COMPLETING, minorStatus=JobMinorStatus.UPLOADING_OUTPUT_SANDBOX) uploadOutputDataInAnyCase = False + result_sbUpload = None if fileList and self.jobID: self.outputSandboxSize = getGlobbedTotalSize(fileList) @@ -938,7 +966,7 @@ 
def processJobOutputs(self): # now that we (tried to) transfer the output files, # including possibly oversized Output Sandboxes, # we delete the local output sandbox tarfile in case it's still there. - if not result_sbUpload["OK"]: + if result_sbUpload and not result_sbUpload["OK"]: outputSandboxData = result_sbUpload.get("SandboxFileName") if outputSandboxData: try: @@ -987,8 +1015,7 @@ def __resolveOutputSandboxFiles(self, outputSandbox): if i not in missing: missing.append(i) - result = {"Missing": missing, "Files": okFiles} - return S_OK(result) + return {"Missing": missing, "Files": okFiles} ############################################################################# def __transferOutputDataFiles(self, outputData, outputSE, outputPath): @@ -1009,6 +1036,7 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath): nonlfnList.append(out) # Check whether list of outputData has a globbable pattern + nonlfnList = [str(self.jobIDPath / x) for x in nonlfnList] globbedOutputList = List.uniqueElements(getGlobbedFiles(nonlfnList)) if globbedOutputList != nonlfnList and globbedOutputList: self.log.info( @@ -1018,7 +1046,7 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath): outputData = lfnList + nonlfnList pfnGUID = {} - result = getGUID(outputData) + result = getGUID(outputData, str(self.jobIDPath.absolute())) if not result["OK"]: self.log.warn( "Failed to determine POOL GUID(s) for output file list (OK if not POOL files)", result["Message"] @@ -1035,9 +1063,9 @@ def __transferOutputDataFiles(self, outputData, outputSE, outputPath): # # file size localfileSize = getGlobbedTotalSize(localfile) - self.outputDataSize += getGlobbedTotalSize(localfile) + self.outputDataSize += localfileSize - outputFilePath = os.path.join(os.getcwd(), localfile) + outputFilePath = os.path.abspath(localfile) # # file GUID fileGUID = pfnGUID[localfile] if localfile in pfnGUID else None @@ -1155,7 +1183,6 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""): """Provides a generic convention for VO output data files if no path is specified. 
""" - if not re.search("^LFN:", outputFile): localfile = outputFile initial = self.owner[:1] @@ -1165,7 +1192,7 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""): ops = Operations(vo=vo) user_prefix = ops.getValue("LFNUserPrefix", "user") - basePath = "/" + vo + "/" + user_prefix + "/" + initial + "/" + self.owner + basePath = Path(f"/{vo}") / user_prefix / initial / self.owner if outputPath: # If output path is given, append it to the user path and put output files in this directory if outputPath.startswith("/"): @@ -1174,10 +1201,10 @@ def __getLFNfromOutputFile(self, outputFile, outputPath=""): # By default the output path is constructed from the job id subdir = str(int(self.jobID / 1000)) outputPath = subdir + "/" + str(self.jobID) - lfn = os.path.join(basePath, outputPath, os.path.basename(localfile)) + lfn = str(basePath / outputPath / os.path.basename(localfile)) else: # if LFN is given, take it as it is - localfile = os.path.basename(outputFile.replace("LFN:", "")) + localfile = str(self.jobIDPath / outputFile.replace("LFN:", "")) lfn = outputFile.replace("LFN:", "") return (lfn, localfile) @@ -1201,19 +1228,19 @@ def transferInputSandbox(self, inputSandbox): sandboxFiles.append(os.path.basename(isb)) self.log.info(f"Downloading InputSandbox for job {self.jobID}: {', '.join(sandboxFiles)}") - if os.path.exists(f"{self.root}/inputsandbox"): + if (self.root / "inputsandbox").exists(): # This is a debugging tool, get the file from local storage to debug Job Wrapper sandboxFiles.append("jobDescription.xml") for inputFile in sandboxFiles: - if os.path.exists(f"{self.root}/inputsandbox/{inputFile}"): + if (self.root / "inputsandbox" / inputFile).exists(): self.log.info(f"Getting InputSandbox file {inputFile} from local directory for testing") - shutil.copy(self.root + "/inputsandbox/" + inputFile, inputFile) + shutil.copy(self.root / "inputsandbox" / inputFile, self.jobIDPath / inputFile) result = S_OK(sandboxFiles) else: if registeredISB: for isb in registeredISB: self.log.info(f"Downloading Input SandBox {isb}") - result = SandboxStoreClient().downloadSandbox(isb) + result = SandboxStoreClient().downloadSandbox(isb, destinationDir=str(self.jobIDPath)) if not result["OK"]: self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX) return S_ERROR(f"Cannot download Input sandbox {isb}: {result['Message']}") @@ -1224,7 +1251,7 @@ def transferInputSandbox(self, inputSandbox): self.log.info("Downloading Input SandBox LFNs, number of files to get", len(lfns)) self.__report(minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX_LFN) lfns = [fname.replace("LFN:", "").replace("lfn:", "") for fname in lfns] - download = self.dm.getFile(lfns) + download = self.dm.getFile(lfns, destinationDir=str(self.jobIDPath)) if not download["OK"]: self.log.warn(download) self.__report(minorStatus=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX_LFN) @@ -1235,10 +1262,12 @@ def transferInputSandbox(self, inputSandbox): self.log.warn(failed) return S_ERROR(str(failed)) for lfn in lfns: - if os.path.exists(f"{self.root}/{os.path.basename(download['Value']['Successful'][lfn])}"): + if (self.root / os.path.basename(download["Value"]["Successful"][lfn])).exists(): sandboxFiles.append(os.path.basename(download["Value"]["Successful"][lfn])) - userFiles = sandboxFiles + [os.path.basename(lfn) for lfn in lfns] + userFiles = [str(self.jobIDPath / file) for file in sandboxFiles] + [ + str(self.jobIDPath / os.path.basename(lfn)) for lfn in lfns + ] for possibleTarFile in userFiles: if not 
os.path.exists(possibleTarFile): continue @@ -1247,7 +1276,7 @@ def transferInputSandbox(self, inputSandbox): self.log.info(f"Unpacking input sandbox file {possibleTarFile}") with tarfile.open(possibleTarFile, "r") as tarFile: for member in tarFile.getmembers(): - tarFile.extract(member, os.getcwd()) + tarFile.extract(member, self.jobIDPath) except Exception as x: return S_ERROR(f"Could not untar {possibleTarFile} with exception {str(x)}") @@ -1354,7 +1383,7 @@ def sendJobAccounting(self, status="", minorStatus=""): except ValueError: execTime = 0 - diskSpaceConsumed = getGlobbedTotalSize(os.path.join(self.root, str(self.jobID))) + diskSpaceConsumed = getGlobbedTotalSize(str(self.jobIDPath)) # Fill the data acData = { "User": self.owner, @@ -1469,7 +1498,7 @@ def sendFailoverRequest(self): ############################################################################# def __getRequestFiles(self): """Simple wrapper to return the list of request files.""" - return glob.glob("*_request.json") + return glob.glob(str(self.jobIDPath / "*_request.json")) ############################################################################# def __cleanUp(self): @@ -1482,11 +1511,10 @@ def __cleanUp(self): else: cleanUp = True - os.chdir(self.root) if cleanUp: self.log.verbose("Cleaning up job working directory") - if os.path.exists(str(self.jobID)): - shutil.rmtree(str(self.jobID)) + if self.root != self.jobIDPath and self.jobIDPath.exists(): + shutil.rmtree(self.jobIDPath) ############################################################################# def __report(self, status="", minorStatus="", sendFlag=False): @@ -1596,40 +1624,3 @@ def getOutput(self, lines=0): self.outputLines = self.outputLines[cut:] return S_OK(self.outputLines) return S_ERROR("No Job output found") - - -def rescheduleFailedJob(jobID, minorStatus, jobReport=None): - """Function for rescheduling a jobID, setting a minorStatus""" - - rescheduleResult = JobStatus.RESCHEDULED - - try: - gLogger.warn("Failure during", minorStatus) - - # Setting a job parameter does not help since the job will be rescheduled, - # instead set the status with the cause and then another status showing the - # reschedule operation. 
- - if not jobReport: - gLogger.info("Creating a new JobReport Object") - jobReport = JobReport(int(jobID), "JobWrapper") - - jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False) - jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False) - - # We must send Job States and Parameters before it gets reschedule - jobReport.sendStoredStatusInfo() - jobReport.sendStoredJobParameters() - - gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper") - - result = JobManagerClient().rescheduleJob(int(jobID)) - if not result["OK"]: - gLogger.warn(result["Message"]) - if "Maximum number of reschedulings is reached" in result["Message"]: - rescheduleResult = JobStatus.FAILED - - return rescheduleResult - except Exception: - gLogger.exception("JobWrapperTemplate failed to reschedule Job") - return JobStatus.FAILED diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py index 4037f782cd9..bd2a5d297be 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperOfflineTemplate.py @@ -24,17 +24,17 @@ os.umask(0o22) -def execute(jobID: str, arguments: dict): +def execute(arguments: dict): """The only real function executed here""" payloadParams = arguments.pop("Payload", {}) if not payloadParams: return 1 - if not "PayloadResults" in arguments["Job"] or not "Checksum" in arguments["Job"]: + if "PayloadResults" not in arguments["Job"] or "Checksum" not in arguments["Job"]: return 1 try: - job = JobWrapper(jobID) + job = JobWrapper() job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR except Exception as exc: # pylint: disable=broad-except gLogger.exception("JobWrapper failed the initialization phase", lException=exc) @@ -76,11 +76,9 @@ def execute(jobID: str, arguments: dict): if "Job" not in jobArgs: raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}") - jobID = jobArgs["Job"].get("JobID", 0) - jobID = int(jobID) - - ret = execute(jobID, jobArgs) -except Exception as exc: # pylint: disable=broad-except + ret = execute(jobArgs) +except Exception: # pylint: disable=broad-except gLogger.exception("JobWrapperTemplate exception") + ret = -1 sys.exit(ret) diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py index 3198a879a65..94c9b2b8ae6 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py @@ -14,153 +14,55 @@ import sys import json import os -import errno -import time -import signal -sitePython = "@SITEPYTHON@" +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import ( + createAndEnterWorkingDirectory, + executePayload, + finalize, + getJobWrapper, + processJobOutputs, + resolveInputData, + transferInputSandbox, +) + +sitePython = os.path.realpath("@SITEPYTHON@") if sitePython: - sys.path.insert(0, "@SITEPYTHON@") + sys.path.insert(0, sitePython) from DIRAC.Core.Base.Script import Script Script.parseCommandLine() from DIRAC import gLogger -from DIRAC.Core.Utilities import DErrno - -from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper, rescheduleFailedJob from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport -from 
DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus - - -gJobReport = None os.umask(0o22) -class JobWrapperError(Exception): - """Custom exception for handling JobWrapper "genuine" errors""" - - def __init__(self, value): - self.value = value - super().__init__() - - def __str__(self): - return str(self.value) - - -def killJobWrapper(job): - """Function that stops and ultimately kills the JobWrapper""" - # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process - time.sleep(60) - os.kill(job.currentPID, signal.SIGTERM) - # wait for half a minute and if worker is still alive use REAL silencer - time.sleep(30) - # now you're dead - os.kill(job.currentPID, signal.SIGKILL) - return 1 - - -def sendJobAccounting(job, status, minorStatus): - """safe sending job accounting (always catching exceptions)""" - try: - job.sendJobAccounting(status, minorStatus) - except Exception as exc: # pylint: disable=broad-except - gLogger.exception( - f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", - lException=exc, - ) - - -def execute(arguments): +def execute(jobID: int, arguments: dict, jobReport: JobReport): """The only real function executed here""" - global gJobReport - - jobID = arguments["Job"].get("JobID", 0) - os.environ["JOBID"] = str(jobID) - jobID = int(jobID) - if "WorkingDirectory" in arguments: - wdir = os.path.expandvars(arguments["WorkingDirectory"]) - if os.path.isdir(wdir): - os.chdir(wdir) - else: - try: - os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) - if os.path.isdir(wdir): - os.chdir(wdir) - except OSError as osError: - if osError.errno == errno.EEXIST and os.path.isdir(wdir): - gLogger.exception("JobWrapperTemplate found that the working directory already exists") - rescheduleResult = rescheduleFailedJob(jobID, "Working Directory already exists") - else: - gLogger.exception("JobWrapperTemplate could not create working directory") - rescheduleResult = rescheduleFailedJob(jobID, "Could Not Create Working Directory") - return 1 - - gJobReport = JobReport(jobID, "JobWrapper") + if not createAndEnterWorkingDirectory(jobID, arguments["WorkingDirectory"], jobReport): + return 1 - try: - job = JobWrapper(jobID, gJobReport) - job.initialize(arguments) # initialize doesn't return S_OK/S_ERROR - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper failed the initialization phase", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) + job = getJobWrapper(jobID, arguments, jobReport) + if not job: return 1 if "InputSandbox" in arguments["Job"]: - gJobReport.commit() - try: - result = job.transferInputSandbox(arguments["Job"]["InputSandbox"]) - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to download input sandbox") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised 
exception while downloading input sandbox", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + jobReport.commit() + if not transferInputSandbox(job, arguments["Job"]["InputSandbox"]): return 1 else: gLogger.verbose("Job has no InputSandbox requirement") - gJobReport.commit() + jobReport.commit() if "InputData" in arguments["Job"]: if arguments["Job"]["InputData"]: - try: - result = job.resolveInputData() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError: - gLogger.exception("JobWrapper failed to resolve input data") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while resolving input data", lException=exc) - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + if not resolveInputData(job): return 1 else: gLogger.verbose("Job has a null InputData requirement:") @@ -168,75 +70,18 @@ def execute(arguments): else: gLogger.verbose("Job has no InputData requirement") - gJobReport.commit() + jobReport.commit() - try: - result = job.execute() - if not result["OK"]: - gLogger.error("Failed to execute job", result["Message"]) - raise JobWrapperError((result["Message"], result["Errno"])) - except JobWrapperError as exc: - if exc.value[1] == 0 or str(exc.value[0]) == "0": - gLogger.verbose("JobWrapper exited with status=0 after execution") - if exc.value[1] == DErrno.EWMSRESC: - gLogger.warn("Asked to reschedule job") - rescheduleResult = rescheduleFailedJob( - jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=gJobReport - ) - job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) - return 1 - gLogger.exception("Job failed in execution phase") - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) - return 1 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("Job raised exception during execution phase", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + if not executePayload(job): return 1 if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]: - try: - result = job.processJobOutputs() - if not result["OK"]: - gLogger.warn(result["Message"]) - raise JobWrapperError(result["Message"]) - except JobWrapperError as exc: - gLogger.exception("JobWrapper failed to process output files") - 
gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) - - return 2 - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception while processing output files", lException=exc) - gJobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) - gJobReport.setJobStatus( - status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, sendFlag=False - ) - job.sendFailoverRequest() - job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + if not processJobOutputs(job): return 2 else: gLogger.verbose("Job has no OutputData or OutputSandbox requirement") - try: - # Failed jobs will return !=0 / successful jobs will return 0 - return job.finalize() - except Exception as exc: # pylint: disable=broad-except - gLogger.exception("JobWrapper raised exception during the finalization phase", lException=exc) - return 2 + return finalize(job) ########################################################## @@ -251,14 +96,19 @@ def execute(arguments): raise TypeError(f"jobArgs is of type {type(jobArgs)}") if "Job" not in jobArgs: raise ValueError(f"jobArgs does not contain 'Job' key: {str(jobArgs)}") - ret = execute(jobArgs) - gJobReport.commit() -except Exception as exc: # pylint: disable=broad-except + + jobID = jobArgs["Job"].get("JobID", 0) + jobID = int(jobID) + jobReport = JobReport(jobID, "JobWrapper") + + ret = execute(jobID, jobArgs, jobReport) + jobReport.commit() +except Exception: # pylint: disable=broad-except gLogger.exception("JobWrapperTemplate exception") try: - gJobReport.commit() + jobReport.commit() ret = -1 - except Exception as exc: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except gLogger.exception("Could not commit the job report") ret = -2 diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py new file mode 100644 index 00000000000..e3c7f971872 --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperUtilities.py @@ -0,0 +1,237 @@ +"""JobWrapperUtilities + +This module contains the functions that are used by the JobWrapperTemplate to execute the JobWrapper. 
+""" +import errno +import os +import signal +import time + +from DIRAC import gLogger +from DIRAC.Core.Utilities import DErrno +from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient +from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport +from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper + + +class JobWrapperError(Exception): + """Custom exception for handling JobWrapper "genuine" errors""" + + def __init__(self, value): + self.value = value + super().__init__() + + def __str__(self): + return str(self.value) + + +def killJobWrapper(job: JobWrapper) -> int: + """Function that stops and ultimately kills the JobWrapper""" + # Giving the JobWrapper some time to complete possible tasks, then trying to kill the process + time.sleep(60) + os.kill(job.currentPID, signal.SIGTERM) + # wait for half a minute and if worker is still alive use REAL silencer + time.sleep(30) + # now you're dead + os.kill(job.currentPID, signal.SIGKILL) + return 1 + + +def rescheduleFailedJob(jobID: str, minorStatus: str, jobReport: JobReport): + """Function for rescheduling a jobID, setting a minorStatus""" + + rescheduleResult = JobStatus.RESCHEDULED + + try: + gLogger.warn("Failure during", minorStatus) + + # Setting a job parameter does not help since the job will be rescheduled, + # instead set the status with the cause and then another status showing the + # reschedule operation. + + jobReport.setApplicationStatus(f"Failed {minorStatus} ", sendFlag=False) + jobReport.setJobStatus(status=JobStatus.RESCHEDULED, minorStatus=minorStatus, sendFlag=False) + + # We must send Job States and Parameters before it gets reschedule + jobReport.sendStoredStatusInfo() + jobReport.sendStoredJobParameters() + + gLogger.info("Job will be rescheduled after exception during execution of the JobWrapper") + + result = JobManagerClient().rescheduleJob(int(jobID)) + if not result["OK"]: + gLogger.warn(result["Message"]) + if "Maximum number of reschedulings is reached" in result["Message"]: + rescheduleResult = JobStatus.FAILED + + return rescheduleResult + except Exception: + gLogger.exception("JobWrapperTemplate failed to reschedule Job") + return JobStatus.FAILED + + +def sendJobAccounting(job: JobWrapper, status: str, minorStatus: str): + """safe sending job accounting (always catching exceptions)""" + try: + job.sendJobAccounting(status, minorStatus) + except Exception: # pylint: disable=broad-except + gLogger.exception( + f"JobWrapper failed sending job accounting for [status:minorStatus] [{status}:{minorStatus}]", + ) + + +def createAndEnterWorkingDirectory(jobID: str, workingDirectory: str, jobReport: JobReport) -> bool: + """Create the working directory and change to it""" + wdir = os.path.expandvars(workingDirectory) + if os.path.isdir(wdir): + os.chdir(wdir) + return True + + try: + os.makedirs(wdir) # this will raise an exception if wdir already exists (which is ~OK) + if os.path.isdir(wdir): + os.chdir(wdir) + except OSError as osError: + if osError.errno == errno.EEXIST and os.path.isdir(wdir): + gLogger.exception("JobWrapperTemplate found that the working directory already exists") + rescheduleFailedJob(jobID, "Working Directory already exists", jobReport) + else: + gLogger.exception("JobWrapperTemplate could not create working directory") + rescheduleFailedJob(jobID, "Could Not Create Working Directory", jobReport) + return False + return True + + +def getJobWrapper(jobID: 
int, arguments: dict, jobReport: JobReport) -> JobWrapper: + """Create a JobWrapper instance""" + try: + job = JobWrapper(jobID, jobReport) + job.initialize(arguments) + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper failed the initialization phase") + rescheduleResult = rescheduleFailedJob( + jobID=jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION, jobReport=jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_INITIALIZATION) + return None + return job + + +def transferInputSandbox(job: JobWrapper, inputSandbox: list) -> bool: + """Transfer the input sandbox""" + try: + result = job.transferInputSandbox(inputSandbox) + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to download input sandbox") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while downloading input sandbox") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) + return False + return True + + +def resolveInputData(job: JobWrapper) -> bool: + """Resolve the input data""" + try: + result = job.resolveInputData() + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to resolve input data") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while resolving input data") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.INPUT_DATA_RESOLUTION) + return False + return True + + +def processJobOutputs(job: JobWrapper) -> bool: + """Process the job outputs""" + try: + result = job.processJobOutputs() + if not result["OK"]: + gLogger.warn(result["Message"]) + raise JobWrapperError(result["Message"]) + except JobWrapperError: + gLogger.exception("JobWrapper failed to process output files") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + return False + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception while processing output files") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.UPLOADING_JOB_OUTPUTS) + return False + return True + + +def finalize(job: JobWrapper) -> 
int: + """Finalize the job""" + try: + # Failed jobs will return !=0 / successful jobs will return 0 + return job.finalize() + except Exception: # pylint: disable=broad-except + gLogger.exception("JobWrapper raised exception during the finalization phase") + return 2 + + +def executePayload(job: JobWrapper) -> bool: + """Execute the payload""" + try: + result = job.execute() + if not result["OK"]: + gLogger.error("Failed to execute job", result["Message"]) + raise JobWrapperError((result["Message"], result["Errno"])) + except JobWrapperError as exc: + if exc.value[1] == 0 or str(exc.value[0]) == "0": + gLogger.verbose("JobWrapper exited with status=0 after execution") + if exc.value[1] == DErrno.EWMSRESC: + gLogger.warn("Asked to reschedule job") + rescheduleResult = rescheduleFailedJob( + jobID=job.jobID, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION, jobReport=job.jobReport + ) + job.sendJobAccounting(status=rescheduleResult, minorStatus=JobMinorStatus.JOB_WRAPPER_EXECUTION) + return False + gLogger.exception("Job failed in execution phase") + job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + job.jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + return False + except Exception as exc: # pylint: disable=broad-except + gLogger.exception("Job raised exception during execution phase") + job.jobReport.setJobParameter("Error Message", repr(exc), sendFlag=False) + job.jobReport.setJobStatus( + status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC, sendFlag=False + ) + job.sendFailoverRequest() + job.sendJobAccounting(status=JobStatus.FAILED, minorStatus=JobMinorStatus.EXCEPTION_DURING_EXEC) + return False + return True diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py index 5fbe5faa0c9..d9679d40627 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapper.py @@ -1,6 +1,7 @@ """ Test class for JobWrapper """ import os +from pathlib import Path import shutil import tempfile import time @@ -9,12 +10,13 @@ import pytest import DIRAC -from DIRAC import gLogger +from DIRAC import gLogger, S_OK from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.ReturnValues import S_ERROR from DIRAC.DataManagementSystem.Client.test.mock_DM import dm_mock from DIRAC.Resources.Catalog.test.mock_FC import fc_mock from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus +from DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator import JobExecutionCoordinator from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper getSystemSectionMock = MagicMock() @@ -22,108 +24,125 @@ gLogger.setLevel("DEBUG") -# PreProcess method +# ------------------------------------------------------------------------------------------------- -def test_preProcess(mocker): - """Test the pre process method of the JobWrapper class.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") +@pytest.fixture +def setup_job_wrapper(mocker): + """Fixture to create a JobWrapper instance with a JobExecutionCoordinator.""" + mocker.patch( + 
"DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) - echoLocation = shutil.which("echo") - diracJobExecLocation = shutil.which("dirac-jobexec") + def _setup(jobArgs=None, ceArgs=None): + jw = JobWrapper() + if jobArgs: + jw.jobArgs = jobArgs + if ceArgs: + jw.ceArgs = ceArgs + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + return jw - # Test a simple command without argument - jw = JobWrapper() - jw.jobArgs = {"Executable": echoLocation} + return _setup + + +def test_preProcess_no_arguments(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: no arguments.""" + ls = shutil.which("ls") + jw = setup_job_wrapper({"Executable": ls}) result = jw.preProcess() assert result["OK"] - assert result["Value"]["command"] == echoLocation - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" + assert result["Value"]["command"] == ls assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command with arguments - jw = JobWrapper() - jw.jobArgs = {"Executable": echoLocation, "Arguments": "hello"} + +def test_preProcess_with_arguments(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: with arguments.""" + echoLocation = shutil.which("echo") + jw = setup_job_wrapper({"Executable": echoLocation, "Arguments": "hello"}) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command that is included in the PATH - jw = JobWrapper() - jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} + +def test_preProcess_command_in_PATH(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: command in PATH.""" + echoLocation = shutil.which("echo") + jw = setup_job_wrapper({"Executable": "echo", "Arguments": "hello"}) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command and specify outputs - jw = JobWrapper() - jw.jobArgs = {"Executable": "echo", "Arguments": "hello", "StdError": "error.log", "StdOutput": "output.log"} + +def test_preProcess_specify_outputs(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: specify outputs.""" + echoLocation = shutil.which("echo") + jw = setup_job_wrapper( + {"Executable": "echo", "Arguments": "hello", "StdError": "error.log", "StdOutput": "output.log"} + ) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "error.log" - assert result["Value"]["output"] == "output.log" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command and specify number of processors - jw = JobWrapper() - jw.jobArgs = {"Executable": "echo", "Arguments": "hello"} - jw.ceArgs = {"Processors": 2} + +def test_preProcess_specify_processors(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: 
specify number of processors.""" + echoLocation = shutil.which("echo") + jw = setup_job_wrapper({"Executable": "echo", "Arguments": "hello"}, {"Processors": 2}) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "2" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command with environment variable in the executable - jw = JobWrapper() - jw.jobArgs = {"Executable": "${CMD}", "Arguments": "hello"} +def test_preProcess_with_env_variable(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: with environment variable in the executable.""" + echoLocation = shutil.which("echo") os.environ["CMD"] = echoLocation + jw = setup_job_wrapper({"Executable": "${CMD}", "Arguments": "hello"}) result = jw.preProcess() assert result["OK"] assert result["Value"]["command"] == f"{echoLocation} hello" - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test a command with an empty executable - jw = JobWrapper() - jw.jobArgs = {} + +def test_preProcess_empty_executable(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: empty executable.""" + jw = setup_job_wrapper({}) + result = jw.preProcess() assert not result["OK"] assert result["Message"] == "Job 0 has no specified executable" - # Test a command with an executable that does not exist - jw = JobWrapper() - jw.jobArgs = {"Executable": "pippo"} + +def test_preProcess_nonexistent_executable(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: nonexistent executable.""" + jw = setup_job_wrapper({"Executable": "pippo"}) + result = jw.preProcess() assert not result["OK"] assert result["Message"] == f"Path to executable {os.getcwd()}/pippo not found" - # Test dirac-jobexec - jw = JobWrapper() - jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} + +def test_preProcess_dirac_jobexec(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: dirac-jobexec.""" + diracJobExecLocation = shutil.which("dirac-jobexec") + jw = setup_job_wrapper({"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"}) result = jw.preProcess() assert result["OK"] @@ -132,22 +151,23 @@ def test_preProcess(mocker): f"-o /LocalSite/Site={DIRAC.siteName()}", "-o /LocalSite/GridCE=", "-o /LocalSite/CEQueue=", - "-o /LocalSite/RemoteExecution=False", ] assert ( result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" ) - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" - # Test dirac-jobexec with arguments - jw = JobWrapper() - jw.jobArgs = {"Executable": "dirac-jobexec", "Arguments": "jobDescription.xml"} - jw.ceArgs = {"GridCE": "CE", "Queue": "Queue", "RemoteExecution": True} - result = jw.preProcess() +def test_preProcess_dirac_jobexec_with_args(setup_job_wrapper): + """Test the pre process method of the JobWrapper class: dirac-jobexec with arguments.""" + diracJobExecLocation = shutil.which("dirac-jobexec") + jw = setup_job_wrapper( + {"Executable": "dirac-jobexec", 
"Arguments": "jobDescription.xml"}, + {"GridCE": "CE", "Queue": "Queue", "SubmissionPolicy": "Application"}, + ) + + result = jw.preProcess() assert result["OK"] expectedOptions = [ "-o /LocalSite/CPUNormalizationFactor=0.0", @@ -159,20 +179,20 @@ def test_preProcess(mocker): assert ( result["Value"]["command"].strip() == f"{diracJobExecLocation} jobDescription.xml {' '.join(expectedOptions)}" ) - assert result["Value"]["error"] == "std.err" - assert result["Value"]["output"] == "std.out" assert result["Value"]["env"]["DIRAC_PROCESSORS"] == "1" assert result["Value"]["env"]["DIRAC_WHOLENODE"] == "False" -# Process method +# ------------------------------------------------------------------------------------------------- @pytest.mark.slow def test_processSuccessfulCommand(mocker): """Test the process method of the JobWrapper class: most common scenario.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -180,10 +200,10 @@ def test_processSuccessfulCommand(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{os.path.dirname(os.path.abspath(__file__))}/script-long.sh", - output=std_out.name, - error=std_err.name, env={}, ) @@ -200,8 +220,10 @@ def test_processSuccessfulCommand(mocker): @pytest.mark.slow def test_processSuccessfulDiracJobExec(mocker): """Test the process method of the JobWrapper class: most common scenario with dirac-jobexec.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -210,10 +232,10 @@ def test_processSuccessfulDiracJobExec(mocker): with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: executable = shutil.which("dirac-jobexec") + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{executable} {os.path.dirname(os.path.abspath(__file__))}/jobDescription.xml --o /DIRAC/Setup=Test", - output=std_out.name, - error=std_err.name, env={}, ) @@ -226,8 +248,10 @@ def test_processSuccessfulDiracJobExec(mocker): @pytest.mark.slow def test_processFailedCommand(mocker): """Test the process method of the JobWrapper class: the command fails.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -235,10 +259,10 @@ def test_processFailedCommand(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with 
tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: + jw.outputFile = std_out.name + jw.errorFile = std_err.name result = jw.process( command=f"{os.path.dirname(os.path.abspath(__file__))}/script-fail.sh", - output=std_out.name, - error=std_err.name, env={}, ) @@ -258,8 +282,10 @@ def test_processFailedCommand(mocker): @pytest.mark.slow def test_processFailedSubprocess(mocker): """Test the process method of the JobWrapper class: the subprocess fails.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.systemCall") mock_system_call.return_value = S_ERROR("Any problem") mock_system_call = mocker.patch("DIRAC.Core.Utilities.Subprocess.Subprocess.getChildPID") @@ -272,13 +298,15 @@ def test_processFailedSubprocess(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process("mock_command", std_out.name, std_err.name, {}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process("mock_command", {}) assert result["OK"] assert not result["Value"]["payloadStatus"] assert not result["Value"]["payloadOutput"] assert result["Value"]["payloadExecutorError"] == "Any problem" - assert result["Value"]["cpuTimeConsumed"][0] == 0.0 + assert round(result["Value"]["cpuTimeConsumed"][0], 1) == 0.0 assert not result["Value"]["watchdogError"] assert not result["Value"]["watchdogStats"] @@ -286,8 +314,10 @@ def test_processFailedSubprocess(mocker): @pytest.mark.slow def test_processQuickExecutionNoWatchdog(mocker): """Test the process method of the JobWrapper class: the payload is too fast to start the watchdog.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) jw = JobWrapper() jw.jobArgs = {"CPUTime": 100, "Memory": 1} @@ -295,13 +325,15 @@ def test_processQuickExecutionNoWatchdog(mocker): mocker.patch.object(jw, "_JobWrapper__setJobParam") with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process(command=f"echo hello", output=std_out.name, error=std_err.name, env={}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process(command="echo hello", env={}) assert result["OK"] assert result["Value"]["payloadStatus"] == 0 assert result["Value"]["payloadOutput"] == "hello" assert not result["Value"]["payloadExecutorError"] - assert result["Value"]["cpuTimeConsumed"][0] >= 0.0 + assert round(result["Value"]["cpuTimeConsumed"][0], 1) == 0.0 assert not result["Value"]["watchdogError"] assert not result["Value"]["watchdogStats"] @@ -309,8 +341,10 @@ def test_processQuickExecutionNoWatchdog(mocker): @pytest.mark.slow def test_processSubprocessFailureNoPid(mocker): """Test the process method of the JobWrapper class: the subprocess fails and no PID is 
returned.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) # Test failure in starting the payload process jw = JobWrapper() jw.jobArgs = {} @@ -322,18 +356,19 @@ def test_processSubprocessFailureNoPid(mocker): mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ExecutionThread", return_value=mock_exeThread) with tempfile.NamedTemporaryFile(delete=True) as std_out, tempfile.NamedTemporaryFile(delete=True) as std_err: - result = jw.process(command=f"mock_command", output=std_out.name, error=std_err.name, env={}) + jw.outputFile = std_out.name + jw.errorFile = std_err.name + result = jw.process(command="mock_command", env={}) assert not result["OK"] assert "Payload process could not start after 140 seconds" in result["Message"] -# PostProcess method +# ------------------------------------------------------------------------------------------------- -def test_postProcess(mocker): - """Test the post process method of the JobWrapper class.""" - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") - # Mimic the behaviour of __report and __setJobParam to get the arguments passed to them +@pytest.fixture +def mock_report_and_set_param(mocker): + """Fixture to mock the __report and __setJobParam methods.""" report_args = [] set_param_args = [] @@ -343,8 +378,13 @@ def report_side_effect(*args, **kwargs): def set_param_side_effect(*args, **kwargs): set_param_args.append((args, kwargs)) - # Test when the payload finished successfully - jw = JobWrapper() + return report_args, set_param_args, report_side_effect, set_param_side_effect + + +def test_postProcess_payload_success(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: payload success.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -364,8 +404,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["status"] == JobStatus.COMPLETING assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_SUCCESS - # Test when the payload failed - jw = JobWrapper() + +def test_postProcess_payload_failed(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: payload failed.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -385,8 +428,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["status"] == JobStatus.COMPLETING assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_ERRORS - # Test when the payload failed: should be rescheduled - jw = JobWrapper() + +def test_postProcess_payload_failed_reschedule(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: payload failed and reschedule.""" 
+ jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -408,8 +454,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-2]["minorStatus"] == JobMinorStatus.APP_ERRORS assert report_args[-1]["minorStatus"] == JobMinorStatus.GOING_RESCHEDULE - # Test when there is no output - jw = JobWrapper() + +def test_postProcess_no_output(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: no output generated.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -429,8 +478,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["status"] == JobStatus.COMPLETING assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_SUCCESS - # Test when there is a watchdog error - jw = JobWrapper() + +def test_postProcess_watchdog_error(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: watchdog error.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -450,8 +502,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["status"] == JobStatus.FAILED assert report_args[-1]["minorStatus"] == payloadResult["watchdogError"] - # Test when the executor failed: no status defined - jw = JobWrapper() + +def test_postProcess_executor_failed_no_status(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: executor failed and no status defined.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -471,8 +526,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_THREAD_FAILED assert set_param_args[-1][0][1] == "None reported" - # Test when the executor failed: status defined - jw = JobWrapper() + +def test_postProcess_executor_failed_status_defined(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: executor failed and status defined.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -493,8 +551,11 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_ERRORS assert set_param_args[-3][0][1] == 126 - # Test when the subprocess did not complete - jw = JobWrapper() + +def 
test_postProcess_subprocess_not_complete(setup_job_wrapper, mocker, mock_report_and_set_param): + """Test the postProcess method of the JobWrapper class: subprocess not complete.""" + jw = setup_job_wrapper() + report_args, set_param_args, report_side_effect, set_param_side_effect = mock_report_and_set_param mocker.patch.object(jw, "_JobWrapper__report", side_effect=report_side_effect) mocker.patch.object(jw, "_JobWrapper__setJobParam", side_effect=set_param_side_effect) @@ -515,7 +576,7 @@ def set_param_side_effect(*args, **kwargs): assert report_args[-1]["minorStatus"] == JobMinorStatus.APP_THREAD_NOT_COMPLETE -# Execute method +# ------------------------------------------------------------------------------------------------- @pytest.mark.slow @@ -544,18 +605,21 @@ def test_execute(mocker, executable, args, src, expectedResult): """Test the status of the job after JobWrapper.execute(). The returned value of JobWrapper.execute() is not checked as it can apparently be wrong depending on the shell used. """ - - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", return_value="Value") mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.Watchdog.getSystemInstance", return_value="Value") + mocker.patch( + "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock + ) if src: shutil.copy(os.path.join(src, executable), executable) jw = JobWrapper() jw.jobArgs = {"Executable": executable} + jw.jobExecutionCoordinator = JobExecutionCoordinator(None, None) + if args: jw.jobArgs["Arguments"] = args - res = jw.execute() + jw.execute() assert expectedResult in jw.jobReport.jobStatusInfo[-1] if src: @@ -565,11 +629,260 @@ def test_execute(mocker, executable, args, src, expectedResult): os.remove("std.out") -def test_InputData(mocker): +# ------------------------------------------------------------------------------------------------- + + +@pytest.fixture +def jobIDPath(): + """Return the path to the job ID file.""" + # Create a temporary directory named ./123/ + jobid = "123" + p = Path(jobid) + if p.exists(): + shutil.rmtree(jobid) + p.mkdir() + + # Output sandbox files + (p / "std.out").touch() + (p / "std.err").touch() + (p / "summary_123.xml").touch() + (p / "result_dir").mkdir() + (p / "result_dir" / "file1").touch() + # Output data files + (p / "00232454_00000244_1.sim").touch() + (p / "1720442808testFileUpload.txt").touch() + + with open(p / "pool_xml_catalog.xml", "w") as f: + f.write( + """ + + + + + + + + + + +""" + ) + + yield int(jobid) + + # Remove the temporary directory + shutil.rmtree(jobid) + + +@pytest.fixture +def setup_another_job_wrapper(mocker, jobIDPath): + """Fixture to create a JobWrapper instance with the jobIDPath.""" mocker.patch( "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock ) + jw = JobWrapper(jobIDPath) + jw.jobIDPath = Path(str(jobIDPath)) + jw.failedFlag = False + return jw + + +def test_processJobOutputs_no_output(setup_another_job_wrapper): + """Test the processJobOutputs method of the JobWrapper class: no output files.""" + jw = setup_another_job_wrapper + jw.jobArgs = { + "OutputSandbox": [], + "OutputData": [], + } + + result = jw.processJobOutputs() + assert result["OK"] + assert jw.jobReport.jobStatusInfo == [] + assert jw.jobReport.jobParameters == [] + assert result["Value"] == "Job has no owner specified" + + +def test_processJobOutputs_no_output_with_owner(setup_another_job_wrapper): + """Test the processJobOutputs 
method of the JobWrapper class: no output files with owner.""" + jw = setup_another_job_wrapper + jw.jobArgs = { + "OutputSandbox": [], + "OutputData": [], + "Owner": "Jane Doe", + } + + result = jw.processJobOutputs() + assert result["OK"] + assert result["Value"] == "Job outputs processed" + assert len(jw.jobReport.jobStatusInfo) == 1 + assert jw.jobReport.jobStatusInfo[0][0] == JobStatus.COMPLETING + assert jw.jobReport.jobParameters == [] + + +def test_processJobOutputs_no_output_with_failure(setup_another_job_wrapper): + """Test the processJobOutputs method of the JobWrapper class: no output files with payload failure.""" + jw = setup_another_job_wrapper + # Set the failed flag to True + jw.failedFlag = True + + jw.jobArgs = { + "OutputSandbox": [], + "OutputData": [], + "Owner": "Jane Doe", + } + + result = jw.processJobOutputs() + assert result["OK"] + assert result["Value"] == "Job outputs processed" + assert jw.jobReport.jobStatusInfo == [] + assert jw.jobReport.jobParameters == [] + + +def test_processJobOutputs_output_sandbox(mocker, setup_another_job_wrapper): + """Test the processJobOutputs method of the JobWrapper class: output sandbox. + + The following files are expected to be uploaded: + - std.out + - std.err + - summary_123.xml + - result_dir.tar + + The following files are expected to be missing: + - test.log + - test_dir + """ + jw = setup_another_job_wrapper + # Mock the uploadFilesAsSandbox method + uploadFiles = mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient.SandboxStoreClient.uploadFilesAsSandbox", + return_value=S_OK(), + ) + + jw.jobArgs = { + "OutputSandbox": ["std.out", "std.err", "summary*.xml", "result_dir", "test.log", "test*.sim", "test_dir"], + "OutputData": [], + "Owner": "Jane Doe", + } + + result = jw.processJobOutputs() + assert result["OK"] + assert result["Value"] == "Job outputs processed" + + uploadFiles.assert_called_once() + args, _ = uploadFiles.call_args + upload_args = args[0] + assert sorted(["123/std.out", "123/std.err", "123/summary_123.xml", "123/result_dir.tar"]) == sorted(upload_args) + + assert len(jw.jobReport.jobStatusInfo) == 2 + assert jw.jobReport.jobStatusInfo[0][:-1] == (JobStatus.COMPLETING, JobMinorStatus.UPLOADING_OUTPUT_SANDBOX) + assert jw.jobReport.jobStatusInfo[1][:-1] == (JobStatus.COMPLETING, JobMinorStatus.OUTPUT_SANDBOX_UPLOADED) + assert len(jw.jobReport.jobParameters) == 1 + assert jw.jobReport.jobParameters[0] == ("OutputSandboxMissingFiles", "test.log, test_dir") + + +def test_processJobOutputs_output_sandbox_upload_fails_no_sandbox_name(mocker, setup_another_job_wrapper): + """Test the processJobOutputs method of the JobWrapper class: output sandbox upload fails with no sandbox name.""" + jw = setup_another_job_wrapper + # Mock the uploadFilesAsSandbox method: upload failed + uploadFiles = mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient.SandboxStoreClient.uploadFilesAsSandbox", + return_value=S_ERROR("Upload failed"), + ) + + jw.jobArgs = { + "OutputSandbox": ["std.out", "std.err", "summary*.xml", "result_dir", "test.log", "test*.sim", "test_dir"], + "OutputData": [], + "Owner": "Jane Doe", + } + + result = jw.processJobOutputs() + assert not result["OK"] + assert "no file name supplied for failover to Grid storage" in result["Message"] + + uploadFiles.assert_called_once() + assert len(jw.jobReport.jobStatusInfo) == 1 + assert jw.jobReport.jobStatusInfo[0][:-1] == (JobStatus.COMPLETING, JobMinorStatus.UPLOADING_OUTPUT_SANDBOX) + assert 
len(jw.jobReport.jobParameters) == 1
+    assert jw.jobReport.jobParameters[0] == ("OutputSandboxMissingFiles", "test.log, test_dir")
+
+
+def test_processJobOutputs_output_sandbox_upload_fails_with_sandbox_name_no_outputSE(mocker, setup_another_job_wrapper):
+    """Test the processJobOutputs method of the JobWrapper class: output sandbox upload fails with sandbox name
+    but there is no output SE defined.
+    """
+    jw = setup_another_job_wrapper
+    # Mock the uploadFilesAsSandbox method: upload failed but with a sandbox name
+    uploadFiles = mocker.patch(
+        "DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient.SandboxStoreClient.uploadFilesAsSandbox",
+        return_value={"OK": False, "Message": "Upload failed", "SandboxFileName": "/tmp/Sandbox1"},
+    )
+
+    jw.jobArgs = {
+        "OutputSandbox": ["std.out", "std.err", "summary*.xml", "result_dir", "test.log", "test*.sim", "test_dir"],
+        "OutputData": [],
+        "Owner": "Jane Doe",
+    }
+
+    result = jw.processJobOutputs()
+    assert not result["OK"]
+    assert "No output SEs defined" in result["Message"]
+
+    uploadFiles.assert_called_once()
+    assert len(jw.jobReport.jobStatusInfo) == 1
+    assert jw.jobReport.jobStatusInfo[0][:-1] == (JobStatus.COMPLETING, JobMinorStatus.UPLOADING_OUTPUT_SANDBOX)
+    assert len(jw.jobReport.jobParameters) == 3
+    assert jw.jobReport.jobParameters[0] == ("OutputSandboxMissingFiles", "test.log, test_dir")
+    assert jw.jobReport.jobParameters[1] == ("OutputSandbox", "Sandbox uploaded to grid storage")
+    assert jw.jobReport.jobParameters[2] == ("OutputSandboxLFN", "/dirac/user/u/unknown/0/123/Sandbox1")
+
+
+def test_processJobOutputs_output_data_upload(mocker, setup_another_job_wrapper):
+    """Test the processJobOutputs method of the JobWrapper class: output data upload succeeds
+    and the uploaded files are registered in the job parameters.
+    """
+    jw = setup_another_job_wrapper
+    jw.defaultFailoverSE = "TestFailoverSE"
+
+    # Mock the transferAndRegisterFile method: transfer does not fail
+    transferFiles = mocker.patch.object(
+        jw.failoverTransfer, "transferAndRegisterFile", return_value=S_OK({"uploadedSE": jw.defaultFailoverSE})
+    )
+
+    # TODO: LFNs do not seem to be well supported: they would not be extracted properly from the pool_xml_catalog.xml
+    # In getGuidByPfn(), pfn (which still contains LFN: at this moment) is compared to the value in pool_xml_catalog.xml
+    # (which does not contain LFN:)
+    # BTW, isn't the concept of pool_xml_catalog.xml from lhcbdirac?
+    jw.jobArgs = {
+        "OutputSandbox": [],
+        "OutputData": ["1720442808testFileUpload.txt", "LFN:00232454_00000244_1.sim"],
+        "Owner": "Jane Doe",
+    }
+
+    result = jw.processJobOutputs()
+    assert result["OK"]
+    assert "Job outputs processed" in result["Value"]
+
+    # how many times transferAndRegisterFile was called: 2 times
+    transferFiles.assert_called()
+    assert len(jw.jobReport.jobStatusInfo) == 3
+    # TODO: Uploading output sandbox is reported whereas there was no output sandbox
+    assert jw.jobReport.jobStatusInfo[0][:-1] == (JobStatus.COMPLETING, JobMinorStatus.UPLOADING_OUTPUT_SANDBOX)
+    assert jw.jobReport.jobStatusInfo[1][:-1] == ("", JobMinorStatus.UPLOADING_OUTPUT_DATA)
+    assert jw.jobReport.jobStatusInfo[2][:-1] == (JobStatus.COMPLETING, JobMinorStatus.OUTPUT_DATA_UPLOADED)
+    assert len(jw.jobReport.jobParameters) == 1
+    assert jw.jobReport.jobParameters[0] == (
+        "UploadedOutputData",
+        "00232454_00000244_1.sim, /dirac/user/u/unknown/0/123/1720442808testFileUpload.txt",
+    )
+
+
+# -------------------------------------------------------------------------------------------------
+
+
+def test_resolveInputData(mocker):
     mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock())
+    mocker.patch(
+        "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock
+    )
 
     jw = JobWrapper()
     jw.jobArgs["InputData"] = ""
@@ -593,6 +906,59 @@ def test_InputData(mocker):
     assert res["OK"]
 
 
+# -------------------------------------------------------------------------------------------------
+
+
+def test_transferInputSandbox_no_sandbox(setup_another_job_wrapper):
+    """Test the transferInputSandbox method of the JobWrapper class: no sandbox to transfer."""
+    jw = setup_another_job_wrapper
+
+    res = jw.transferInputSandbox([""])
+    assert res["OK"]
+    assert res["Value"] == "InputSandbox downloaded"
+
+    assert len(jw.jobReport.jobStatusInfo) == 1
+    assert jw.jobReport.jobStatusInfo[0][:-1] == ("", JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+
+
+def test_transferInputSandbox_invalid_sb_url(setup_another_job_wrapper):
+    """Test the transferInputSandbox method of the JobWrapper class: invalid sandbox URL."""
+    jw = setup_another_job_wrapper
+
+    # SB:anotherfile.txt is not formatted correctly: should be SB:<se>|<file>
+    res = jw.transferInputSandbox(["SB:anotherfile.txt"])
+    assert not res["OK"]
+    assert "Invalid sandbox" in res["Message"]
+
+    assert len(jw.jobReport.jobStatusInfo) == 2
+    assert jw.jobReport.jobStatusInfo[0][:-1] == ("", JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+    assert jw.jobReport.jobStatusInfo[1][:-1] == ("", JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX)
+
+
+def test_transferInputSandbox(mocker, setup_another_job_wrapper):
+    """Test the transferInputSandbox method of the JobWrapper class."""
+    jw = setup_another_job_wrapper
+
+    mocker.patch(
+        "DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient.SandboxStoreClient.downloadSandbox",
+        return_value=S_OK(100),
+    )
+    mocker.patch.object(
+        jw.dm, "getFile", return_value=S_OK({"Failed": [], "Successful": {"file1.txt": "/path/to/file1.txt"}})
+    )
+
+    res = jw.transferInputSandbox(["jobDescription.xml", "script.sh", "LFN:file1.txt", "SB:se|anotherfile.txt"])
+    assert res["OK"]
+    assert res["Value"] == "InputSandbox downloaded"
+
+    assert len(jw.jobReport.jobStatusInfo) == 2
+    assert jw.jobReport.jobStatusInfo[0][:-1] == ("", JobMinorStatus.DOWNLOADING_INPUT_SANDBOX)
+    assert jw.jobReport.jobStatusInfo[1][:-1] == ("",
JobMinorStatus.DOWNLOADING_INPUT_SANDBOX_LFN) + + +# ------------------------------------------------------------------------------------------------- + + @pytest.mark.parametrize( "failedFlag, expectedRes, finalStates", [ @@ -601,10 +967,10 @@ def test_InputData(mocker): ], ) def test_finalize(mocker, failedFlag, expectedRes, finalStates): + mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) mocker.patch( "DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.getSystemSection", side_effect=getSystemSectionMock ) - mocker.patch("DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper.ObjectLoader", side_effect=MagicMock()) jw = JobWrapper() jw.jobArgs = {"Executable": "/bin/ls"} diff --git a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py index 9a5e5a21175..d6d8538e16d 100644 --- a/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py +++ b/src/DIRAC/WorkloadManagementSystem/JobWrapper/test/Test_JobWrapperTemplate.py @@ -51,8 +51,6 @@ } payloadParams = { "command": "dirac-jobexec helloworld.xml -o LogLevel=DEBUG", - "error": "std.err", - "output": "std.out", "env": {}, } @@ -104,7 +102,7 @@ def test_createAndExecuteJobWrapperTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent + assert f'os.path.realpath("{os.getcwd()}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -172,7 +170,7 @@ def test_createAndExecuteJobWrapperTemplate_missingExtraOptions(): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{os.getcwd()}")' in jobWrapperContent + assert f'os.path.realpath("{os.getcwd()}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -251,7 +249,7 @@ def test_createAndExecuteRelocatedJobWrapperTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f'sys.path.insert(0, "{rootLocation}")' in jobWrapperContent + assert f'os.path.realpath("{rootLocation}")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"]["JobWrapperConfigPath"] @@ -368,7 +366,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): jobWrapperContent = f.read() assert "@SITEPYTHON@" not in jobWrapperContent - assert f"sys.path.insert(0, sitePython)" in jobWrapperContent + assert 'os.path.realpath(".")' in jobWrapperContent # Test job wrapper configuration path jobWrapperConfigPath = res["Value"].get("JobWrapperConfigPath") @@ -437,7 +435,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 1, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" not in result.stdout, result.stdout assert result.stderr == b"", result.stderr # 4. 
We recreate the job wrapper offline template with the payload params now @@ -467,7 +465,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 1, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" not in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" not in result.stdout, result.stdout assert result.stderr == b"", result.stderr # The root location should contain: @@ -508,7 +506,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): result = subprocess.run(jobExecutableRelocatedPath, shell=True, capture_output=True) assert result.returncode == 0, result.stderr - assert b"Starting Job Wrapper Initialization for Job 1" in result.stdout, result.stdout + assert b"JobID is not defined, running in current directory" in result.stdout, result.stdout assert b"Job Wrapper is starting the processing phase for job" in result.stdout, result.stdout assert result.stderr == b"", result.stderr @@ -517,13 +515,11 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): # - the job wrapper configuration # - the job executable # - the job/Wrapper directory - # - the directory - assert len(os.listdir(rootLocation)) == numberOfFiles + 5 - assert os.path.exists(os.path.join(rootLocation, "1")) - assert os.path.exists(os.path.join(rootLocation, "1", "payloadResults.json")) - assert os.path.exists(os.path.join(rootLocation, "1", "checksum.json")) + assert len(os.listdir(rootLocation)) == numberOfFiles + 8 + assert os.path.exists(os.path.join(rootLocation, "payloadResults.json")) + assert os.path.exists(os.path.join(rootLocation, "checksum.json")) - with open(os.path.join(rootLocation, "1", "payloadResults.json")) as f: + with open(os.path.join(rootLocation, "payloadResults.json")) as f: payloadResults = json.load(f) assert payloadResults["OK"] @@ -532,7 +528,7 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): assert "payloadOutput" in payloadResults["Value"] assert "payloadStatus" in payloadResults["Value"] - with open(os.path.join(rootLocation, "1", "checksum.json")) as f: + with open(os.path.join(rootLocation, "checksum.json")) as f: checksums = json.load(f) assert jobParams["PayloadResults"] in checksums @@ -540,5 +536,8 @@ def test_createAndExecuteJobWrapperOfflineTemplate_success(extraOptions): os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperPath))) os.unlink(os.path.join(rootLocation, os.path.basename(jobWrapperConfigPath))) os.unlink(os.path.join(rootLocation, os.path.basename(jobExecutablePath))) - shutil.rmtree(os.path.join(rootLocation, "1")) + os.unlink(os.path.join(rootLocation, "payloadResults.json")) + os.unlink(os.path.join(rootLocation, "checksum.json")) + os.unlink(os.path.join(rootLocation, "std.err")) + os.unlink(os.path.join(rootLocation, "job.info")) shutil.rmtree(os.path.join(os.getcwd(), "job")) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index 2655697dbaa..d20476f768c 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -22,6 +22,7 @@ def createJobWrapper( defaultWrapperLocation: str | None = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py", log: Logging | None = gLogger, logLevel: str | None = "INFO", + cfgPath: str | None = None, ): 
"""This method creates a job wrapper filled with the CE and Job parameters to execute the job. Main user is the JobAgent. @@ -38,6 +39,7 @@ def createJobWrapper( :param defaultWrapperLocation: Location of the default job wrapper template :param log: Logger :param logLevel: Log level + :param cfgPath: Path to a specific configuration file :return: S_OK with the path to the job wrapper and the path to the job wrapper json file """ if isinstance(extraOptions, str) and extraOptions.endswith(".cfg"): @@ -91,13 +93,15 @@ def createJobWrapper( jobWrapperDirect = os.path.join(rootLocation, f"Wrapper_{jobID}") jobExeFile = os.path.join(wrapperPath, f"Job{jobID}") jobFileContents = """#!/bin/sh -{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no +{} {} {} -o LogLevel={} -o /DIRAC/Security/UseServerCertificate=no {} """.format( pythonPath, jobWrapperDirect, extraOptions if extraOptions else "", logLevel, + cfgPath if cfgPath else "", ) + with open(jobExeFile, "w") as jobFile: jobFile.write(jobFileContents) diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py index acc4d0e66ab..e4d7ae198df 100755 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_jobexec.py @@ -1,8 +1,4 @@ #!/usr/bin/env python -######################################################################## -# File : dirac-jobexec -# Author : Stuart Paterson -######################################################################## """ The dirac-jobexec script is equipped to execute workflows that are specified via their XML description. The main client of this script is the Job Wrapper.