
Celery tasks stuck in queued state after worker crash (Set changed size during iteration) #29833

Closed
hterik opened this issue Mar 1, 2023 · 7 comments
Labels
affected_version:2.5 · area:core · area:Scheduler · kind:bug · Stale Bug Report

Comments

@hterik
Contributor

hterik commented Mar 1, 2023

Apache Airflow version

2.5.0

What happened

  1. Task was started and sent to celery by scheduler
  2. Celery worker picked up the task
  3. Celery worker lost database connection
  4. Celery worker crashes with error below (RuntimeError: Set changed size during iteration)
  5. Task is stuck in queued state for over 14 hours.

The following log and screenshot show a more recent example of the situation above, where step 5 has not yet reached 14 hours. We have observed a few such situations recently.


[2023-03-01 09:55:27,163: INFO/MainProcess] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] received
[2023-03-01 09:55:27,364: INFO/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Executing command in Celery: ['airflow', 'tasks', 'run', XXXXX
[2023-03-01 09:55:28,263: INFO/ForkPoolWorker-1] Filling up the DagBag from /XXXXX
[2023-03-01 09:55:30,265: INFO/ForkPoolWorker-1] Running <TaskInstance: XXXXX [queued]> on host worker4
[2023-03-01 10:00:23,487: ERROR/ForkPoolWorker-1] [6e43c201-b325-4538-ae94-3cf9a583f138] Failed to execute task (psycopg2.OperationalError) connection to server at "XXXXXXXXX failed: Connection timed out
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/14/e3q8).
Traceback (most recent call last):
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "xxxxxx port 5432 failed: Connection timed out
	Is the server running on that host and accepting TCP/IP connections?


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "airflow/executors/celery_executor.py", line 130, in _execute_in_fork
    args.func(args)
  File "airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "airflow/cli/commands/task_command.py", line 396, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "airflow/cli/commands/task_command.py", line 193, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "airflow/cli/commands/task_command.py", line 252, in _run_task_by_local_task_job
    run_job.run()
  File "airflow/jobs/base_job.py", line 259, in run
    session.merge(self)
  File "sqlalchemy/orm/session.py", line 3051, in merge
    return self._merge(
  File "sqlalchemy/orm/session.py", line 3131, in _merge
    merged = self.get(
  File "sqlalchemy/orm/session.py", line 2848, in get
    return self._get_impl(
  File "sqlalchemy/orm/session.py", line 2970, in _get_impl
    return db_load_fn(
  File "sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "sqlalchemy/orm/session.py", line 1713, in execute
    conn = self._connection_for_bind(bind)
  File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
    return self._transaction._connection_for_bind(
  File "sqlalchemy/orm/session.py", line 747, in _connection_for_bind
    conn = bind.connect()
  File "sqlalchemy/engine/base.py", line 3315, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "sqlalchemy/engine/base.py", line 96, in __init__
    else engine.raw_connection()
  File "sqlalchemy/engine/base.py", line 3394, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
    Connection._handle_dbapi_exception_noconnection(
  File "sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
    util.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
    return fn()
  File "sqlalchemy/pool/base.py", line 325, in connect
    return _ConnectionFairy._checkout(self)
  File "sqlalchemy/pool/base.py", line 888, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "sqlalchemy/pool/base.py", line 491, in checkout
    rec = pool._do_get()
  File "sqlalchemy/pool/impl.py", line 256, in _do_get
    return self._create_connection()
  File "sqlalchemy/pool/base.py", line 271, in _create_connection
    return _ConnectionRecord(self)
  File "sqlalchemy/pool/base.py", line 386, in __init__
    self.__connect()
  File "sqlalchemy/pool/base.py", line 684, in __connect
    with util.safe_reraise():
  File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "sqlalchemy/pool/base.py", line 680, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
  File "sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "sqlalchemy/engine/default.py", line 598, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) connection to server at "XXXX, port 5432 failed: Connection timed out
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2023-03-01 10:00:23,705: ERROR/ForkPoolWorker-1] Task airflow.executors.celery_executor.execute_command[6e43c201-b325-4538-ae94-3cf9a583f138] raised unexpected: AirflowException('Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138')
Traceback (most recent call last):
  File "celery/app/trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "airflow/executors/celery_executor.py", line 96, in execute_command
    _execute_in_fork(command_to_exec, celery_task_id)
  File "airflow/executors/celery_executor.py", line 111, in _execute_in_fork
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Celery command failed on host: worker4 with celery_task_id 6e43c201-b325-4538-ae94-3cf9a583f138
[2023-03-01 10:00:24,109: CRITICAL/MainProcess] Unrecoverable error: RuntimeError('Set changed size during iteration')
Traceback (most recent call last):
  File "celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "celery/worker/consumer/consumer.py", line 628, in start
    c.loop(*c.loop_args())
  File "celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "kombu/asynchronous/hub.py", line 294, in create_loop
    for tick_callback in on_tick:
RuntimeError: Set changed size during iteration
[2023-03-01 10:00:25 +0000] [14] [INFO] Handling signal: term
[2023-03-01 10:00:25 +0000] [15] [INFO] Worker exiting (pid: 15)
[2023-03-01 10:00:25 +0000] [16] [INFO] Worker exiting (pid: 16)
[2023-03-01 10:00:25 +0000] [14] [INFO] Shutting down: Master

[screenshot: task instance shown stuck in the queued state]
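For context, the final RuntimeError is raised inside kombu's event-loop hub while it iterates its on_tick set and something mutates that set at the same time. A minimal, self-contained illustration of that Python-level failure mode, plus the usual defensive pattern of iterating over a snapshot (this is only an illustration, not kombu's actual code):

# Minimal reproduction of "RuntimeError: Set changed size during iteration".
on_tick = set()

def callback_that_registers_another():
    on_tick.add(lambda: None)  # grows the set while it is being iterated

on_tick.add(callback_that_registers_another)
on_tick.add(lambda: None)

try:
    for tick_callback in on_tick:  # same pattern as kombu's loop over on_tick
        tick_callback()
except RuntimeError as err:
    print(err)  # -> Set changed size during iteration

# Defensive variant: iterate a snapshot so concurrent adds/removes cannot
# invalidate the loop.
for tick_callback in list(on_tick):
    tick_callback()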

What you think should happen instead

A. The Celery worker should reconnect to the database in case of intermittent network errors (see the sketch below).
B. In case of unrecoverable errors, the scheduler should eventually retry or fail the task.
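For A, a minimal sketch of the kind of defensive connection handling I have in mind, using plain SQLAlchemy; the DSN and the run_with_retry helper are illustrative placeholders, not existing Airflow settings:

import time

from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

# pool_pre_ping makes the pool test a connection before handing it out, so a
# stale connection is replaced transparently instead of surfacing an error.
engine = create_engine(
    "postgresql+psycopg2://airflow:***@db-host:5432/airflow",  # placeholder DSN
    pool_pre_ping=True,
)

def run_with_retry(statement, retries=3, backoff=5.0):
    """Hypothetical helper: retry transient OperationalErrors with backoff."""
    for attempt in range(1, retries + 1):
        try:
            with engine.connect() as conn:
                return conn.execute(text(statement)).fetchall()
        except OperationalError:
            if attempt == retries:
                raise
            time.sleep(backoff * attempt)

print(run_with_retry("SELECT 1"))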

How to reproduce

Difficult, happens intermittently.

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

apache-airflow==2.5.0
apache-airflow-providers-celery==3.1.0
redis==3.5.3
celery==5.2.7
kombu==5.2.4

Deployment

Other Docker-based deployment

Deployment details

airflow.cfg, celery options

[celery]
worker_concurrency = 1
worker_prefetch_multiplier = 1
worker_autoscale = 1,1
celery_config_options = ...see below

[celery_broker_transport_options]
socket_connect = 240
socket_keepalive = True
socket_connect_timeout = 240
retry_on_timeout = True

The celery_config_options dict referenced above (Python):

celery_config_options = {
    **airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG,
    "result_backend_always_retry": True,
    "result_backend_max_retries": 20,
    "redis_socket_keepalive": True,
    "redis_retry_on_timeout": True,
    "redis_socket_connect_timeout": 240,
    "worker_deduplicate_successful_tasks": True,
}
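For reference, celery_config_options in airflow.cfg takes the import path of a Python dictionary, so the dict above lives in an importable module along these lines (the module name custom_celery_config is only an example):

# custom_celery_config.py -- must be importable on the workers' PYTHONPATH
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "result_backend_always_retry": True,
    "result_backend_max_retries": 20,
    "redis_socket_keepalive": True,
    "redis_retry_on_timeout": True,
    "redis_socket_connect_timeout": 240,
    "worker_deduplicate_successful_tasks": True,
}

# airflow.cfg then points at it:
# [celery]
# celery_config_options = custom_celery_config.CELERY_CONFIG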

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@hterik hterik added the area:core, kind:bug, and needs-triage labels on Mar 1, 2023
@hterik
Contributor Author

hterik commented Mar 2, 2023

After digging more, I found the following possibly related issues:
#28022
#28120
#23690

@hussein-awala
Member

Can you extract the ti state from the DB for a task stuck in queued state?

SELECT *
FROM task_instance
WHERE dag_id='<dag id>' AND task_id='<task id>' AND execution_date='<execution date>'
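(On Airflow 2.2+ the task_instance table is keyed by run_id rather than execution_date, so a lookup through Airflow's ORM may be more convenient; a sketch, with the filters to be filled in:)

from airflow.models import TaskInstance
from airflow.utils.session import create_session

# Sketch: inspect the stuck task instance via the ORM instead of raw SQL.
with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter_by(dag_id="<dag id>", task_id="<task id>", run_id="<run id>")
        .all()
    )
    for ti in tis:
        print(ti.map_index, ti.state, ti.queued_dttm, ti.external_executor_id)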

@hussein-awala hussein-awala added the pending-response label and removed the needs-triage label on Mar 3, 2023
@hterik
Contributor Author

hterik commented Mar 3, 2023

@hussein-awala here is the DB extract.
Note that this is a different task instance from the one in the original log, but it also produces the same type of failure log on the worker (Set changed size during iteration).

task_id     |dag_id                |run_id                              |start_date|end_date|duration|state |try_number|hostname|unixname|job_id|pool        |queue           |priority_weight|operator                 |queued_dttm                  |pid|max_tries|executor_config|pool_slots|queued_by_job_id|external_executor_id                |trigger_id|trigger_timeout|next_method|next_kwargs|map_index|updated_at                   |
------------+----------------------+------------------------------------+----------+--------+--------+------+----------+--------+--------+------+------------+----------------+---------------+-------------------------+-----------------------------+---+---------+---------------+----------+----------------+------------------------------------+----------+---------------+-----------+-----------+---------+-----------------------------+
test_device0|xxxxxxxx              |scheduled__2023-03-02T22:00:00+00:00|          |        |        |queued|         0|        |airflow |      |default_pool|yyyyy           |              1|PythonOperatorInWorkspace|2023-03-03 00:00:01.210 +0100|   |        0|  } .          |         1|          196211|5a2f3e09-5a73-46c7-a6a6-4f3c09431aae|          |               |           |           |       -1|2023-03-03 00:00:02.394 +0100|

@hussein-awala
Member

I think this can happen when the Airflow worker completely loses access to the metadata database. In that case it cannot update the task state, and if the database is also used as the result backend for Celery commands, the Celery workers cannot send the events that the scheduler processes to re-queue the failed tasks.

I'm not sure whether we check for worker heartbeats; I didn't find such a check in the method _process_executor_events. @potiuk, can you please confirm this or send a link to the part that checks whether the worker is still alive?

@potiuk
Member

potiuk commented Mar 5, 2023

I'm not sure whether we check for worker heartbeats; I didn't find such a check in the method _process_executor_events. @potiuk, can you please confirm this or send a link to the part that checks whether the worker is still alive?

I am not THAT knowledgeable about this part, so take it with a grain of salt, but let me explain how I understand what's going on. @ephraimbuddy @ashb - maybe you can take a look and confirm whether my understanding is correct or wrong?

Everything related to managing Celery task state happens in the Celery executor.
I don't think we monitor workers in any way. Each executor monitors its own tasks' state and checks whether they have stalled or whether they need adoption (when they were previously monitored by another executor).

Eventually, if a task does not update its state (for example because the worker crashed), it should be rescheduled as stalled (by its own executor) or adopted (by another one). That's as much detail as I know off the top of my head.

There are a few race conditions that might occur (no distributed system is ever foolproof), but I think the original design intent is that even if a very nasty race condition happens, the tasks will eventually be rescheduled.
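To make the stalled/adoption idea above concrete, here is a purely conceptual sketch of that kind of periodic check. This is not Airflow's actual implementation, and the 10-minute timeout is made up; the real behaviour is driven by the executor's own timeouts (task adoption / stalled-task settings, whose exact names have moved between releases):

from datetime import datetime, timedelta, timezone

from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

STALLED_TIMEOUT = timedelta(minutes=10)  # illustrative value only

def reschedule_stalled_queued_tasks():
    """Conceptual sketch: hand long-queued tasks back to the scheduler."""
    cutoff = datetime.now(timezone.utc) - STALLED_TIMEOUT
    with create_session() as session:
        stalled = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.QUEUED)
            .filter(TaskInstance.queued_dttm < cutoff)
            .all()
        )
        for ti in stalled:
            # Returning the task to SCHEDULED lets the scheduler queue it again
            # instead of it sitting in QUEUED forever after a worker crash.
            ti.state = State.SCHEDULED
            ti.queued_dttm = None
        return [ti.key for ti in stalled]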

@eladkal eladkal added the area:Scheduler and affected_version:2.5 labels on Apr 15, 2023

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.


This issue has been closed because it has not received a response from the issue author.

@github-actions github-actions bot closed this as not planned on Oct 14, 2024