Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify query for orphaned tasks #36566

Merged

Conversation

dstandish
Copy link
Contributor

@dstandish dstandish commented Jan 3, 2024

This query runs periodically (controlled by orphaned_tasks_check_interval setting).

We observed that it can take around a minute to run for very large TI tables. While looking into that issue, I noticed this query was more complicated than it needed to be. This PR simplifies the query, though it may not meaningfully help performance.

Two changes here.

First, previously we ended up with two joins to DagRun because the dag_run relationship attr is lazy="joined" and sqlalchemy was not using it. By setting to be lazy, we eliminate the extra join and we also don't ask for any columns in dag run (previously the generated query asked for all of them, even though we try to limit via options further down).

Second, we use inner join for queued by job. It seems that the outer join was there when this adoption logic was first added in #10729, and the comment added at the time indicated that it was only there to handle tasks in flight during upgrade to 2.0:

                   # outerjoin is because we didn't use to have queued_by_job
                   # set, so we need to pick up anything pre upgrade. This (and the
                   # "or queued_by_job_id IS NONE") can go as soon as scheduler HA is
                   # released.

Before:

SELECT
    task_instance.task_id,
    task_instance.dag_id,
    task_instance.run_id,
    task_instance.map_index,
    dag_run_1.state,
    dag_run_1.id,
    dag_run_1.dag_id AS dag_id_1,
    dag_run_1.queued_at,
    dag_run_1.execution_date,
    dag_run_1.start_date,
    dag_run_1.end_date,
    dag_run_1.run_id AS run_id_1,
    dag_run_1.creating_job_id,
    dag_run_1.external_trigger,
    dag_run_1.run_type,
    dag_run_1.conf,
    dag_run_1.data_interval_start,
    dag_run_1.data_interval_end,
    dag_run_1.last_scheduling_decision,
    dag_run_1.dag_hash,
    dag_run_1.log_template_id,
    dag_run_1.updated_at,
    dag_run_1.clear_number
FROM task_instance
LEFT OUTER JOIN job ON job.id = task_instance.queued_by_job_id
JOIN dag_run ON dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id
JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE
    task_instance.state IN (__[postcompile_state_1])
    AND (task_instance.queued_by_job_id IS NULL OR job.state != :state_2)
    AND dag_run.run_type != :run_type_1
    AND dag_run.state = :state_3

After:

SELECT
    task_instance.task_id,
    task_instance.dag_id,
    task_instance.run_id,
    task_instance.map_index
FROM task_instance
JOIN job ON job.id = task_instance.queued_by_job_id
JOIN dag_run ON dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id
WHERE
    task_instance.state IN (__[postcompile_state_1])
    AND job.state != :state_2
    AND dag_run.run_type != :run_type_1
    AND dag_run.state = :state_3

Two changes here.

First, previously we ended up with two joins to DagRun because the dag_run relationship attr is `lazy="joined"` and sqlalchemy was not using it.  By setting to be lazy, we eliminate the extra join and we also don't ask for any columns in dag run (previously the generated query asked for all of them, even though we try to limit via options further down).

Second, we use inner join for queued by job.  The outer was only there to handle tasks in flight during upgrade to 2.0.
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jan 3, 2024
Copy link
Collaborator

@dirrao dirrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good optimization. I see related test cases are failing. It would be great if you can fix them.

Copy link
Collaborator

@dirrao dirrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dstandish
Copy link
Contributor Author

can i get an amen @jedcunningham @uranusjr @ephraimbuddy

@dstandish dstandish merged commit 63e93d7 into apache:main Jan 22, 2024
53 checks passed
@dstandish dstandish deleted the simplify-adopt-or-reset-orphaned-tasks-query branch January 22, 2024 18:30
flacode pushed a commit to flacode/airflow that referenced this pull request Jan 22, 2024
Two changes here.

First, previously we ended up with two joins to DagRun because the dag_run relationship attr is `lazy="joined"` and sqlalchemy was not using it.  By setting to be lazy, we eliminate the extra join and we also don't ask for any columns in dag run (previously the generated query asked for all of them, even though we try to limit via options further down).

Second, we use inner join for queued by job.  The outer was only there to handle tasks in flight during upgrade to 2.0.
@eladkal eladkal added this to the Airflow 2.9.0 milestone Jan 22, 2024
@eladkal eladkal added the type:improvement Changelog: Improvements label Jan 22, 2024
abhishekbhakat pushed a commit to abhishekbhakat/my_airflow that referenced this pull request Mar 5, 2024
Two changes here.

First, previously we ended up with two joins to DagRun because the dag_run relationship attr is `lazy="joined"` and sqlalchemy was not using it.  By setting to be lazy, we eliminate the extra join and we also don't ask for any columns in dag run (previously the generated query asked for all of them, even though we try to limit via options further down).

Second, we use inner join for queued by job.  The outer was only there to handle tasks in flight during upgrade to 2.0.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler type:improvement Changelog: Improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants