Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runMonitoring take more types addition #2390

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 32 additions & 33 deletions ganga/GangaCore/Core/MonitoringComponent/Local_GangaMC_Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,9 +715,9 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300):
jobs: a registry slice to be monitored (None -> all jobs), it may be passed by the user so ._impl is stripped if needed
Return:
False, if the loop cannot be started or the timeout occured while waiting for monitoring termination
True, if the monitoring steps were successfully executed
Note:
This method is meant to be used in Ganga scripts to request monitoring on demand.
saadkhi marked this conversation as resolved.
Show resolved Hide resolved
True, if the monitoring steps were successfully executed
Note:
This method is meant to be used in Ganga scripts to request monitoring on demand.
"""

log.debug("runMonitoring")
Expand All @@ -726,14 +726,13 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300):
# Detect New Jobs from other sessions.
new_jobs = stripProxy(self.registry_slice).objects.repository.update_index(True, True)
self.newly_discovered_jobs = list(set(self.newly_discovered_jobs) | set(new_jobs))
# Only load jobs from disk which are in new state currently.
for i in self.newly_discovered_jobs:
j = stripProxy(self.registry_slice(i))
job_status = lazyLoadJobStatus(j)
if job_status in ['new']:
stripProxy(self.registry_slice).objects.repository.load([i])

if not isType(steps, int) and steps < 0:
if not isType(steps, int) or steps <= 0:
log.warning("The number of monitor steps should be a positive (non-zero) integer")
return False

Expand All @@ -757,59 +756,59 @@ def runMonitoring(self, jobs=None, steps=1, timeout=300):
log.error("Cannot run the monitoring loop. The following credentials are required: %s" % _missingCreds)
return False

#log.debug("jobs: %s" % str(jobs))
#log.debug("self.__mainLoopCond: %s" % str(self.__mainLoopCond))
#log.debug("jobs: %s" % str(jobs))
#log.debug("self.__mainLoopCond: %s" % str(self.__mainLoopCond))

# Handle the `jobs` input (new logic to handle int, list, job object)
if jobs is not None:
if isinstance(jobs, int):
# If jobs is a single int, convert it to a slice (equivalent to one job)
log.debug(f"Converting job ID {jobs} to a registry slice")
m_jobs = self.registry_slice(jobs)
elif isinstance(jobs, list):
saadkhi marked this conversation as resolved.
Show resolved Hide resolved
# If jobs is a list of int, ensure all elements are int and convert to registry slice
if all(isinstance(job, int) for job in jobs):
log.debug(f"Converting job IDs {jobs} to a registry slice")
m_jobs = [self.registry_slice(job) for job in jobs]
else:
log.warning("List must contain integers representing job IDs")
return False
else:
# Handle job slices (existing behavior)
from GangaCore.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice
if not isinstance(jobs, RegistrySlice):
log.warning('jobs argument must be a registry slice, int, list of int, or job object')
return False
m_jobs = jobs
# Pass the new `m_jobs` (registry slice) for further processing
self.makeUpdateJobStatusFunction(jobSlice=m_jobs)
saadkhi marked this conversation as resolved.
Show resolved Hide resolved

with self.__mainLoopCond:
log.debug('Monitoring loop lock acquired. Enabling mon loop')
if self.enabled or self.__isInProgress():
log.error("The monitoring loop is already running.")
return False

if jobs is not None:
m_jobs = jobs

# additional check if m_jobs is really a registry slice
# the underlying code is not prepared to handle correctly the
# situation if it is not
from GangaCore.GPIDev.Lib.Registry.RegistrySlice import RegistrySlice
if not isType(m_jobs, RegistrySlice):
log.warning(
'runMonitoring: jobs argument must be a registry slice such as a result of jobs.select() or jobs[i1:i2]')
return False

#self.registry_slice = m_jobs
#log.debug("m_jobs: %s" % str(m_jobs))
self.makeUpdateJobStatusFunction(jobSlice=m_jobs)

log.debug("Enable Loop, Clear Iterators and setCallbackHook")
# enable mon loop
log.debug("Enable Loop, Clear Iterators, and setCallbackHook")
self.enabled = True
# set how many steps to run
saadkhi marked this conversation as resolved.
Show resolved Hide resolved
self.steps = steps
# enable job list iterators
self.stopIter.clear()
# Start backend update timeout checking.
self.setCallbackHook(UpdateDict.timeoutCheck, {'thisDict': self.updateDict_ts}, True)

log.debug("Waking up Main Loop")
# wake up the mon loop
self.__mainLoopCond.notify_all()

log.debug("Waiting to execute steps")
# wait to execute the steps
self.__monStepsTerminatedEvent.wait()
self.__monStepsTerminatedEvent.clear()

log.debug("Test for timeout")
# wait the steps to be executed or timeout to occur
if not self.__awaitTermination(timeout):
log.warning("Monitoring loop started but did not complete in the given timeout.")
# force loops termination
self.stopIter.set()
return False
return True


def enableMonitoring(self):
"""
Run the monitoring loop continuously
Expand Down
31 changes: 29 additions & 2 deletions ganga/GangaCore/test/GPI/Monitoring/TestMonitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_a_JobConstruction(self):
self.assertNotEqual(j.status, 'new')

def test_b_EnableMonitoring(self):
from GangaCore.GPI import enableMonitoring, Job, jobs
from GangaCore.GPI import enableMonitoring, Job, jobs, runMonitoring

enableMonitoring()

Expand All @@ -46,7 +46,9 @@ def test_b_EnableMonitoring(self):

dummySleep(j)

self.assertNotEqual(jobs(0).status, 'submitted')
saadkhi marked this conversation as resolved.
Show resolved Hide resolved
job_id = j.id
result = runMonitoring(jobs=job_id)
self.assertTrue(result, "runMonitoring with job ID failed to execute successfully.")

def test_c_disableMonitoring(self):

Expand Down Expand Up @@ -94,3 +96,28 @@ def test_f_reallyDisabled(self):
dummySleep(j)

self.assertEqual(j.status, 'completed')

def test_g_runMonitoring_withJobIDList(self):
from GangaCore.GPI import enableMonitoring, Job, runMonitoring

enableMonitoring()
job_ids = []
for _ in range(2):
j = Job()
j.submit()
dummySleep(j)
job_ids.append(j.id)

result = runMonitoring(jobs=job_ids)
self.assertTrue(result, "runMonitoring with list of job IDs failed to execute successfully.")

def test_h_runMonitoring_withJobObject(self):
from GangaCore.GPI import enableMonitoring, Job, runMonitoring

enableMonitoring()
j = Job()
j.submit()
dummySleep(j)

result = runMonitoring(jobs=j)
self.assertTrue(result, "runMonitoring with job object failed to execute successfully.")
Loading