Add map_index to XCom model and interface #22112

Merged
merged 2 commits into from
Mar 11, 2022

Member

@uranusjr uranusjr commented Mar 9, 2022

This adds an additional (optional) map_index argument to XCom's get/set/clear interface so mapped task instances can push to the correct entries, and have them pulled correctly by a downstream task.

To make the XCom interface easier to use for common scenarios, a convenience method get_value is added. It takes a TaskInstanceKey, automatically performs argument unpacking, and calls get_one underneath. This is not done as a get_one overload to simplify the implementation and typing.
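
To illustrate the interface described above, here is a minimal in-memory sketch. It is not Airflow's implementation: the `FakeXCom` class, its dictionary storage, the reduced `TaskInstanceKey` fields, and the use of `-1` as the "not mapped" default are all assumptions made for this example. Only the names `map_index`, `get_one`, `get_value`, and `TaskInstanceKey` come from the description.

```python
from typing import Any, Dict, NamedTuple, Tuple


class TaskInstanceKey(NamedTuple):
    """Simplified stand-in for Airflow's TaskInstanceKey (field set assumed)."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1  # assumption: -1 marks an unmapped task instance


class FakeXCom:
    """Hypothetical toy store keyed by task instance identity plus XCom key."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str, str, int, str], Any] = {}

    def set(self, *, key: str, value: Any, dag_id: str, task_id: str,
            run_id: str, map_index: int = -1) -> None:
        # Each mapped task instance writes to its own (…, map_index, key) entry.
        self._rows[(dag_id, task_id, run_id, map_index, key)] = value

    def get_one(self, *, key: str, dag_id: str, task_id: str,
                run_id: str, map_index: int = -1) -> Any:
        # Returns None when no matching entry exists.
        return self._rows.get((dag_id, task_id, run_id, map_index, key))

    def get_value(self, *, ti_key: TaskInstanceKey, key: str) -> Any:
        # Convenience wrapper: unpack the key and delegate to get_one.
        return self.get_one(
            key=key,
            dag_id=ti_key.dag_id,
            task_id=ti_key.task_id,
            run_id=ti_key.run_id,
            map_index=ti_key.map_index,
        )

    def clear(self, *, dag_id: str, task_id: str, run_id: str,
              map_index: int = -1) -> None:
        # Drop every entry that belongs to the given task instance.
        target = (dag_id, task_id, run_id, map_index)
        self._rows = {k: v for k, v in self._rows.items() if k[:4] != target}


store = FakeXCom()
for index, payload in enumerate(["a", "b"]):
    store.set(key="return_value", value=payload, dag_id="d", task_id="t",
              run_id="r", map_index=index)

second = store.get_value(ti_key=TaskInstanceKey("d", "t", "r", 1),
                         key="return_value")
```

Keeping `get_value` as a separate method, rather than an overload of `get_one`, means each method has a single, plainly typed signature, which matches the rationale given in the description.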

@boring-cyborg boring-cyborg bot added area:core-operators Operators, Sensors and hooks within Core Airflow area:providers area:serialization area:webserver Webserver related Issues kind:documentation provider:amazon-aws AWS/Amazon - related issues provider:microsoft-azure Azure-related issues provider:google Google (including GCP) related issues labels Mar 9, 2022
@uranusjr uranusjr marked this pull request as draft March 9, 2022 10:17
@uranusjr
Member Author

uranusjr commented Mar 9, 2022

Wow this conflicts very fast.

@uranusjr uranusjr force-pushed the xcom-map-index-2 branch 4 times, most recently from 33153e1 to e494600 Compare March 9, 2022 15:56
@uranusjr uranusjr marked this pull request as ready for review March 9, 2022 15:56
@uranusjr uranusjr requested a review from dstandish March 10, 2022 07:50
@@ -87,7 +88,7 @@ class BaseXCom(Base, LoggingMixin):
         # but it goes over MySQL's index length limit. So we instead create indexes
         # separately, and enforce uniqueness with DagRun.id instead.
         Index("idx_xcom_key", key),
-        Index("idx_xcom_ti_id", dag_id, task_id, run_id),
+        Index("idx_xcom_ti_id", dag_id, task_id, run_id, map_index),
Member

The column order here doesn't match the migrations. This matters for one of our DBs (sqlite? mssql?).

Member

Shouldn't we have an FK to TaskInstance?

Member Author

There wasn’t a foreign key before. I think previously (when XCom was based on execution date) it was possible to push an XCom to a future date, so a foreign key did not make sense back then. Now that it’s based on run ID, perhaps it makes sense to have a ti-xcom relation, but that should be a separate discussion regardless.

Member

> it was possible to push an XCom to a future date

Already done https://lists.apache.org/thread/gofj3g6m6vvksy6n0cmgq1qxd309bbbl (I don't think it ever actually worked.)

Member

@ashb ashb left a comment

LGTM otherwise.

@ashb ashb removed area:webserver Webserver related Issues kind:documentation labels Mar 10, 2022
@uranusjr uranusjr force-pushed the xcom-map-index-2 branch 2 times, most recently from bdee31a to 3f1c4b6 Compare March 10, 2022 13:43
@uranusjr uranusjr requested a review from ashb March 10, 2022 13:43
@dstandish
Contributor

heads up @uranusjr that migration has been renamed to 0102_c306b5b5ae4a_switch_xcom_table_to_use_run_id.py in main

This is not actually stored correctly yet. We still need to fix the XCom
interface.
This adds an additional (optional) map_index argument to XCom's
get/set/clear interface so mapped task instances can push to the
correct entries, and have them pulled correctly by a downstream.

To make the XCom interface easier to use for common scenarios, a
convenience method get_value is added to take a TaskInstanceKey that
automatically performs argument unpacking and calls get_one underneath.
This is not done as a get_one overload to simplify the implementation
and typing.
@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Mar 11, 2022
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@ashb ashb merged commit d08284e into apache:main Mar 11, 2022
@ashb ashb deleted the xcom-map-index-2 branch March 11, 2022 14:25
@potiuk
Member

potiuk commented Mar 14, 2022

Hello @uranusjr @ashb - while preparing the "Trove classifier" release for Providers I noticed that a backwards-incompatible change sneaked in for a number of providers here.

The get_value method of XCom is only added in 2.3.0, yet a number of providers have been switched to use it in some places in this change. I think we should implement backwards compatibility here and add protection so that this new XCom method is not used accidentally by the providers.

@uranusjr
Member Author

Those get_value() usages should only happen in if ti_key is not None blocks. The ti_key argument is only passed by Airflow 2.3+, so this check acts as feature detection to maintain backward compatibility. Please do tell me if you spot any usages that are not in such blocks.
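
The feature-detection pattern described here could look roughly like the following sketch. Everything in it is a stand-in written for illustration: `OldXCom`, `NewXCom`, `get_link`, `LINK_KEY`, and the simplified method signatures are hypothetical and do not reproduce any actual provider or Airflow code. The only point taken from the comment is that `get_value()` is reached solely inside an `if ti_key is not None` block.

```python
from types import SimpleNamespace
from typing import Any, Optional

LINK_KEY = "job_url"  # hypothetical XCom key used by this example


class OldXCom:
    """Stand-in for an XCom class that predates get_value (shape assumed)."""

    @classmethod
    def get_one(cls, *, key: str, dag_id: str, task_id: str,
                execution_date: Any) -> str:
        return f"get_one:{key}"


class NewXCom(OldXCom):
    """Stand-in for an XCom class that also offers get_value."""

    @classmethod
    def get_value(cls, *, ti_key: Any, key: str) -> str:
        return f"get_value:{key}"


def get_link(xcom_cls: Any, operator: Any, dttm: Any,
             ti_key: Optional[Any] = None) -> str:
    # ti_key is only supplied by newer cores, so its presence doubles as
    # a signal that get_value exists on the XCom class.
    if ti_key is not None:
        return xcom_cls.get_value(ti_key=ti_key, key=LINK_KEY)
    # Older cores never pass ti_key, so this branch never touches get_value.
    return xcom_cls.get_one(
        key=LINK_KEY,
        dag_id=operator.dag_id,
        task_id=operator.task_id,
        execution_date=dttm,
    )


operator = SimpleNamespace(dag_id="d", task_id="t")
old_result = get_link(OldXCom, operator, "2022-03-09")
new_result = get_link(NewXCom, operator, "2022-03-09", ti_key=("d", "t", "r"))
```

Because the old code path is selected by the absence of an argument rather than by a version comparison, the same provider code runs unchanged against both stand-ins.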

@potiuk
Copy link
Member

potiuk commented Mar 14, 2022

> Those get_value() usages should only happen in if ti_key is not None blocks. The ti_key argument is only passed by Airflow 2.3+, so this check acts as feature detection to maintain backward compatibility. Please do tell me if you spot any usages that are not in such blocks.

Ah ok. Just thinking how to automate the check in this case so that it is not used accidentally. Let me take a look maybe I will come up with something. I do not want to spot it myself, our backwards compatibility checks should do it automatically for us I think. Same way as we check if "imports" are working.
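
One way such an automated check could work, sketched under stated assumptions: parse each source file with Python's `ast` module and report any `.get_value(...)` call that is not nested inside the body of an `if ti_key is not None:` statement. The function names below are hypothetical, the guard is matched only in that exact form, and this does not describe what the follow-up PR actually implements.

```python
import ast
from typing import List


def _is_ti_key_guard(test: ast.expr) -> bool:
    """True for the exact expression `ti_key is not None`."""
    return (
        isinstance(test, ast.Compare)
        and isinstance(test.left, ast.Name)
        and test.left.id == "ti_key"
        and len(test.ops) == 1
        and isinstance(test.ops[0], ast.IsNot)
        and isinstance(test.comparators[0], ast.Constant)
        and test.comparators[0].value is None
    )


def find_unguarded_get_value(source: str) -> List[int]:
    """Return line numbers of `.get_value(...)` calls outside a guard body."""
    unguarded: List[int] = []

    def visit(node: ast.AST, guarded: bool) -> None:
        if isinstance(node, ast.If) and _is_ti_key_guard(node.test):
            # Only the `if` body counts as guarded; the else branch does not.
            for child in node.body:
                visit(child, True)
            for child in node.orelse:
                visit(child, guarded)
            return
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "get_value"
            and not guarded
        ):
            unguarded.append(node.lineno)
        for child in ast.iter_child_nodes(node):
            visit(child, guarded)

    visit(ast.parse(source), False)
    return unguarded


GUARDED = (
    "def get_link(self, operator, dttm, ti_key=None):\n"
    "    if ti_key is not None:\n"
    "        return XCom.get_value(ti_key=ti_key, key='k')\n"
    "    return XCom.get_one(key='k')\n"
)
UNGUARDED = (
    "def get_link(self, operator, dttm, ti_key=None):\n"
    "    return XCom.get_value(ti_key=ti_key, key='k')\n"
)
```

A syntactic check like this is deliberately strict: equivalent guards written differently (for example `if ti_key:`) would be reported, which is one reason to standardize on a single guard form.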

@potiuk
Member

potiuk commented Mar 14, 2022

Added PR for that @uranusjr #22244

@potiuk
Member

potiuk commented Mar 14, 2022

(BTW, there were no misuses, but I standardized the use of if not None.)

@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Apr 11, 2022
@jedcunningham jedcunningham added this to the Airflow 2.3.0 milestone Apr 26, 2022
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow area:dynamic-task-mapping AIP-42 area:providers area:serialization changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge provider:amazon-aws AWS/Amazon - related issues provider:google Google (including GCP) related issues provider:microsoft-azure Azure-related issues