Skip to content

Commit

Permalink
[AIRFLOW-1024] Ignore celery executor errors (apache#49)
Browse files Browse the repository at this point in the history
Code defensively around the interactions with celery so that
we just log errors instead of crashing the scheduler.
It might makes sense to make the try catches one level higher
(to catch errors from all executors), but this needs some investigation.
  • Loading branch information
aoen committed Jun 8, 2017
1 parent e3e6aa7 commit a1df343
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import subprocess
import ssl
import time
import traceback

from celery import Celery
from celery import states as celery_states
Expand Down Expand Up @@ -101,23 +102,27 @@ def sync(self):
self.logger.debug(
"Inquiring about {} celery task(s)".format(len(self.tasks)))
for key, async in list(self.tasks.items()):
state = async.state
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.FAILURE:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.REVOKED:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
else:
self.logger.info("Unexpected state: " + async.state)
self.last_state[key] = async.state
try:
state = async.state
if self.last_state[key] != state:
if state == celery_states.SUCCESS:
self.success(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.FAILURE:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
elif state == celery_states.REVOKED:
self.fail(key)
del self.tasks[key]
del self.last_state[key]
else:
self.logger.info("Unexpected state: " + async.state)
self.last_state[key] = async.state
except Exception as e:
logging.error("Error syncing the celery executor, ignoring "
"it:\n{}\n".format(e, traceback.format_exc()))

def end(self, synchronous=False):
if synchronous:
Expand Down

0 comments on commit a1df343

Please sign in to comment.