Task stuck in upstream_failed #18011

Closed · 1 task done
Tonkonozhenko opened this issue Sep 3, 2021 · 19 comments
Labels
affected_version:2.1 · area:Scheduler · kind:bug

Comments

Tonkonozhenko (Contributor) commented Sep 3, 2021

Apache Airflow version

2.1.3 (latest released)

Operating System

Debian

Versions of Apache Airflow Providers

No response

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

  1. Task prepare_timestamps failed with an unknown k8s-related issue (most likely the pod was killed).
  2. Task prepare_timestamps succeeded on the second try.
  3. The downstream task create_athena_partition is stuck in the upstream_failed state. We also experienced the same issue in 2.1.2, where the task got stuck in the queued state.

[screenshot of the DAG run attached]

What you expected to happen

The task should not be stuck

How to reproduce

No response

Anything else

No response

Code of Conduct

  • I agree to follow this project's Code of Conduct

Tonkonozhenko added the area:core and kind:bug labels Sep 3, 2021
rpfernandezjr commented

I just recently upgraded to 2.1.3 and I'm seeing the same thing.

In some cases the upstream tasks succeed, but the downstream tasks are stuck in an upstream_failed state.

ephraimbuddy (Contributor) commented

Related: #16625
PR trying to address this: #17819

eladkal added the area:Scheduler and affected_version:2.1 labels and removed the area:core label Sep 4, 2021
ephraimbuddy (Contributor) commented

Following the discussions at #17819: you applied the fix there, then in your DAG above prepare_timestamps failed and retried, but create_athena_partition is stuck in up_for_retry?

ephraimbuddy (Contributor) commented

@Tonkonozhenko Do you have the wait_for_downstream arg set anywhere?

WattsInABox commented

We're also seeing this. Most of our DAGs have well under 100 tasks, a few have just under 200 tasks; we have 673 active DAGs and 179 paused DAGs. We do not use wait_for_downstream anywhere.

We started seeing this after upgrading to 2.1.3, which we upgraded to specifically to get the bug fix in PR #16301. Not sure if that bug might be related, since we seem to be having odd status issues all over Airflow...

We see this in all manner of DAGs: some with a very linear path, some that branch into 100 tasks and then back to 1, others with 2 prerequisite tasks feeding the final task.

Behavior:

  • upstream tasks all successful
  • downstream task(s) marked as upstream_failed
  • sometimes an upstream task will have a previous run marked as failed but then retries successfully, almost as if the downstream tasks get marked as upstream_failed on that run but then don't get cleared for the subsequent retry. But this does not always happen: we have seen multiple dag runs a night with upstream_failed tasks where all prior tasks worked on their first attempt (or at least only have logs for 1 attempt).

Please advise on what other information we can provide.

ephraimbuddy (Contributor) commented

@WattsInABox, if you can get scheduler logs from when this happens, that would be very helpful.

Tonkonozhenko (Contributor, Author) commented

@ephraimbuddy, @WattsInABox explained perfectly what happens. We have exactly the same situation.

ephraimbuddy (Contributor) commented Sep 17, 2021

@Tonkonozhenko @WattsInABox Do you see FATAL: sorry, too many clients already. in your scheduler logs when this happens?

If there are reproducible steps, please share them.

Tonkonozhenko (Contributor, Author) commented

@ephraimbuddy unfortunately, I don't have the 2.1.3 logs at the moment, but for 2.1.2 there is no such error and no fatal errors at all.

WattsInABox commented

Trying to get to a reproducible step here...

Is there an existing "unit" test (or could you help me write a unit test) for:

  1. A -> B dag
  2. A set to fail with retries more than 1

And then see if the failure & retry handlers do what I think they're doing (see the sketch after this list)? That is:

  1. A set to failed
  2. B set to upstream_failed
  3. A retries
  4. B is untouched
  5. A succeeds
  6. B left in upstream_failed
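
For reference, a rough sketch of the kind of DAG described above. The dag_id, task names, marker file, and retry settings are illustrative, not taken from the issue:

# Hypothetical repro DAG: task a fails on its first try, retries and succeeds;
# with the behaviour described in this thread, task b can end up stuck in upstream_failed.
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

MARKER = "/tmp/upstream_failed_repro_marker"  # crude "fail only once" flag


def fail_first_attempt():
    if not os.path.exists(MARKER):
        open(MARKER, "w").close()
        raise RuntimeError("simulated failure on the first attempt")
    # second attempt succeeds


with DAG(
    dag_id="upstream_failed_repro",
    start_date=datetime(2021, 9, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    a = PythonOperator(
        task_id="a",
        python_callable=fail_first_attempt,
        retries=2,
        retry_delay=timedelta(seconds=30),
    )
    b = PythonOperator(task_id="b", python_callable=lambda: print("b ran"))

    a >> b

If the bug reproduces, b stays in upstream_failed even after a succeeds on its retry.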

taylorfinnell commented Sep 18, 2021

Hi @ephraimbuddy - I work with @WattsInABox. We don't see FATAL: sorry, too many clients already. but we do see:

Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/base_job.py", line 202, in heartbeat
    session.merge(self)
  File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 2166, in merge
    return self._merge(
  File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 2244, in _merge
    merged = self.query(mapper.class_).get(key[1])
  File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 1018, in get
    return self._get_impl(ident, loading.load_on_pk_identity)

....

psycopg2.OperationalError: could not connect to server: Connection timed out

This causes the job to be SIGTERM'ed (most of the time, it seems). The tasks will now retry since we have #16301 and will eventually succeed. Sometimes a task is SIGTERM'ed 5 times or more before success, which is not ideal for tasks that take an hour plus. I suspect this also at times results in the downstream tasks being set to upstream_failed when in fact the upstream tasks are all successful, but I can't prove it.

We tried bumping AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC to 60 to ease up on hitting the database, with no luck. This error also happens when only a couple of DAGs are running, so there is not much load on our nodes or the database. We don't think it's a networking issue.

Our SQLAlchemy pool size is 350, which might be high, but my understanding is that the pool does not create connections until they are needed, and according to AWS monitoring the max connections we ever hit at peak time is ~300-370, which should be totally manageable on our db.m6g.4xlarge instance. However, if it's a 350-connection pool for each worker and each worker opens lots of connections that then stay alive in the pool, perhaps we are exhausting PG memory.

Do you have any additional advice on things to try?
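
(For context, the pool and heartbeat settings mentioned above map to airflow.cfg options in 2.1.x; the values below simply mirror the numbers in this comment and are not recommendations:)

# airflow.cfg (Airflow 2.1.x) - illustrative values mirroring the comment above
[core]
# per-process SQLAlchemy pool; connections are created lazily, not up front
sql_alchemy_pool_size = 350
# extra connections allowed beyond the pool size
sql_alchemy_max_overflow = 10

[scheduler]
# the AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC bump mentioned above
job_heartbeat_sec = 60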

ephraimbuddy (Contributor) commented Sep 19, 2021

> Trying to get to a reproducible step here...
>
> Is there an existing "unit" test (or could you help me write a unit test) for:
>
>   1. A -> B dag
>   2. A set to fail with retries more than 1
>
> And then see if the failure & retry handlers do what I think they're doing? That is:
>
>   1. A set to failed
>   2. B set to upstream_failed
>   3. A retries
>   4. B is untouched
>   5. A succeeds
>   6. B left in upstream_failed

It's not supposed to set B to upstream_failed if A has retries. What I believe happened is that the executor reported that A had failed while A was still queued in the scheduler. Currently A is failed directly in that case, which we are trying to fix in #17819.

You can temporarily apply a patch that removes these two lines:

self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
ti.set_state(state)

and wait for #17819 to be fixed.
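
As a concrete illustration, the temporary patch would just comment those lines out. The exact location is an assumption on my part (believed to be in airflow/jobs/scheduler_job.py, _process_executor_events, in 2.1.x), so verify against your installed version before patching:

# airflow/jobs/scheduler_job.py (2.1.x), _process_executor_events - location assumed, verify locally.
# Temporary workaround: stop force-setting the task instance to the state reported
# by the executor, so the normal retry path can handle the failure.
#
# self.log.info('Setting task instance %s state to %s as reported by executor', ti, state)
# ti.set_state(state)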

EDIT
Since you're getting SIGTERMs, as explained by @taylorfinnell, this seems related to pending pod timeout deletion. Increase the worker_pods_pending_timeout interval.

ephraimbuddy (Contributor) commented

> Hi @ephraimbuddy - I work with @WattsInABox. We don't see FATAL: sorry, too many clients already. but we do see:
>
> Traceback (most recent call last):
>   File "/opt/app-root/lib64/python3.8/site-packages/airflow/jobs/base_job.py", line 202, in heartbeat
>     session.merge(self)
>   File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 2166, in merge
>     return self._merge(
>   File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 2244, in _merge
>     merged = self.query(mapper.class_).get(key[1])
>   File "/opt/app-root/lib64/python3.8/site-packages/sqlalchemy/orm/query.py", line 1018, in get
>     return self._get_impl(ident, loading.load_on_pk_identity)
>
> ....
>
> psycopg2.OperationalError: could not connect to server: Connection timed out
>
> This causes the job to be SIGTERM'ed (most of the time, it seems). The tasks will now retry since we have #16301, and will eventually succeed. Sometimes it is SIGTERM'ed 5 times or more before success - which is not ideal for tasks that take an hour plus. I suspect also at times this results in the downstream tasks being set to upstream_failed when in fact the upstream is all successful - but I can't prove it.
>
> We tried to bump the AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC to 60 to maybe ease up on hitting the database with no luck. This error also happens when only a couple DAGs are running so there is not much load on our nodes or the database. We don't think it's a networking issue.
>
> Our pool sqlalchemy pool size is 350, this might be high - but my understanding is the pool does not create connections until they are needed, and according to AWS monitoring the max connections we ever hit at peak time is ~300-370 which should be totally manageable on our db.m6g.4xlarge instance. However, if it's a 350 pool for each worker and each worker opens tons of connections that are then alive in the pool - perhaps we are exhausting PG memory
>
> Do you have any additional advice on things to try?

In 2.1.4 we added some limits to the number of queued dagruns the scheduler can create, and I suspect the database connection issue will go away with it. I was getting the FATAL: sorry, too many clients already. DB error until queued dagruns were limited in PR #18065.

ephraimbuddy (Contributor) commented Sep 20, 2021

@taylorfinnell, I suggest you increase the value of the worker_pods_pending_timeout configuration. I'm not sure it will resolve this, but it's also connected with SIGTERM being sent to the task runner, because pods are deleted when they hit that timeout.
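
For reference, that option lives under the [kubernetes] section in 2.1.x (or the matching environment variable); the value below is illustrative:

# airflow.cfg (Airflow 2.1.x) - only relevant when task pods are launched on Kubernetes
[kubernetes]
# seconds a worker pod may stay Pending before it is deleted
worker_pods_pending_timeout = 600

# or, equivalently, as an environment variable:
# AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT=600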

taylorfinnell commented

Thanks! It seems to me that setting is specific to the k8s executor, but we are using the CeleryExecutor.

kaxil (Member) commented Sep 20, 2021

@Tonkonozhenko @taylorfinnell

> psycopg2.OperationalError: could not connect to server: Connection timed out

That error basically says Airflow can't connect to the metadata DB. Where is your metadata DB hosted?

WattsInABox commented

Our metadata DB is in AWS and is a db.4xlarge that mostly looks like it's sitting idle every day. The most action we see is spikes to 350 connections (there's enough RAM for 1750 connections). We're working on figuring out whether the spikes are causing issues, but IMHO Airflow should not be falling over in the heartbeats b/c of a first-time missed connection. There should be some intelligent retry logic in the heartbeats...

kaxil (Member) commented Sep 20, 2021

> Our metadata DB is in AWS and is a db.4xlarge that mostly looks like its chilling out doing nothing every day. The most action we see is spikes to 350 connections (there's enough RAM for 1750 connections). We're working on weeding out if the spikes are causing issues, but IMHO Airflow should not be falling over in the heartbeats b/c of a first-time missed connection. There should be some intelligent retry logic in the heartbeats...

Indeed, we do have retries in a few places; this might not be one of them and may need improving. Does this error occur without those network blips / DB connectivity issues?

Can someone comment steps to reproduce, please?

potiuk (Member) commented Sep 20, 2021

> IMHO Airflow should not be falling over in the heartbeats b/c of a first-time missed connection. There should be some intelligent retry logic in the heartbeats...

Actually I do not agree with that statement.

Airflow should rely on the metadata database being available at all times, and losing connectivity in the middle of a transaction should not be handled by Airflow. That adds terrible complexity to the code and IMHO is not needed to deal with this kind of (apparent) connectivity instability, especially since this is a timeout when trying to connect to the database. At the SQLAlchemy and ORM level we often have no control over when a session and connection is going to be established, and trying to handle all such failures at the application level is complex.

AND it is also not needed at the application level, especially in the case of Postgres. For quite some time (and also in our Helm Chart) we have recommended that everyone using Postgres put PGBouncer in front of their Postgres database as a proxy. It also deals nicely with the number of open connections (Postgres is not good at handling many parallel connections: its connection model is process-based and thus resource-hungry when many connections are open).

PGBouncer not only manages connection pools shared between components, it also lets you react to network conditions like these. First of all, it will reuse existing connections, so there will be far fewer connection open/close events between PGBouncer and the database. All the connections opened by Airflow will go to the locally available PGBouncer, which makes them much more resilient to networking issues. PGBouncer then handles the errors, and you can fine-tune it if you have connectivity problems to your database.
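
A minimal sketch of such a setup, purely illustrative - hostnames, ports, auth file, and pool sizes are assumptions, not recommended values:

; pgbouncer.ini (illustrative)
[databases]
; Airflow connects to "airflow" through PGBouncer; PGBouncer connects onward to Postgres/RDS
airflow = host=your-rds-host.amazonaws.com port=5432 dbname=airflow

[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6543
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
; transaction pooling shares a small number of real Postgres connections among many clients
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50

Airflow's sql_alchemy_conn would then point at PGBouncer rather than at Postgres directly, e.g. postgresql+psycopg2://user:pass@pgbouncer-host:6543/airflow.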

@WattsInABox - can you please add PGBouncer(s) to your deployment and let us know if that improves the situation? I think this is not even a workaround - it's actually a good solution (which we generally recommend for any deployment with Postgres).

I will convert this into a discussion until we hear back from you - with your experiences with PGBouncer, whether those problems still occur after you get PGBouncer running, and with some reproducible case.

@apache apache locked and limited conversation to collaborators Sep 20, 2021
@potiuk potiuk closed this as completed Sep 20, 2021

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →
