
fix _DockerDecoratedOperator module type attribute pickle error #35293

Merged
potiuk merged 3 commits into apache:main from phi-friday:fix-docker-decorator-operator on Oct 31, 2023

Conversation

phi-friday
Contributor

#35285

Changed the existing pickling_library attribute into a property and added a use_dill attribute instead, because the module object that _DockerDecoratedOperator stored in that attribute is not serializable and causes an error in the scheduler.
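For context, the change follows this pattern — a minimal, illustrative sketch (not the actual diff; the `Before`/`After` class names are invented here):

```python
import copy
import pickle

import dill  # the optional serializer the decorator supports


class Before:
    """Simplified old shape: the chosen module is stored on the instance."""

    def __init__(self, use_dill: bool = False):
        # A module object in __dict__ makes copy.deepcopy (and pickling) fail.
        self.pickling_library = dill if use_dill else pickle


class After:
    """Simplified new shape: only a plain bool is stored on the instance."""

    def __init__(self, use_dill: bool = False):
        self.use_dill = use_dill  # serializable, survives deepcopy

    @property
    def pickling_library(self):
        # The module is resolved on access instead of being stored.
        return dill if self.use_dill else pickle


copy.deepcopy(After(use_dill=True))    # fine: only the bool is copied
# copy.deepcopy(Before(use_dill=True)) # TypeError: cannot pickle 'module' object
```

Because deepcopy walks the instance `__dict__`, moving the module behind a property keeps it out of the copied state.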

origin (scheduler log before this change)

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2023-10-31T10:21:35.996+0000] {executor_loader.py:117} INFO - Loaded executor: LocalExecutor
[2023-10-31 10:21:36 +0000] [140] [INFO] Starting gunicorn 21.2.0
[2023-10-31 10:21:36 +0000] [140] [INFO] Listening at: http://[::]:8793 (140)
[2023-10-31 10:21:36 +0000] [140] [INFO] Using worker: sync
[2023-10-31T10:21:36.038+0000] {scheduler_job_runner.py:803} INFO - Starting the scheduler
[2023-10-31T10:21:36.039+0000] {scheduler_job_runner.py:810} INFO - Processing each file at most -1 times
[2023-10-31 10:21:36 +0000] [141] [INFO] Booting worker with pid: 141
[2023-10-31 10:21:36 +0000] [148] [INFO] Booting worker with pid: 148
[2023-10-31T10:21:36.112+0000] {manager.py:169} INFO - Launched DagFileProcessorManager with pid: 193
[2023-10-31T10:21:36.124+0000] {scheduler_job_runner.py:1611} INFO - Adopting or resetting orphaned tasks for active dag runs
[2023-10-31T10:21:36.203+0000] {settings.py:61} INFO - Configured default timezone Timezone('UTC')
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution:
        <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:21:51.192+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state:
        <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:51.193+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task no_error because previous state change time has not been saved
[2023-10-31T10:21:51.194+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
[2023-10-31T10:21:51.194+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:51.195+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:51.229+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=no_error permission to 509
[2023-10-31T10:21:51.466+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
[2023-10-31T10:21:51.694+0000] {local_executor.py:138} ERROR - Failed to execute task cannot pickle 'module' object.
Traceback (most recent call last):
  File "/opt/airflow/airflow/executors/local_executor.py", line 134, in _execute_work_in_fork
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 114, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 436, in task_run
    task_return_code = _run_task_by_selected_method(args, _dag, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 214, in _run_task_by_selected_method
    return _run_task_by_local_task_job(args, ti)
  File "/opt/airflow/airflow/cli/commands/task_command.py", line 276, in _run_task_by_local_task_job
    ret = run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/jobs/job.py", line 393, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/opt/airflow/airflow/jobs/job.py", line 422, in execute_job
    ret = execute_callable()
  File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 197, in _execute
    self.handle_task_exit(return_code)
  File "/opt/airflow/airflow/jobs/local_task_job_runner.py", line 237, in handle_task_exit
    self.task_instance.schedule_downstream_tasks(max_tis_per_query=self.job.max_tis_per_query)
  File "/opt/airflow/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/models/taskinstance.py", line 3162, in schedule_downstream_tasks
    partial_dag = task.dag.partial_subset(
  File "/opt/airflow/airflow/models/dag.py", line 2476, in partial_subset
    dag.task_dict = {
  File "/opt/airflow/airflow/models/dag.py", line 2477, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/opt/airflow/airflow/models/dag.py", line 2474, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.8/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/opt/airflow/airflow/models/baseoperator.py", line 1215, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/local/lib/python3.8/copy.py", line 161, in deepcopy
    rv = reductor(4)
TypeError: cannot pickle 'module' object
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution:
        <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:21:52.290+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state:
        <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [scheduled]>
[2023-10-31T10:21:52.292+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task pickle_error because previous state change time has not been saved
[2023-10-31T10:21:52.292+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
[2023-10-31T10:21:52.293+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:52.295+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:21:51.102283+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:21:52.298+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state failed for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
[2023-10-31T10:21:52.305+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, run_start_date=2023-10-31 10:21:51.573970+00:00, run_end_date=2023-10-31 10:21:51.655716+00:00, run_duration=0.081746, state=success, executor_state=failed, try_number=1, max_tries=0, job_id=27, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-10-31 10:21:51.193015+00:00, queued_by_job_id=26, pid=300
[2023-10-31T10:21:52.330+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:21:51.102283+00:00/task_id=pickle_error permission to 509
[2023-10-31T10:21:52.452+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:21:51.102283+00:00 [queued]> on host ef12b15f26fd
[2023-10-31T10:21:53.357+0000] {dagrun.py:729} INFO - Marking run <DagRun test_docker_task_error @ 2023-10-31 10:21:51.102283+00:00: manual__2023-10-31T10:21:51.102283+00:00, state:running, queued_at: 2023-10-31 10:21:51.110909+00:00. externally triggered: True> successful
[2023-10-31T10:21:53.357+0000] {dagrun.py:780} INFO - DagRun Finished: dag_id=test_docker_task_error, execution_date=2023-10-31 10:21:51.102283+00:00, run_id=manual__2023-10-31T10:21:51.102283+00:00, run_start_date=2023-10-31 10:21:51.176241+00:00, run_end_date=2023-10-31 10:21:53.357481+00:00, run_duration=2.18124, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-10-31 10:21:51.102283+00:00, data_interval_end=2023-10-31 10:21:51.102283+00:00, dag_hash=ecb0232f1acdec9dce7b278087864bff
[2023-10-31T10:21:53.362+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:21:51.102283+00:00', try_number=1, map_index=-1)
[2023-10-31T10:21:53.364+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, run_id=manual__2023-10-31T10:21:51.102283+00:00, map_index=-1, run_start_date=2023-10-31 10:21:52.502395+00:00, run_end_date=2023-10-31 10:21:53.228269+00:00, run_duration=0.725874, state=success, executor_state=success, try_number=1, max_tries=0, job_id=28, pool=default_pool, queue=default, priority_weight=1, operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 10:21:52.291449+00:00, queued_by_job_id=26, pid=302
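
For reference, the traceback above boils down to an Airflow-independent repro: deep-copying any object whose instance dict holds a module object fails the same way (a minimal sketch):

```python
import copy
import pickle


class Holder:
    def __init__(self):
        self.pickling_library = pickle  # a module object in __dict__


# deepcopy falls back to __reduce_ex__ for the module, which raises:
# TypeError: cannot pickle 'module' object
copy.deepcopy(Holder())
```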

new (scheduler log after this change)

[2023-10-31T10:17:12.335+0000] {executor_loader.py:117} INFO - Loaded executor: LocalExecutor
[2023-10-31 10:17:12 +0000] [369] [INFO] Starting gunicorn 21.2.0
[2023-10-31 10:17:12 +0000] [369] [INFO] Listening at: http://[::]:8793 (369)
[2023-10-31 10:17:12 +0000] [369] [INFO] Using worker: sync
[2023-10-31 10:17:12 +0000] [370] [INFO] Booting worker with pid: 370
[2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:803} INFO - Starting the scheduler
[2023-10-31T10:17:12.363+0000] {scheduler_job_runner.py:810} INFO - Processing each file at most -1 times
[2023-10-31T10:17:12.411+0000] {manager.py:169} INFO - Launched DagFileProcessorManager with pid: 419
[2023-10-31T10:17:12.412+0000] {scheduler_job_runner.py:1611} INFO - Adopting or resetting orphaned tasks for active dag runs
[2023-10-31 10:17:12 +0000] [481] [INFO] Booting worker with pid: 481
[2023-10-31T10:17:12.466+0000] {settings.py:61} INFO - Configured default timezone Timezone('UTC')
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution:
        <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:17:19.805+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state:
        <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:19.806+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task no_error because previous state change time has not been saved
[2023-10-31T10:17:19.806+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
[2023-10-31T10:17:19.806+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:19.807+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'no_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:19.833+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=no_error permission to 509
[2023-10-31T10:17:20.003+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.no_error manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
[2023-10-31T10:17:20.888+0000] {scheduler_job_runner.py:419} INFO - 1 tasks up for execution:
        <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:482} INFO - DAG test_docker_task_error has 0/16 running and queued tasks
[2023-10-31T10:17:20.889+0000] {scheduler_job_runner.py:598} INFO - Setting the following tasks to queued state:
        <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [scheduled]>
[2023-10-31T10:17:20.890+0000] {taskinstance.py:2177} WARNING - cannot record scheduled_duration for task pickle_error because previous state change time has not been saved
[2023-10-31T10:17:20.890+0000] {scheduler_job_runner.py:641} INFO - Sending TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1) to executor with priority 1 and queue default
[2023-10-31T10:17:20.891+0000] {base_executor.py:146} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:20.892+0000] {local_executor.py:89} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'test_docker_task_error', 'pickle_error', 'manual__2023-10-31T10:17:19.415567+00:00', '--local', '--subdir', 'DAGS_FOLDER/test_dag.py']
[2023-10-31T10:17:20.895+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='no_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
[2023-10-31T10:17:20.900+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=no_error, run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, run_start_date=2023-10-31 10:17:20.059029+00:00, run_end_date=2023-10-31 10:17:20.140250+00:00, run_duration=0.081221, state=success, executor_state=success, try_number=1, max_tries=0, job_id=22, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2023-10-31 10:17:19.805657+00:00, queued_by_job_id=21, pid=519
[2023-10-31T10:17:20.924+0000] {dagbag.py:538} INFO - Filling up the DagBag from /files/dags/test_dag.py
Changing /root/airflow/logs/dag_id=test_docker_task_error/run_id=manual__2023-10-31T10:17:19.415567+00:00/task_id=pickle_error permission to 509
[2023-10-31T10:17:21.046+0000] {task_command.py:421} INFO - Running <TaskInstance: test_docker_task_error.pickle_error manual__2023-10-31T10:17:19.415567+00:00 [queued]> on host c482888bf857
[2023-10-31T10:17:21.949+0000] {dagrun.py:729} INFO - Marking run <DagRun test_docker_task_error @ 2023-10-31 10:17:19.415567+00:00: manual__2023-10-31T10:17:19.415567+00:00, state:running, queued_at: 2023-10-31 10:17:19.429351+00:00. externally triggered: True> successful
[2023-10-31T10:17:21.950+0000] {dagrun.py:780} INFO - DagRun Finished: dag_id=test_docker_task_error, execution_date=2023-10-31 10:17:19.415567+00:00, run_id=manual__2023-10-31T10:17:19.415567+00:00, run_start_date=2023-10-31 10:17:19.789176+00:00, run_end_date=2023-10-31 10:17:21.950050+00:00, run_duration=2.160874, state=success, external_trigger=True, run_type=manual, data_interval_start=2023-10-31 10:17:19.415567+00:00, data_interval_end=2023-10-31 10:17:19.415567+00:00, dag_hash=ecb0232f1acdec9dce7b278087864bff
[2023-10-31T10:17:21.954+0000] {scheduler_job_runner.py:691} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test_docker_task_error', task_id='pickle_error', run_id='manual__2023-10-31T10:17:19.415567+00:00', try_number=1, map_index=-1)
[2023-10-31T10:17:21.955+0000] {scheduler_job_runner.py:728} INFO - TaskInstance Finished: dag_id=test_docker_task_error, task_id=pickle_error, run_id=manual__2023-10-31T10:17:19.415567+00:00, map_index=-1, run_start_date=2023-10-31 10:17:21.098075+00:00, run_end_date=2023-10-31 10:17:21.852766+00:00, run_duration=0.754691, state=success, executor_state=success, try_number=1, max_tries=0, job_id=23, pool=default_pool, queue=default, priority_weight=1, operator=_DockerDecoratedOperator, queued_dttm=2023-10-31 10:17:20.889633+00:00, queued_by_job_id=21, pid=521
[2023-10-31 10:18:35 +0000] [369] [INFO] Handling signal: winch


@boring-cyborg

boring-cyborg bot commented Oct 31, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally. It's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the Mailing List and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubt, contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@potiuk
Member

potiuk commented Oct 31, 2023

Can you please add unit tests for it showing the case? The fact that the tests pass before and after the change means that we missed a test case for it.

@phi-friday
Contributor Author

Can you please add unit tests for it showing the case? The fact that the tests pass before and after the change means that we missed a test case for it.

I want to add a test, but I don't know exactly which case I should cover. What I found is that when a DAG defines this task in its task_dict, an error occurs at the point where the DAG is deep-copied.

Should I add a test that runs the same deepcopy but verifies that no error occurs?
(in tests/providers/docker/decorators/test_docker.py)
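
A minimal sketch of what such a test could look like (illustrative only — it assumes the public `@task.docker` decorator API, and the exact parameters and helpers in the real test may differ):

```python
import copy

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.utils import timezone


def test_deepcopy_decorated_docker_operator():
    # Regression-test sketch: deep-copying the decorated operator used to
    # raise "TypeError: cannot pickle 'module' object" because the operator
    # kept the pickle/dill module as an instance attribute.
    with DAG(dag_id="test_deepcopy", start_date=timezone.datetime(2023, 10, 31)):

        @task.docker(image="python:3.8-slim", auto_remove="force")
        def f():
            return 1

        operator = f().operator

    copy.deepcopy(operator)  # should not raise
```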

@phi-friday phi-friday force-pushed the fix-docker-decorator-operator branch from a91a123 to ab2af83 Compare October 31, 2023 13:42
@potiuk potiuk left a comment
Member

Nice. Thanks a lot for the diagnosis and fix, @phi-friday!

@potiuk potiuk merged commit 1c9d1c2 into apache:main Oct 31, 2023
44 checks passed

boring-cyborg bot commented Oct 31, 2023

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@phi-friday
Contributor Author

@potiuk Thank you for guiding me. Have a good day.

@phi-friday phi-friday deleted the fix-docker-decorator-operator branch October 31, 2023 15:33