Fix stall problem in pipeline runner in environments with nice>0
This fixes a stall problem in the pipeline runner, which was
observed on systems (GRID) where the runner was launched with an
increased nice value (>0).

In this scenario, the resources allocated per task were not correctly
"un-accounted" on task completion, so the runner eventually stalled
because it could not find free resources.

This commit also adds a debugging mechanism based on webhooks.
Later integration into the standard logging system would be nice.
sawenzel committed Apr 27, 2021
1 parent 8d0bae4 commit 0a48333
Showing 1 changed file with 14 additions and 2 deletions.
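The root cause is visible in the fix below: os.nice(inc) adds inc to the calling process's nice value and returns the result, so os.nice(0) reads the current niceness without changing it. The runner compared each finished task's recorded nice value against a literal 0 when releasing its booked CPU and memory, which never matches when the whole runner (and hence every child task) runs at nice>0. A two-line illustration of the read-only call:

    import os
    # prints 0 under a plain shell, but 10 under `nice -n 10 python3 ...`
    print(os.nice(0))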
16 changes: 14 additions & 2 deletions MC/bin/o2_dpg_workflow_runner.py
@@ -40,6 +40,12 @@ def setup_logger(name, log_file, level=logging.INFO):
# second file logger
metriclogger = setup_logger('pipeline_metric_logger', 'pipeline_metric.log')

+# for debugging without terminal access
+# TODO: integrate into standard logger
+def send_webhook(hook, t):
+    if hook!=None:
+        command="curl -X POST -H 'Content-type: application/json' --data '{\"text\":\" " + str(t) + "\"}' " + str(hook) + " &> /dev/null"
+        os.system(command)

# A fallback solution to getting all child procs
# in case psutil has problems (PermissionError).
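For reference, a rough stdlib-only sketch of what this helper does without shelling out to curl (the function name is hypothetical, and the hook is assumed to accept Slack-style {"text": ...} JSON, as the curl payload suggests); something along these lines could also ease the later integration into the standard logger:

    import json
    import urllib.request

    def send_webhook_stdlib(hook, text):
        # same payload as the curl command above, built with the standard library
        if hook is None:
            return
        payload = json.dumps({"text": " " + str(text)}).encode()
        req = urllib.request.Request(hook, data=payload,
                                     headers={"Content-type": "application/json"})
        try:
            urllib.request.urlopen(req, timeout=5)
        except OSError:
            pass  # purely a debugging aid; never let it break the runner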
@@ -570,7 +576,6 @@ def stop_pipeline_and_exit(self, process_list):

        exit(1)

-
    def monitor(self, process_list):
        self.internalmonitorcounter+=1
        if self.internalmonitorcounter % 5 != 0:
@@ -650,6 +655,7 @@ def monitor(self, process_list):

            resources_per_task[tid]={'iter':self.internalmonitorid, 'name':self.idtotask[tid], 'cpu':totalCPU, 'uss':totalUSS/1024./1024., 'pss':totalPSS/1024./1024, 'nice':proc.nice(), 'swap':totalSWAP, 'label':self.workflowspec['stages'][tid]['labels']}
            metriclogger.info(resources_per_task[tid])
+            send_webhook(self.args.webhook, resources_per_task)

        for r in resources_per_task.values():
            if r['nice']==os.nice(0):
@@ -679,7 +685,7 @@ def waitforany(self, process_list, finished):
            if returncode!=None:
                actionlogger.info ('Task ' + str(pid) + ' ' + str(p[0])+':'+str(self.idtotask[p[0]]) + ' finished with status ' + str(returncode))
                # account for cleared resources
-                if self.nicevalues[p[0]]==0: # --> change for a more robust way
+                if self.nicevalues[p[0]]==os.nice(0):
                    self.curmembooked-=float(self.maxmemperid[p[0]])
                    self.curcpubooked-=float(self.cpuperid[p[0]])
                else:
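A minimal standalone sketch of the accounting bug this hunk fixes (names borrowed from the runner, numbers made up): with the old literal-0 comparison, a runner started under nice>0 never took the release branch, so the booked totals only ever grew until no further task fit:

    import os

    nicevalues = {0: os.nice(0)}   # task 0 ran at the runner's own niceness
    cpuperid = {0: 8.0}
    curcpubooked = 8.0             # booked when task 0 was scheduled

    tid = 0
    # old check was `nicevalues[tid] == 0`: False whenever nice>0
    if nicevalues[tid] == os.nice(0):
        curcpubooked -= float(cpuperid[tid])

    assert curcpubooked == 0.0     # booking released, scheduling can continue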
@@ -865,6 +871,10 @@ def execute(self):

            finished = []
            actionlogger.debug('Sorted current candidates: ' + str([(c,self.idtotask[c]) for c in candidates]))
            self.try_job_from_candidates(candidates, self.process_list, finished)
+            if len(candidates) > 0 and len(self.process_list) == 0:
+                actionlogger.info("Not able to make progress: Nothing scheduled although non-zero candidate set")
+                send_webhook(self.args.webhook,"Unable to make further progress: Quitting")
+                break

            finished_from_started = []
            while self.waitforany(self.process_list, finished_from_started):
@@ -889,6 +899,7 @@ def execute(self):
                candidates.append(candid)

            actionlogger.debug("New candidates " + str( candidates))
+            send_webhook(self.args.webhook, "New candidates " + str(candidates))

            if len(candidates)==0 and len(self.process_list)==0:
                break
@@ -927,6 +938,7 @@ def execute(self):
parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\
ust exist and the tasks file must be writable to with the current user.')
parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,')
+parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel

args = parser.parse_args()
print (args)
