From 86ce1af8afea91f239a06b34e358a2eb0c7aaf21 Mon Sep 17 00:00:00 2001 From: aldbr Date: Wed, 8 May 2024 10:22:51 +0200 Subject: [PATCH] feat: Make RemoteRunner more resilient to CE issues --- .../Utilities/RemoteRunner.py | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py index 03d5425defd..7125887fd4d 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py @@ -91,12 +91,22 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo # Request the whole directory as output outputs = ["/"] - # Submit the command as a job - if not (result := workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs))[ - "OK" - ]: + # Interactions with the CE might be unstable, we need to retry the operations + maxRetries = 10 + timeBetweenRetries = 120 + + # Submit the command as a job with retries + for _ in range(maxRetries): + result = workloadCE.submitJob(self.executable, workloadCE.proxy, inputs=inputs, outputs=outputs) + if result["OK"]: + break + else: + self.log.warn("Failed to submit job, retrying...") + time.sleep(timeBetweenRetries) + else: result["Errno"] = DErrno.EWMSSUBM return result + jobID = result["Value"][0] stamp = result["PilotStampDict"][jobID] self.log.info("The command has been wrapped in a job and sent. Remote JobID: ", jobID) @@ -106,18 +116,33 @@ def execute(self, command, workingDirectory=".", numberOfProcessors=1, cleanRemo jobStatus = PilotStatus.RUNNING while jobStatus not in PilotStatus.PILOT_FINAL_STATES: time.sleep(120) - result = workloadCE.getJobStatus([jobID]) - if not result["OK"]: + for _ in range(maxRetries): + result = workloadCE.getJobStatus([jobID]) + if result["OK"]: + break + else: + self.log.warn("Failed to get job status, retrying...") + time.sleep(timeBetweenRetries) + else: result["Errno"] = DErrno.EWMSSTATUS return result + jobStatus = result["Value"][jobID] self.log.info("The final status of the application/script is: ", jobStatus) # Get job outputs self.log.info("Getting the outputs of the command...") - if not (result := workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath(".")))["OK"]: + for _ in range(maxRetries): + result = workloadCE.getJobOutput(f"{jobID}:::{stamp}", os.path.abspath(".")) + if result["OK"]: + break + else: + self.log.warn("Failed to get job output, retrying...") + time.sleep(timeBetweenRetries) + else: result["Errno"] = DErrno.EWMSJMAN return result + output, error = result["Value"] # Make sure the output is correct