Skip to content

Commit

Permalink
Merge pull request #25 from AlanCoding/release_handling
Browse files Browse the repository at this point in the history
Exception handling to always release work units
  • Loading branch information
shanemcd authored Jan 26, 2021
2 parents ac1c22b + a326e18 commit 031d770
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions awx/main/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3021,25 +3021,35 @@ def __init__(self, task, runner_params):
self.runner_params['settings'].update(execution_environment_params)

def run(self):
# We establish a connection to the Receptor socket
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')

try:
return self._run_internal(receptor_ctl)
finally:
# Make sure to always release the work unit if we established it
if self.unit_id is not None:
receptor_ctl.simple_command(f"work release {self.unit_id}")

def _run_internal(self, receptor_ctl):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
sockin, sockout = socket.socketpair()

threading.Thread(target=self.transmit, args=[sockin]).start()

# We establish a connection to the Receptor socket and submit our work, passing
# submit our work, passing
# in the right side of our socketpair for reading.
receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = receptor_ctl.submit_work(worktype=self.work_type,
payload=sockout.makefile('rb'),
params=self.receptor_params)
unit_id = result['unitid']
self.unit_id = result['unitid']

sockin.close()
sockout.close()

resultsock, resultfile = receptor_ctl.get_work_results(unit_id,
resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id,
return_socket=True,
return_sockfile=True)
# Both "processor" and "cancel_watcher" are spawned in separate threads.
Expand All @@ -3057,15 +3067,14 @@ def run(self):

res = list(first_future.done)[0].result()
if res.status == 'canceled':
receptor_ctl.simple_command(f"work cancel {unit_id}")
receptor_ctl.simple_command(f"work cancel {self.unit_id}")
resultsock.shutdown(socket.SHUT_RDWR)
resultfile.close()
elif res.status == 'error':
# TODO: There should be a more efficient way of getting this information
receptor_work_list = receptor_ctl.simple_command("work list")
raise RuntimeError(receptor_work_list[unit_id]['Detail'])
raise RuntimeError(receptor_work_list[self.unit_id]['Detail'])

receptor_ctl.simple_command(f"work release {unit_id}")
return res

# Spawned in a thread so Receptor can start reading before we finish writing, we
Expand Down

0 comments on commit 031d770

Please sign in to comment.