-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor dangling row check to use SQLA queries #19808
Refactor dangling row check to use SQLA queries #19808
Conversation
a1ee16a
to
cfdb1bb
Compare
Honestly I feel this is a massive degrade on readability, but I guess that’s more or less how ORMs work in general to balance between code duplication and hairy string manipulation.
Including the leading whitespace? That’s… uh. |
def _from_name(from_) -> str: | ||
if isinstance(from_, Join): | ||
return str(from_.compile(bind=bind)) | ||
return str(from_) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQLAlchemy really has a lot of rough edges in unexpected places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this one wasn't even the worse -- the .whereclause ... except ._whereclause
annoyed me more.
from sqlalchemy import column, select, table | ||
from sqlalchemy.sql.selectable import Join |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not import globally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not strictly required, but this is in the hot import path for airflow
so I want to minimize the global imports in this file
Yeah, I kind of agree, but to add the XCom migration check (which I've already got written, just not in this PR I had to make a change like this: - source_to_dag_run_join_cond = and_(
- source_table.c.dag_id == dagrun_table.c.dag_id,
- source_table.c.execution_date == dagrun_table.c.execution_date,
- )
- invalid_rows_query = (
- session.query(source_table.c.dag_id, source_table.c.task_id, source_table.c.execution_date)
- .select_from(outerjoin(source_table, dagrun_table, source_to_dag_run_join_cond))
- .filter(dagrun_table.c.dag_id.is_(None))
+ if "task_id" in fk_target.columns:
+ join_condition &= to_migrate.c.task_id == fk_target.c.task_id
+ query = session.query(
+ to_migrate.c.dag_id, to_migrate.c.task_id, to_migrate.c.execution_date, to_migrate.c.task_id
+ )
+ else:
+ query = session.query(to_migrate.c.dag_id, to_migrate.c.task_id, to_migrate.c.execution_date)
+
+ if "execution_date" in fk_target.columns:
+ join_target = fk_target
+ join_condition &= to_migrate.c.execution_date == fk_target.c.execution_date
+ else:
+ # Target Table doesn't have execution_date column (anymore?) i.e. TaskInstance after 2.2.0
+ join_target = fk_target.join(
+ dagrun_table,
+ and_(
+ dagrun_table.c.dag_id == fk_target.c.dag_id, dagrun_table.c.run_id == fk_target.c.run_id
+ ),
+ isouter=True,
+ )
+ join_condition &= join_target.c.dag_run_execution_date == to_migrate.c.execution_date Which makes it even less readable, but the other option is that we have to have a lot of if/else and cases for Xcom pre 2.2, Xcom post 2.2, TI, etc. |
@uranusjr Any further thoughts on this? Should we go with this approach or stick with the string-based we have now? |
As long as someone figures out the internals (you do), this is probably better than string-formatting, so +1 to moving this forward. |
966a96d
to
94b4ee8
Compare
I've rebased this, and I'll need to re-run these checks still work as it's been a while. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟢
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
6a444d5
to
9be6974
Compare
This is a prepaoratory refactor to have the move dangling rows pre-upgrade check make better use of the SQLA Queries -- this is needed because in a future PR we will add a check for dangling XCom rows, and that will need to conditionally join against DagRun to get execution_date (depending on if it is run pre- or post-2.2). This has been tested with Postgres 9.6, SQLite, MSSQL 2017 and MySQL 5.7 codespell didn't like `froms` as it thinks it is a typo of forms, and most other cases it would be, except here. Codespell doesn't currently have a method of ignoring a _single_ line without ignoring the word everywhere (which we don't want to do) so I have to ignore the exact _line_. Sad panda
9be6974
to
082e67e
Compare
Tested this, all still working. |
Damn. Good catch TP. |
This is a prepaorator refactor to have the move dangling rows
pre-upgrade check make better use of the SQLA Queries -- this is needed
because in a future PR we will add a check for dangling XCom rows, and
that will need to conditionally join against DagRun to get
execution_date (depending on if it is run pre- or post-2.2).
This has been tested with Postgres 9.6, SQLite, MSSQL 2017 and MySQL 5.7
codespell didn't like
froms
as it thinks it is a typo of forms, andmost other cases it would be, except here. Codespell doesn't currently
have a method of ignoring a single line without ignoring the word
everywhere (which we don't want to do) so I have to ignore the exact
line. Sad panda
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.