Fix CeleryKubernetesExecutor
#16700
Conversation
airflow/jobs/base_job.py (Outdated)

if self.__class__.__name__ != "LocalTaskJob":
    self.executor = executor or ExecutorLoader.get_default_executor()
The other option is to keep it `None` in `BaseJob` and override it in `SchedulerJob` and `BackfillJob`.
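A rough sketch of what that option could look like (illustrative only; the real classes are SQLAlchemy models with many more parameters):

```python
from airflow.executors.executor_loader import ExecutorLoader


class BaseJob:
    def __init__(self, executor=None):
        # BaseJob keeps whatever was passed in (possibly None) and never
        # resolves a default executor itself, so LocalTaskJob loads nothing.
        self.executor = executor


class SchedulerJob(BaseJob):
    def __init__(self, executor=None):
        super().__init__(executor=executor)
        # Jobs that actually need an executor resolve the default themselves.
        self.executor = self.executor or ExecutorLoader.get_default_executor()
```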
Another option would be to make `executor` a cached property and `executor_class` a property. That would simply get them out of `__init__`, which I think would be enough if they are not accessed in `LocalTaskJob` anyway (though if one wanted to be explicit, those properties could be overridden in `LocalTaskJob` to raise `NotImplementedError`).
@cached_property
def executor(self):
    # store init param `executor` as private attr `_executor`
    return self._executor or ExecutorLoader.get_default_executor()
This has two benefits: you don't have to reference the subclass name here (as in your first approach), and you don't have to make as many changes to the `executor` init params.
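For context, a fuller sketch of that suggestion, assuming the init parameter is stored on a private `_executor` attribute (names here are illustrative, not the merged change; the real `BaseJob` is a SQLAlchemy model):

```python
from functools import cached_property  # Python 3.8+

from airflow.executors.executor_loader import ExecutorLoader


class BaseJob:
    def __init__(self, executor=None):
        # __init__ only records the argument; nothing is instantiated here,
        # so LocalTaskJob (which never touches self.executor) loads no executor.
        self._executor = executor

    @cached_property
    def executor(self):
        # Resolved lazily on first access (e.g. by SchedulerJob or BackfillJob)
        # and cached for the lifetime of the job.
        return self._executor or ExecutorLoader.get_default_executor()

    @property
    def executor_class(self) -> str:
        # Also lazy: only jobs that actually read this trigger executor loading.
        return type(self.executor).__name__
```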
I do also wonder... why is it that we only get this issue with CKE? Is there perhaps something about the way CKE is designed that causes this problem?
> I do also wonder... why is it that we only get this issue with CKE? Is there perhaps something about the way CKE is designed that causes this problem?

The issue is that it tries to create an instance of `KubernetesExecutor` inside `CeleryExecutor`, and `KubernetesExecutor` creates a multiprocessing Manager & Queue in its `__init__`, which creates issues with Celery as explained in celery/celery#4525.
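For illustration, a minimal standalone sketch (plain Python, not Airflow or Celery code) of the underlying restriction: Celery's prefork pool workers are daemonic processes, and a daemonic process may not start children such as the helper process behind `multiprocessing.Manager()`.

```python
import multiprocessing


def run_task():
    # KubernetesExecutor.__init__ effectively does this: starting a Manager
    # spawns a child process to host the shared state.
    multiprocessing.Manager()


if __name__ == "__main__":
    # Model a Celery ForkPoolWorker, which is a daemonic process.
    worker = multiprocessing.Process(target=run_task, daemon=True)
    worker.start()
    worker.join()
    # The worker fails with:
    #   AssertionError: daemonic processes are not allowed to have children
```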
airflow/jobs/base_job.py
Outdated
if self.__class__.__name__ != "LocalTaskJob": | ||
self.executor = executor or ExecutorLoader.get_default_executor() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another option would be to make executor
a cached property and executor_class
a property. this would simply get them out of init, which i think would be enough if they are not accessed in LocalTaskJob
anyway (though if one wanted to be explicit they could override those properties with not implemented in LocalTaskJob)
@cached_property
def executor(self):
return self._executor or ExecutorLoader.get_default_executor() # store init param `executor` as private attr `_executor`
this has two benefits, one is you don't have to reference the subclass name here (as in your first approach) and the other is you don't have to make as many changes re executor in init params)
Don't know this part of the code that well, but it looks like when Celery runs a task it runs it via `LocalTaskJob`, and in that case we don't need an executor. I offer a third option above. Small notes in this review.
Looking forward to trying this executor out at long last :)
As dstandish mentioned. Updated the PR with the `cached_property` approach.
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
closes apache#16326

Currently, when running Celery tasks with ``CeleryKubernetesExecutor``, we see the following error. The error occurs because ``BaseJob`` (via ``LocalTaskJob``) needlessly tries to instantiate a ``KubernetesExecutor``, which in turn tries to create a multiprocessing process/Manager and fails:

```
[2021-06-29 00:23:45,301: ERROR/ForkPoolWorker-16] Failed to execute task daemonic processes are not allowed to have children.
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 116, in _execute_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 237, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 117, in _run_task_by_local_task_job
    pool=args.pool,
  File "<string>", line 4, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 433, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/state.py", line 430, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 76, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 97, in __init__
    self.executor = executor or ExecutorLoader.get_default_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 62, in get_default_executor
    cls._default_executor = cls.load_executor(executor_name)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 79, in load_executor
    return cls.__load_celery_kubernetes_executor()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/executor_loader.py", line 116, in __load_celery_kubernetes_executor
    kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 421, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/lib/python3.6/multiprocessing/managers.py", line 513, in start
    self._process.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
```

We don't need to instantiate an executor when running ``LocalTaskJob``, as the executor isn't used there.