Skip to content

Commit

Permalink
Merge pull request #7406 from aldbr/v9.0_FIX_SiteDirector-hackathon-fixes
Browse files Browse the repository at this point in the history

[9.0] fix: Site Director hackathon fixes
  • Loading branch information
fstagni authored Jan 18, 2024
2 parents d805616 + d8ae775 commit a9c084a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
12 changes: 9 additions & 3 deletions src/DIRAC/Resources/Computing/CloudComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import uuid
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import requests

import yaml
from libcloud.compute.providers import get_driver, set_driver
Expand Down Expand Up @@ -525,9 +526,14 @@ def getCEStatus(self):
"""
driver = self._getDriver()
count = 0
for node in driver.list_nodes():
if node.name.startswith(VM_NAME_PREFIX):
count += 1
try:
for node in driver.list_nodes():
if node.name.startswith(VM_NAME_PREFIX):
count += 1
except requests.exceptions.ConnectTimeout as err:
self.log.error("Cannot get CE Status. Connection timeout occurred:", str(err))
return S_ERROR("Cannot get CE Status. Connection timeout occurred")

result = S_OK()
result["SubmittedJobs"] = 0
result["RunningJobs"] = count
Expand Down
28 changes: 20 additions & 8 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def submitPilots(self):
self.log.verbose("Submission: Queues treated are", ",".join(self.queueDict))

errors = []
totalSubmittedPilots = 0
with ThreadPoolExecutor(max_workers=len(self.queueDict)) as executor:
futures = []
for queue in self.queueDict:
Expand All @@ -291,6 +292,10 @@ def submitPilots(self):
result = future.result()
if not result["OK"]:
errors.append(result["Message"])
else:
totalSubmittedPilots += result["Value"]

self.log.info("Total number of pilots submitted", f"to all queues: {totalSubmittedPilots}")

if errors:
self.log.error("The following errors occurred during the pilot submission operation", "\n".join(errors))
Expand All @@ -313,7 +318,7 @@ def _submitPilotsPerQueue(self, queueName: str):
f"{queueName} ==> {self.failedQueueCycleFactor - failedCount}",
)
self.failedQueues[queueName] += 1
return S_OK()
return S_OK(0)

# Adjust queueCPUTime: needed to generate the proxy
if "CPUTime" not in queueDictionary["ParametersDict"]:
Expand All @@ -338,14 +343,21 @@ def _submitPilotsPerQueue(self, queueName: str):
totalSlots = self._getQueueSlots(queueName)
if totalSlots <= 0:
self.log.verbose(f"{queueName}: No slot available")
return S_ERROR(f"{queueName}: No slot available")
self.log.info(f"{queueName}: to submit={totalSlots}")
return S_OK(0)

# Apply the submission policy
totalSlots = self.submissionPolicy.apply(totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters)
submittablePilots = self.submissionPolicy.apply(
totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters
)

if submittablePilots <= 0:
self.log.verbose(f"{queueName}: Nothing to submit")
return S_OK(0)

self.log.info(f"{queueName}: slots available={totalSlots} to submit={submittablePilots}")

# Limit the number of pilots to submit to self.maxPilotsToSubmit
pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots)
pilotsToSubmit = min(self.maxPilotsToSubmit, submittablePilots)

# Now really submitting
result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName)
Expand All @@ -359,9 +371,9 @@ def _submitPilotsPerQueue(self, queueName: str):
if not result["OK"]:
return result

# Summary after the cycle over queues
self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}")
return S_OK()
submittedPilots = len(pilotList)
self.log.info("Total number of pilots submitted", f"to {queueName}: {submittedPilots}")
return S_OK(submittedPilots)

def _getQueueSlots(self, queue: str):
"""Get the number of available slots in the queue"""
Expand Down

0 comments on commit a9c084a

Please sign in to comment.