From 30aea53412cedce23531431232858e1b1c6d8806 Mon Sep 17 00:00:00 2001 From: j-mao Date: Wed, 8 Jan 2020 18:24:35 +1100 Subject: [PATCH] Recycle failed jobs back into the queue Temporarily alleviates #194 by no longer hanging while waiting for the file to appear (the worker had entered the race-condition-avoidance retry loop, which blocked everything). We also need to do something about HTTP 400 on compilation_update, because we don't want to recycle jobs that "fail" only because of a resubmit. --- infrastructure/worker/app/compile_server.py | 7 +------ infrastructure/worker/app/subscription.py | 15 ++++++++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/infrastructure/worker/app/compile_server.py b/infrastructure/worker/app/compile_server.py index 80af61f9..eb2d84cf 100755 --- a/infrastructure/worker/app/compile_server.py +++ b/infrastructure/worker/app/compile_server.py @@ -60,13 +60,8 @@ def compile_worker(submissionid): # Obtain compressed archive of the submission try: os.mkdir(sourcedir) - blob = None - while blob is None: - # This file is guaranteed to exist - # If it doesn't, just try again, to avoid race condition - blob = bucket.get_blob(os.path.join(submissionid, 'source.zip')) with open(os.path.join(rootdir, 'source.zip'), 'wb') as file_obj: - blob.download_to_file(file_obj) + bucket.get_blob(os.path.join(submissionid, 'source.zip')).download_to_file(file_obj) except: compile_log_error(submissionid, 'Could not retrieve source file from bucket') diff --git a/infrastructure/worker/app/subscription.py b/infrastructure/worker/app/subscription.py index 63fa242f..8fb18397 100644 --- a/infrastructure/worker/app/subscription.py +++ b/infrastructure/worker/app/subscription.py @@ -35,7 +35,7 @@ def subscribe(subscription_name, worker): process = multiprocessing.Process(target=worker, args=(message.message.data.decode(),)) process.start() - logging.info('Job {}: beginning'.format(message.message.data)) + logging.info('Beginning: {}'.format(message.message.data.decode())) while 
True: # If the process is still running, give it more time to finish @@ -46,12 +46,17 @@ def subscribe(subscription_name, worker): [message.ack_id], ack_deadline_seconds=SUB_ACK_DEADLINE) logging.debug('Reset ack deadline for {} for {}s'.format( - message.message.data, SUB_ACK_DEADLINE)) + message.message.data.decode(), SUB_ACK_DEADLINE)) - # If the process is finished, acknowledge it + # The process is finished else: - client.acknowledge(subscription_path, [message.ack_id]) - logging.info('Job {}: ending and acknowledged'.format(message.message.data)) + if process.exitcode == 0: + # Success; acknowledge and return + client.acknowledge(subscription_path, [message.ack_id]) + logging.info('Ending and acknowledged: {}'.format(message.message.data.decode())) + else: + # Failure; refuse to acknowledge + logging.error('Failed, not acknowledged: {}'.format(message.message.data.decode())) break # Sleep the thread before checking again