When I define a DAG that uses the KubernetesExecutor, with executor_config set in default_args like this:

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    "example_using_k8s_executor_new",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "zhangxiao",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 5),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
        "executor_config": {
            "KubernetesExecutor": {
                "request_cpu": "200m",
                "limit_cpu": "200m",
                "request_memory": "500Mi",
                "limit_memory": "500Mi",
            }
        },
    },
)
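For reference, a placeholder task attached to the DAG above is enough to make the snippet a complete file (the operator and callable here are illustrative, not my real tasks):

# Placeholder task only, so the snippet above is a runnable DAG file;
# the real tasks do unrelated work.
from airflow.operators.python_operator import PythonOperator


def _placeholder():
    print("hello from a KubernetesExecutor worker pod")


placeholder_task = PythonOperator(
    task_id="placeholder_task",
    python_callable=_placeholder,
    dag=dag,
)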
the scheduler raises this exception:
[2020-12-10 10:02:08,142] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
self._execute_helper()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1453, in _execute_helper
if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1515, in _validate_and_run_task_instances
self.executor.heartbeat()
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
self.trigger_tasks(open_slots)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 155, in trigger_tasks
executor_config=simple_ti.executor_config)
File "/home/aicv/.jumbo/lib/python3.6/site-packages/airflow/executors/kubernetes_executor.py", line 803, in execute_async
self.task_queue.put((key, command, kube_executor_config))
File "", line 2, in put
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/managers.py", line 756, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/aicv/.jumbo/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
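For what it's worth, the TypeError at the bottom is the generic "file objects cannot be pickled" failure. A standalone demonstration of the same message on Python 3.6 (my own illustration, not the actual object Airflow is queueing):

# Standalone illustration (assumption: something holding an open text stream
# ends up in the object put on the executor's task_queue). Not Airflow code.
import pickle
import sys

payload = {
    "executor_config": {"KubernetesExecutor": {"request_cpu": "200m"}},
    "stream": sys.stdout,  # any _io.TextIOWrapper, e.g. an open log file
}
try:
    pickle.dumps(payload)
except TypeError as exc:
    print(exc)  # Python 3.6 prints: cannot serialize '_io.TextIOWrapper' object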
But if I remove executor_config from the default_args of the DAG, everything works:

dag = DAG(
    "example_using_k8s_executor_new",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "zhangxiao",
        "depends_on_past": False,
        "start_date": datetime(2020, 12, 5),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)
Why does adding executor_config to default_args cause this serialization error? Any help would be appreciated.
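In case it is useful, this is the per-task form I would try next, passing executor_config to the operator instead of default_args (unverified sketch; the operator and callable are placeholders):

# Unverified sketch: executor_config attached to the task rather than to
# default_args; `dag` is the DAG defined above without executor_config
# in default_args.
from airflow.operators.python_operator import PythonOperator

k8s_resources = {
    "KubernetesExecutor": {
        "request_cpu": "200m",
        "limit_cpu": "200m",
        "request_memory": "500Mi",
        "limit_memory": "500Mi",
    }
}


def _placeholder_with_resources():
    print("placeholder")


per_task = PythonOperator(
    task_id="placeholder_with_resources",
    python_callable=_placeholder_with_resources,
    executor_config=k8s_resources,
    dag=dag,
)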