Merge pull request DIRACGrid#7587 from aldbr/v9.0_FEAT_PJA-new-mode
[v9.0] feat: add a new job management mechanism in the PushJobAgent
fstagni authored Sep 12, 2024
2 parents ab23bf1 + 09fd596 commit 5bc53a0
Showing 19 changed files with 1,974 additions and 663 deletions.
71 changes: 56 additions & 15 deletions docs/source/AdministratorGuide/Resources/supercomputers.rst
@@ -108,6 +108,9 @@ This case has not been addressed yet.
No outbound connectivity
------------------------

Submission management
---------------------

Solutions seen in the previous section cannot work in an environment without external connectivity.
The well-known Pilot-Job paradigm on which the DIRAC WMS is based does not apply in these circumstances: the Pilot-Jobs cannot fetch jobs from DIRAC.
Thus, such supercomputers require slight changes to the WMS: we reintroduced the push model.
@@ -126,6 +129,12 @@ To leverage the Push model, one has to add the :mod:`~DIRAC.WorkloadManagementSys
# Control the number of jobs handled on the machine
MaxJobsToSubmit = 100
Module = PushJobAgent
# SubmissionPolicy can be "Application" or "JobWrapper"
# - Application (soon deprecated): the agent submits a workflow to a PoolCE; the workflow is responsible for interacting with the remote site
# - JobWrapper (default): the agent submits a JobWrapper directly to the remote site; the JobWrapper is responsible for the remote execution
SubmissionPolicy = <SubmissionPolicy>
# The CVMFS location to be used for the job execution on the remote site
CVMFSLocation = "/cvmfs/dirac.egi.eu/dirac/pro"
}
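A minimal sketch of how an agent might validate and dispatch on this option, assuming a hypothetical ``resolve_submission_policy`` helper (illustrative code, not the actual agent implementation):

```python
# Illustrative sketch, not DIRAC code: validate the SubmissionPolicy option
# and fall back to the default "JobWrapper" mode when it is absent.
VALID_POLICIES = ("Application", "JobWrapper")

def resolve_submission_policy(options):
    """Return the configured submission policy, defaulting to JobWrapper."""
    policy = options.get("SubmissionPolicy", "JobWrapper")
    if policy not in VALID_POLICIES:
        raise ValueError(f"Unknown SubmissionPolicy: {policy}")
    return policy

print(resolve_submission_policy({}))                                   # JobWrapper
print(resolve_submission_policy({"SubmissionPolicy": "Application"}))  # Application
```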

One has also to authorize the machine hosting the :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` to process jobs via the ``Registry/Hosts/<Host>`` CS section::
@@ -155,25 +164,57 @@ One has to specify the concerned VO, the platform and the CPU Power in the targe

Finally, one has to make sure that the job scheduling parameters are correctly fine-tuned; further details can be found in the :ref:`JobScheduling section <jobscheduling>`.

The :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` class extends the functionality of the :mod:`~DIRAC.WorkloadManagementSystem.Agent.JobAgent` and operates specifically on a VO box. It follows a similar architecture by retrieving jobs from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service and submitting them to a :mod:`~DIRAC.Resources.Computing.ComputingElement`. Although `PushJobAgent` does not inherit directly from the :mod:`~DIRAC.WorkloadManagementSystem.Agent.SiteDirector`, it incorporates several comparable features, including:

- Supervising specific Sites, Computing Elements (CEs), or Queues.
- Placing problematic queues on hold and retrying after a pre-defined number of cycles.
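The hold-and-retry behaviour can be pictured as a per-queue cycle counter. The sketch below is an assumption about the mechanism, not the DIRAC implementation; ``QueueSupervisor`` and its counts are made-up names:

```python
# Hypothetical sketch: after a failure, a queue is skipped for a fixed
# number of agent cycles before it is tried again.
class QueueSupervisor:
    def __init__(self, hold_cycles=3):
        self.hold_cycles = hold_cycles
        self._on_hold = {}  # queue name -> remaining cycles to skip

    def report_failure(self, queue):
        self._on_hold[queue] = self.hold_cycles

    def is_available(self, queue):
        """Tick the hold counter down; the queue becomes usable at zero."""
        remaining = self._on_hold.get(queue, 0)
        if remaining > 0:
            self._on_hold[queue] = remaining - 1
            return False
        return True

sup = QueueSupervisor(hold_cycles=2)
sup.report_failure("arc-ce.example.org/long")
print(sup.is_available("arc-ce.example.org/long"))  # False
print(sup.is_available("arc-ce.example.org/long"))  # False
print(sup.is_available("arc-ce.example.org/long"))  # True
```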

The :mod:`~DIRAC.WorkloadManagementSystem.Agent.PushJobAgent` supports two distinct submission modes: **JobWrapper** and **Application**.

JobWrapper Mode (Default and Recommended)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The **JobWrapper** mode is the default and recommended submission method, due to its reliability and efficiency. The workflow for this mode includes:

1. **Job Retrieval**: Fetch a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service.
2. **Pre-processing**: Pre-process the job by fetching the input sandbox and any necessary data.
3. **Template Generation**: Create a :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperOfflineTemplate` designed to execute the job’s payload.
4. **Submission**: Submit the generated `JobWrapperOfflineTemplate` along with the inputs to the target Computing Element (CE).
5. **Monitoring**: Continuously monitor the status of the submitted jobs until they are completed.
6. **Output Retrieval**: Retrieve the outputs of the finished jobs from the target CE.
7. **Post-processing**: Conduct any necessary post-processing of the outputs.
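The steps above can be sketched as one agent cycle; every class and method name below is an illustrative stand-in, not a DIRAC API:

```python
# Hypothetical walk through the seven JobWrapper-mode steps, with stub
# objects standing in for the MatcherHandler service, the target CE and the
# pre/post-processing logic.
class StubMatcher:
    def fetch_job(self):
        return {"id": 42, "payload": "echo hello"}

class StubCE:
    def __init__(self):
        self._polls = 0
    def submit(self, wrapper, inputs):
        return "ref-1"
    def is_finished(self, ref):
        self._polls += 1
        return self._polls >= 2      # "finishes" on the second poll
    def get_outputs(self, ref):
        return ["std.out", "std.err"]

class StubCoordinator:
    def preprocess(self, job):
        return ["input.sandbox"]     # fetch sandbox / input data
    def postprocess(self, job, outputs):
        return {"job": job["id"], "outputs": outputs}

def run_jobwrapper_cycle(matcher, ce, coordinator):
    job = matcher.fetch_job()                      # 1. job retrieval
    inputs = coordinator.preprocess(job)           # 2. pre-processing
    wrapper = f"wrapper-{job['id']}.py"            # 3. offline template
    ref = ce.submit(wrapper, inputs)               # 4. submission
    while not ce.is_finished(ref):                 # 5. monitoring
        pass
    outputs = ce.get_outputs(ref)                  # 6. output retrieval
    return coordinator.postprocess(job, outputs)   # 7. post-processing

print(run_jobwrapper_cycle(StubMatcher(), StubCE(), StubCoordinator()))
```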


.. warning:: The `JobWrapper` mode assumes that the job can execute without external connectivity. As an administrator, if any step of your job workflow requires external connectivity, it is crucial to review and adjust your logic accordingly. The :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator` can assist in this process. It enables you to define custom pre-processing and post-processing logic based on specific job and CE attributes. For more detailed information, refer to the :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobExecutionCoordinator` documentation.
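For instance, a coordinator-style hook might split the workflow according to whether the CE has connectivity, keeping connectivity-dependent steps on the agent side. The function and step names below are hypothetical, shown only to illustrate the idea:

```python
# Hypothetical sketch: decide which steps run on the agent (which has
# connectivity) and which run remotely on the CE (which may not).
def plan_steps(ce_has_connectivity):
    all_steps = ["download_inputs", "run_payload", "upload_outputs"]
    if ce_has_connectivity:
        return {"agent_pre": [], "remote": all_steps, "agent_post": []}
    # Without connectivity, only the payload itself can run remotely;
    # transfers happen on the agent side, before and after submission.
    return {
        "agent_pre": ["download_inputs"],
        "remote": ["run_payload"],
        "agent_post": ["upload_outputs"],
    }

print(plan_steps(False)["remote"])  # ['run_payload']
```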

.. image:: ../../_static/pja_jobwrapper_submission.png
:alt: PushJobAgent JobWrapper submission
:align: center

Application Mode (Deprecated)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The **Application** mode is deprecated and slated for removal in future versions. It is considered less reliable due to higher memory consumption and sensitivity to CE-related issues, which restrict the number of jobs that can be processed concurrently. The workflow for this mode is as follows:

1. **Job Retrieval**: Fetch a job from the :mod:`~DIRAC.WorkloadManagementSystem.Service.MatcherHandler` service using the target CE attributes.
2. **Template Generation**: Generate a :mod:`~DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperTemplate`.
3. **Submission**: Submit the generated `JobWrapperTemplate` to a :mod:`~DIRAC.Resources.Computing.PoolComputingElement`.
- The submission includes an additional parameter in ``/LocalSite`` named ``RemoteExecution``, used to identify computing resources lacking external connectivity.
- The ``MaxJobsToSubmit`` setting defines the maximum number of jobs the agent can handle simultaneously.
4. **Execution**: The :mod:`~DIRAC.Resources.Computing.PoolComputingElement` executes the `JobWrapperTemplate` in a new process.
5. **Script Execution**: Within this context, the `JobWrapperTemplate` can only execute the :mod:`~DIRAC.WorkloadManagementSystem.scripts.dirac_jobexec` script in a new process.
6. **Workflow Module Processing**: Workflow modules responsible for script or application execution (:mod:`~DIRAC.Workflow.Modules.Script`) determine whether the payload needs to be offloaded to a remote location.
7. **Remote Execution**: :mod:`~DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner` checks for the environment variable initialized by the `JobWrapper`.
- If the variable is unset, the application runs locally via ``systemCall()``; otherwise, it is submitted to a remote CE such as ARC.
- `RemoteRunner` wraps the script or application command in an executable, gathers input files from the working directory, and submits these along with the executable to the remote CE.
- The status of the submitted application is monitored every 2 minutes until completion, after which the outputs are retrieved.
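The polling loop of the last step can be sketched as follows; ``get_status`` and ``sleep`` are injected so the loop can be shown without a real CE (illustrative code, not the actual RemoteRunner):

```python
import time

# Poll a remote application's status at a fixed interval until it reaches a
# terminal state (RemoteRunner uses a 2-minute interval).
def wait_for_completion(get_status, interval=120, sleep=time.sleep):
    while True:
        status = get_status()
        if status in ("Done", "Failed"):
            return status
        sleep(interval)

# Simulate three polls without actually sleeping.
statuses = iter(["Submitted", "Running", "Done"])
waits = []
print(wait_for_completion(lambda: next(statuses), sleep=waits.append))  # Done
print(waits)  # [120, 120]
```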

.. warning:: If the `PushJobAgent` is interrupted while processing jobs, administrators must manually clean up input directories (usually located at ``/opt/dirac/runit/WorkloadManagement/PushJobAgent/<JobID>``) and terminate any associated processes (e.g., ``dirac-jobexec``).

.. image:: ../../_static/pja_application_submission.png
:alt: PushJobAgent Application submission
:align: center

Multi-core/node allocations
---------------------------
2 changes: 2 additions & 0 deletions src/DIRAC/Core/Utilities/Proxy.py
@@ -34,6 +34,7 @@ def undecoratedFunction(foo='bar'):
"""
import functools
import os

from DIRAC import gConfig, gLogger, S_ERROR, S_OK
@@ -61,6 +62,7 @@ def executeWithUserProxy(fcn):
:param bool executionLock: flag to execute with a lock for the time of user proxy application ( default False )
"""

@functools.wraps(fcn)
def wrapped_fcn(*args, **kwargs):
userName = kwargs.pop("proxyUserName", "")
userDN = kwargs.pop("proxyUserDN", "")
4 changes: 4 additions & 0 deletions src/DIRAC/Resources/Computing/AREXComputingElement.py
@@ -654,6 +654,10 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
f"{jobReference} to CE {self.ceName}",
)

# Remove the bundled preamble
if self.preamble:
os.remove(executableFile)

if batchIDList:
result = S_OK(batchIDList)
result["PilotStampDict"] = stampDict
24 changes: 7 additions & 17 deletions src/DIRAC/Workflow/Modules/Script.py
@@ -10,7 +10,6 @@

from DIRAC import gLogger, gConfig
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC.WorkloadManagementSystem.Utilities.RemoteRunner import RemoteRunner
from DIRAC.Workflow.Modules.ModuleBase import ModuleBase


@@ -83,22 +82,13 @@ def _executeCommand(self):
"""execute the self.command (uses systemCall)"""
failed = False

# Check whether the execution should be done remotely
if gConfig.getValue("/LocalSite/RemoteExecution", False):
remoteRunner = RemoteRunner(
gConfig.getValue("/LocalSite/Site"),
gConfig.getValue("/LocalSite/GridCE"),
gConfig.getValue("/LocalSite/CEQueue"),
)
retVal = remoteRunner.execute(self.command)
else:
retVal = systemCall(
timeout=0,
cmdSeq=shlex.split(self.command),
env=self.environment,
callbackFunction=self.callbackFunction,
bufferLimit=self.bufferLimit,
)
retVal = systemCall(
timeout=0,
cmdSeq=shlex.split(self.command),
env=self.environment,
callbackFunction=self.callbackFunction,
bufferLimit=self.bufferLimit,
)

if not retVal["OK"]:
failed = True
81 changes: 0 additions & 81 deletions src/DIRAC/Workflow/Modules/UploadOutputs.py

This file was deleted.
