Adjusting IRate; improvements in pipeline runner
Several improvements in pipeline runner:

* fix a bug in the "skipping task" logic ... skipping is now much faster
* provide an option to make a core-dump checkpoint on failure
  and upload it to ALIEN
* offer the possibility to re-schedule a failed task when
  it matches a certain condition
  (for instance, random failures that happen very rarely)
sawenzel committed Apr 30, 2021
1 parent 0a48333 commit 550b592
Showing 3 changed files with 125 additions and 40 deletions.
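
Conceptually, the new retry feature boils down to simple bookkeeping; a minimal sketch with illustrative names (looks_transient and handle_task_exit are stand-ins for the actual is_worth_retrying / waitforany logic shown in the diff below):

MAX_RETRIES = 2   # the runner hard-codes this limit

def looks_transient(logfile):
    # stand-in for is_worth_retrying: scan the task log for known benign signatures
    try:
        with open(logfile, errors='ignore') as f:
            return any('failed setting ZMQ_EVENTS' in line for line in f)
    except OSError:
        return False

def handle_task_exit(tid, returncode, logfile, retry_counter, marked_toretry):
    if returncode == 0:
        return 'done'
    if looks_transient(logfile) and retry_counter[tid] < MAX_RETRIES:
        retry_counter[tid] += 1
        marked_toretry.append(tid)   # the main loop re-queues these as candidates
        return 'retry'
    return 'failed'                  # genuine failure: the pipeline stops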
161 changes: 123 additions & 38 deletions MC/bin/o2_dpg_workflow_runner.py
@@ -10,6 +10,7 @@
import logging
import os
import signal
import socket
import sys
import traceback
try:
@@ -401,6 +402,8 @@ def __init__(self, workflowfile, args, jmax=100):
self.nicevalues = [ os.nice(0) for tid in range(len(self.taskuniverse)) ]
self.internalmonitorcounter = 0 # internal use
self.internalmonitorid = 0 # internal use
self.tids_marked_toretry = [] # sometimes we might want to retry a failed task (simply because it was "unlucky"); such task ids are collected here
self.retry_counter = [ 0 for tid in range(len(self.taskuniverse)) ] # keeps track of how many times each task has been retried already

def SIGHandler(self, signum, frame):
# basically forcing shut down of all child processes
@@ -529,18 +532,23 @@ def ok_to_skip(self, tid):
def try_job_from_candidates(self, taskcandidates, process_list, finished):
self.scheduling_iteration = self.scheduling_iteration + 1

# the ordinary process list part
initialcandidates=taskcandidates.copy()
for tid in initialcandidates:
actionlogger.debug ("trying to submit " + str(tid) + ':' + str(self.idtotask[tid]))
# check early if we could skip
# better to do it here (instead of relying on taskwrapper)
# remove "done / skippable" tasks immediately
tasks_skipped = False
for tid in taskcandidates.copy(): # <--- the copy is important! otherwise we would remove elements from the very list we are iterating over
if self.ok_to_skip(tid):
finished.append(tid)
taskcandidates.remove(tid)
break #---> we break in order to preserve some ordering (the next candidates tried should be the daughters of the skipped job)
tasks_skipped = True
actionlogger.info("Skipping task " + str(self.idtotask[tid]))

# if tasks_skipped:
# return # ---> we return early in order to preserve some ordering (the next candidate tried should be daughters of skipped jobs)
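
Aside, not part of this commit: the .copy() above guards against the classic Python pitfall of mutating a list while iterating over it. A standalone illustration:

items = [1, 1, 2]
for x in items:            # no copy: mutating during iteration
    if x == 1:
        items.remove(x)
print(items)               # -> [1, 2]; the second 1 was silently skipped

items = [1, 1, 2]
for x in items.copy():     # iterating a copy visits every element
    if x == 1:
        items.remove(x)
print(items)               # -> [2]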

elif (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid):
# the ordinary process list part
initialcandidates=taskcandidates.copy()
for tid in initialcandidates:
actionlogger.debug ("trying to submit " + str(tid) + ':' + str(self.idtotask[tid]))
if (len(self.process_list) + len(self.backfill_process_list) < self.max_jobs_parallel) and self.ok_to_submit(tid):
p=self.submit(tid)
if p!=None:
self.curmembooked+=float(self.maxmemperid[tid])
@@ -679,49 +687,120 @@ def waitforany(self, process_list, finished):

for p in list(process_list):
pid = p[1].pid
tid = p[0] # the task id of this process
returncode = 0
if not self.args.dry_run:
returncode = p[1].poll()
if returncode!=None:
actionlogger.info ('Task ' + str(pid) + ' ' + str(p[0])+':'+str(self.idtotask[p[0]]) + ' finished with status ' + str(returncode))
actionlogger.info ('Task ' + str(pid) + ' ' + str(tid)+':'+str(self.idtotask[tid]) + ' finished with status ' + str(returncode))
# account for cleared resources
if self.nicevalues[p[0]]==os.nice(0):
self.curmembooked-=float(self.maxmemperid[p[0]])
self.curcpubooked-=float(self.cpuperid[p[0]])
if self.nicevalues[tid]==os.nice(0):
self.curmembooked-=float(self.maxmemperid[tid])
self.curcpubooked-=float(self.cpuperid[tid])
else:
self.curmembooked_backfill-=float(self.maxmemperid[p[0]])
self.curcpubooked_backfill-=float(self.cpuperid[p[0]])
self.procstatus[p[0]]='Done'
finished.append(p[0])
self.curmembooked_backfill-=float(self.maxmemperid[tid])
self.curcpubooked_backfill-=float(self.cpuperid[tid])
self.procstatus[tid]='Done'
finished.append(tid)
process_list.remove(p)
if returncode!=0:
failuredetected = True
failingpids.append(pid)
failingtasks.append(p[0])
if returncode != 0:
print (str(tid) + ' failed ... checking retry')
# we inspect if this is something "unlucky" which could be resolved by a simple resubmit
if self.is_worth_retrying(tid) and self.retry_counter[tid] < 2:
print (str(tid) + ' to be retried')
actionlogger.info ('Task ' + str(self.idtotask[tid]) + ' failed but marked to be retried ')
self.tids_marked_toretry.append(tid)
self.retry_counter[tid] += 1

else:
failuredetected = True
failingpids.append(pid)
failingtasks.append(tid)

if failuredetected and self.stoponfailure:
actionlogger.info('Stopping pipeline due to failure in stages with PID ' + str(failingpids))
# self.analyse_files_and_connections()
self.cat_logfiles_tostdout(failingtasks)

self.send_checkpoint(failingtasks, self.args.checkpoint_on_failure)
self.stop_pipeline_and_exit(process_list)

# empty finished means we have to wait more
return len(finished)==0


def get_logfile(self, tid):
# determines the logfile name for this task
taskspec = self.workflowspec['stages'][tid]
taskname = taskspec['name']
filename = taskname + '.log'
directory = taskspec['cwd']
return directory + '/' + filename


def is_worth_retrying(self, tid):
# This checks for some signatures in logfiles that indicate that a retry of this task
# might have a chance.
# Ideally, this should be made user configurable. Either the user could inject a lambda
# or a regular expression to use. For now we just use a hard-coded list.
logfile = self.get_logfile(tid)

# 1) ZMQ_EVENT + interrupted system calls (DPL bug during shutdown)
# Not sure if grep is faster than native Python text search ...
status = os.system('grep -q "failed setting ZMQ_EVENTS" ' + logfile + ' 2> /dev/null')
if os.WEXITSTATUS(status) == 0:
return True

return False
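
As a possible pure-Python alternative to shelling out to grep (a sketch, not part of this commit; the semantics should match the exit-status check above):

def is_worth_retrying_py(self, tid):
    # scan the logfile natively instead of spawning grep
    try:
        with open(self.get_logfile(tid), errors='ignore') as f:
            return any('failed setting ZMQ_EVENTS' in line for line in f)
    except OSError:
        return False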


def cat_logfiles_tostdout(self, taskids):
# In case of errors we can cat the logfiles for this taskname
# to stdout. Assuming convention that "taskname" translates to "taskname.log" logfile.
for tid in taskids:
taskspec = self.workflowspec['stages'][tid]
taskname = taskspec['name']
filename = taskname + '.log'
directory = taskspec['cwd']
path = directory + '/' + filename
if os.path.exists(path):
print (' ----> START OF LOGFILE ', path, ' -----')
os.system('cat ' + path)
print (' <---- END OF LOGFILE ', path, ' -----')
logfile = self.get_logfile(tid)
if os.path.exists(logfile):
print (' ----> START OF LOGFILE ', logfile, ' -----')
os.system('cat ' + logfile)
print (' <---- END OF LOGFILE ', logfile, ' -----')

def send_checkpoint(self, taskids, location):
# Makes a tarball containing all files in the base dir
# (timeframe independent) and the dir with corrupted timeframes
# and copies it to a specific ALIEN location. Not a core function,
# just a tool to get hold of error conditions appearing on the GRID.

def get_tar_command(dir='./', flags='cf', filename='checkpoint.tar'):
return 'find ' + str(dir) + ' -maxdepth 1 -type f -print0 | xargs -0 tar ' + str(flags) + ' ' + str(filename)

if location != None:
print ('Making a failure checkpoint')
# let's determine a filename from ALIEN_PROC_ID - hostname - and PID

aliprocid=os.environ.get('ALIEN_PROC_ID')
if aliprocid == None:
aliprocid = 0

fn='pipeline_checkpoint_ALIENPROC' + str(aliprocid) + '_PID' + str(os.getpid()) + '_HOST' + socket.gethostname() + '.tar'
actionlogger.info("Checkpointing to file " + fn)
tarcommand = get_tar_command(filename=fn)
actionlogger.info("Taring " + tarcommand)

# first of all the base directory
os.system(tarcommand)
# then we add the files for the specific timeframe ids, if any
for tid in taskids:
taskspec = self.workflowspec['stages'][tid]
directory = taskspec['cwd']
if directory != "./":
tarcommand = get_tar_command(dir=directory, flags='rf', filename=fn)
actionlogger.info("Tar command is " + tarcommand)
os.system(tarcommand)

# location needs to be an alien path of the form alien:///foo/bar/
copycommand='alien.py cp ' + fn + ' ' + str(location) + '@disk:1'
actionlogger.info("Copying to alien " + copycommand)
os.system(copycommand)
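
For illustration, with the defaults the helper above composes commands along these lines ('<fn>' stands for the generated checkpoint filename, 'tf1' for a hypothetical per-timeframe cwd, and the destination for whatever --checkpoint-on-failure specified):

find ./ -maxdepth 1 -type f -print0 | xargs -0 tar cf <fn>
find tf1 -maxdepth 1 -type f -print0 | xargs -0 tar rf <fn>      # 'rf' appends to the archive
alien.py cp <fn> alien:///some/alien/dir/@disk:1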


def analyse_files_and_connections(self):
for p,s in self.pid_to_files.items():
@@ -835,8 +914,8 @@ def execute(self):
exit (0)

if args.produce_script != None:
self.produce_script(args.produce_script)
exit (0)
self.produce_script(args.produce_script)
exit (0)

if args.rerun_from:
reruntaskfound=False
@@ -858,7 +937,7 @@

self.process_list=[] # list of tuples of nodes ids and Popen subprocess instances

finishedtasks=[]
finishedtasks=[] # global list of finished tasks
try:

while True:
@@ -868,15 +947,15 @@
# remove weights
candidates = [ tid for tid,_ in candidates ]

finished = []
finished = [] # --> tasks that count as finished because they were already done or skipped
actionlogger.debug('Sorted current candidates: ' + str([(c,self.idtotask[c]) for c in candidates]))
self.try_job_from_candidates(candidates, self.process_list, finished)
if len(candidates) > 0 and len(self.process_list) == 0:
actionlogger.info("Not able to make progress: Nothing scheduled although non-zero candidate set")
send_webhook(self.args.webhook,"Unable to make further progress: Quitting")
break

finished_from_started = []
finished_from_started = [] # --> tasks that finished after actually being started
while self.waitforany(self.process_list, finished_from_started):
if not args.dry_run:
self.monitor(self.process_list) # ---> make async to normal operation?
@@ -886,9 +965,13 @@

finished = finished + finished_from_started
actionlogger.debug("finished now :" + str(finished_from_started))
finishedtasks=finishedtasks + finished

# someone returned
finishedtasks = finishedtasks + finished

# if a task was marked as "retry" we simply put it back into the candidate list
if len(self.tids_marked_toretry) > 0:
candidates = candidates + self.tids_marked_toretry
self.tids_marked_toretry = []

# new candidates
for tid in finished:
if self.possiblenexttask.get(tid)!=None:
@@ -939,6 +1022,8 @@ def execute(self):
ust exist and the tasks file must be writable to with the current user.')
parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,')
parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel
parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address
# argument is alien-path

args = parser.parse_args()
print (args)
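A hypothetical invocation of the runner with the new option added above (the ALIEN destination is an illustrative placeholder):

${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --checkpoint-on-failure alien:///alice/cern.ch/user/e/example/checkpoints/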
2 changes: 1 addition & 1 deletion MC/run/PWGHF/create_embedding_workflow.py
@@ -9,7 +9,7 @@
import sys

# we simply delegate to main script with some PWGHF settings
command='${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 13000 -col pp -proc ccbar --embedding '
command='${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 13000 -col pp -proc ccbar --embedding -interactionRate 50000 '

# and add given user options
for i in range(1, len(sys.argv)):
2 changes: 1 addition & 1 deletion MC/run/PWGHF/embedding_benchmark.sh
@@ -25,7 +25,7 @@ SIMENGINE=${SIMENGINE:-TGeant4}
# create workflow
${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py -eCM 13000 -col pp -proc ccbar -tf ${NTIMEFRAMES} -nb ${NBKGEVENTS} \
-ns ${NSIGEVENTS} -e ${SIMENGINE} \
-j ${NWORKERS} --embedding
-j ${NWORKERS} --embedding --interactionRate 50000

# run workflow
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json
