diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 77ee01f1b6f3..72ca84146f1f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3021,6 +3021,17 @@ def __init__(self, task, runner_params): self.runner_params['settings'].update(execution_environment_params) def run(self): + # We establish a connection to the Receptor socket + receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock') + + try: + return self._run_internal(receptor_ctl) + finally: + # Make sure to always release the work unit if we established it + if self.unit_id is not None: + receptor_ctl.simple_command(f"work release {self.unit_id}") + + def _run_internal(self, receptor_ctl): # Create a socketpair. Where the left side will be used for writing our payload # (private data dir, kwargs). The right side will be passed to Receptor for # reading. @@ -3028,18 +3039,17 @@ def run(self): threading.Thread(target=self.transmit, args=[sockin]).start() - # We establish a connection to the Receptor socket and submit our work, passing + # submit our work, passing # in the right side of our socketpair for reading. - receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock') result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params) - unit_id = result['unitid'] + self.unit_id = result['unitid'] sockin.close() sockout.close() - resultsock, resultfile = receptor_ctl.get_work_results(unit_id, + resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True) # Both "processor" and "cancel_watcher" are spawned in separate threads. @@ -3057,15 +3067,14 @@ def run(self): res = list(first_future.done)[0].result() if res.status == 'canceled': - receptor_ctl.simple_command(f"work cancel {unit_id}") + receptor_ctl.simple_command(f"work cancel {self.unit_id}") resultsock.shutdown(socket.SHUT_RDWR) resultfile.close() elif res.status == 'error': # TODO: There should be a more efficient way of getting this information receptor_work_list = receptor_ctl.simple_command("work list") - raise RuntimeError(receptor_work_list[unit_id]['Detail']) + raise RuntimeError(receptor_work_list[self.unit_id]['Detail']) - receptor_ctl.simple_command(f"work release {unit_id}") return res # Spawned in a thread so Receptor can start reading before we finish writing, we