Skip to content

Commit

Permalink
style: added few comments, re-ordered imports
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Oct 30, 2024
1 parent 111f2ca commit 084e019
Showing 1 changed file with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@

import datetime

from DIRAC import S_OK, S_ERROR
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Security.Properties import SecurityProperty
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.JEncode import encode as jencode
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation


TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted(
set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES)
Expand Down Expand Up @@ -396,27 +395,46 @@ def export_extendTransformation(self, transName, nTasks):
types_getTasksToSubmit = [[int, str], int]

def export_getTasksToSubmit(self, transName, numTasks, site=""):
"""Get information necessary for submission for a given number of tasks for a given transformation"""
"""
Retrieve the necessary information for the submission of a specified number of tasks
for a given transformation. This includes reserving tasks to avoid race conditions.
:param int | str transName: Name of the transformation
:param int numTasks: Number of tasks to retrieve for submission
:param str site: Optional site specification
:return: S_OK Dictionary containing transformation and task submission details
"""
# Get the transformation details
res = self.transformationDB.getTransformation(transName)
if not res["OK"]:
return res
transDict = res["Value"]

submitDict = {}
# applying few seconds delay to avoid race conditions

# Apply a delay to avoid race conditions
older = datetime.datetime.now() - datetime.timedelta(seconds=30)

# Retrieve tasks that are ready for submission
res = self.transformationDB.getTasksForSubmission(
transName, numTasks=numTasks, site=site, statusList=["Created"], older=older
)
if not res["OK"]:
return res
tasksDict = res["Value"]


# Reserve each task for submission
for taskID, taskDict in tasksDict.items():
res = self.transformationDB.reserveTask(transName, int(taskID))
if not res["OK"]:
return res
else:
submitDict[taskID] = taskDict
# Add reserved task to the submission dictionary
submitDict[taskID] = taskDict

# Add the job dictionary to the transformation details
transDict["JobDictionary"] = submitDict

return S_OK(transDict)

####################################################################
Expand Down

0 comments on commit 084e019

Please sign in to comment.