Skip to content

Commit

Permalink
Recycle failed jobs back into the queue
Browse files Browse the repository at this point in the history
Temporarily alleviates #194 by no longer hanging while waiting for the file to
appear (the race-condition-avoidance retry loop could spin forever, which blocked everything)

We also need to handle HTTP 400 on compilation_update, because
we don't want to recycle jobs that "fail" only because of a resubmit.
  • Loading branch information
j-mao committed Jan 8, 2020
1 parent d870aa8 commit 30aea53
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
7 changes: 1 addition & 6 deletions infrastructure/worker/app/compile_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,8 @@ def compile_worker(submissionid):
# Obtain compressed archive of the submission
try:
os.mkdir(sourcedir)
blob = None
while blob is None:
# This file is guaranteed to exist
# If it doesn't, just try again, to avoid race condition
blob = bucket.get_blob(os.path.join(submissionid, 'source.zip'))
with open(os.path.join(rootdir, 'source.zip'), 'wb') as file_obj:
blob.download_to_file(file_obj)
bucket.get_blob(os.path.join(submissionid, 'source.zip')).download_to_file(file_obj)
except:
compile_log_error(submissionid, 'Could not retrieve source file from bucket')

Expand Down
15 changes: 10 additions & 5 deletions infrastructure/worker/app/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def subscribe(subscription_name, worker):

process = multiprocessing.Process(target=worker, args=(message.message.data.decode(),))
process.start()
logging.info('Job {}: beginning'.format(message.message.data))
logging.info('Beginning: {}'.format(message.message.data.decode()))

while True:
# If the process is still running, give it more time to finish
Expand All @@ -46,12 +46,17 @@ def subscribe(subscription_name, worker):
[message.ack_id],
ack_deadline_seconds=SUB_ACK_DEADLINE)
logging.debug('Reset ack deadline for {} for {}s'.format(
message.message.data, SUB_ACK_DEADLINE))
message.message.data.decode(), SUB_ACK_DEADLINE))

# If the process is finished, acknowledge it
# The process is finished
else:
client.acknowledge(subscription_path, [message.ack_id])
logging.info('Job {}: ending and acknowledged'.format(message.message.data))
if process.exitcode == 0:
# Success; acknowledge and return
client.acknowledge(subscription_path, [message.ack_id])
logging.info('Ending and acknowledged: {}'.format(message.message.data.decode()))
else:
# Failure; refuse to acknowledge
logging.error('Failed, not acknowledged: {}'.format(message.message.data.decode()))
break

# Sleep the thread before checking again
Expand Down

0 comments on commit 30aea53

Please sign in to comment.