[AIRFLOW-9300] Add DatafusionPipelineStateSensor and async option to the CloudDataFusionStartPipelineOperator #17787
Conversation
Force-pushed from eec3e6b to 72175e6
Needs to add a howto explaining it, and a better approach for the DAG structure used.
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
# [END howto_cloud_data_fusion_start_pipeline_async]
Those START/END markers are here for a reason - they are used as entries in https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst to automatically extract pieces of the example DAG into the "Howto" documentation. The Howto documentation now needs separate sync/async sections.
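For illustration only, here is a minimal Python sketch (not the PR's actual example DAG) of what separate sync and async marker sections could look like. Only the howto_cloud_data_fusion_start_pipeline_async marker is visible in the diff above; the sync marker name, the DAG id, and the constant values are assumptions.

```python
# Hypothetical excerpt of an example DAG with separate sync/async marker
# sections that the Howto docs could extract. All constants are placeholders.
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

LOCATION = "europe-west1"           # placeholder
INSTANCE_NAME = "example-instance"  # placeholder
PIPELINE_NAME = "example-pipeline"  # placeholder

with models.DAG(
    "example_data_fusion_sketch",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # [START howto_cloud_data_fusion_start_pipeline]
    start_pipeline = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline",
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        location=LOCATION,
    )
    # [END howto_cloud_data_fusion_start_pipeline]

    # [START howto_cloud_data_fusion_start_pipeline_async]
    start_pipeline_async = CloudDataFusionStartPipelineOperator(
        task_id="start_pipeline_async",
        pipeline_name=PIPELINE_NAME,
        instance_name=INSTANCE_NAME,
        location=LOCATION,
        asynchronous=True,
    )
    # [END howto_cloud_data_fusion_start_pipeline_async]
```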
@@ -234,6 +254,14 @@
create_instance >> get_instance >> restart_instance >> update_instance >> sleep
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> delete_pipeline
(
There should be no repeated create/start/stop/delete here with dependencies between them.
It is quite confusing what the example is trying to do, how it will behave, and what the DAG will look like.
The DAG structure below seems to repeat the same dependencies again for sleep --> create and stop --> delete, and this is totally needless. Plus, I am not sure what happens if the async and sync runs work in parallel on the same pipeline at the same time (which looks like what is going to happen here).
It sounds like a recipe for disaster to run both the sync and the async run of the same pipeline at the same time (but maybe it will work, who knows - I do not know Data Fusion that well :D).
In order to avoid confusion about the DAG definition, this should look more like:
sleep >> create_pipeline >> list_pipelines >> start_pipeline_async >> start_pipeline_sensor >> start_pipeline >> stop_pipeline >> delete_pipeline
or, if you really want to (and can) run the sync/async pipelines in parallel, something like this:
sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> delete_pipeline
list_pipelines >> start_pipeline_async >> start_pipeline_sensor >> stop_pipeline
This shows the intentions much better, and allows running the synchronous version of the pipeline first and then the asynchronous one.
Or maybe you should even separate it out and prepare a separate "example_datafusion_async.py" - as I am not sure if the same pipeline can be run twice in succession (probably yes, and if that is the case then a single example with both the sync and async versions is fine).
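To make the intended graph shape concrete, here is a runnable toy sketch of the second suggested structure. It uses DummyOperator stand-ins rather than the real Data Fusion operators and sensor; only the task names come from the suggestion, and the DAG id and dates are placeholders.

```python
# Toy illustration of the suggested parallel sync/async branches.
# The real example would use the Data Fusion operators and the new sensor.
from datetime import datetime

from airflow import models
from airflow.operators.dummy import DummyOperator

with models.DAG(
    "datafusion_dependency_sketch",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    names = [
        "sleep",
        "create_pipeline",
        "list_pipelines",
        "start_pipeline",
        "start_pipeline_async",
        "start_pipeline_sensor",
        "stop_pipeline",
        "delete_pipeline",
    ]
    t = {name: DummyOperator(task_id=name) for name in names}

    # synchronous branch
    (
        t["sleep"]
        >> t["create_pipeline"]
        >> t["list_pipelines"]
        >> t["start_pipeline"]
        >> t["stop_pipeline"]
        >> t["delete_pipeline"]
    )
    # asynchronous branch, fanning out from list_pipelines and joining
    # back before stop_pipeline
    (
        t["list_pipelines"]
        >> t["start_pipeline_async"]
        >> t["start_pipeline_sensor"]
        >> t["stop_pipeline"]
    )
```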
…he CloudDataFusionStartPipelineOperator
Force-pushed from 72175e6 to d0f1f5f
@potiuk Thank you for the review, I have updated my changes.
We recently fixed provider.yaml validation, and PR #17787 added a new Data Fusion integration without flagging this as an error because it was based on an earlier version of the change without the fix to static checks. This PR adds the missing integration.
I added a new sensor for the Google Data Fusion service, and I added the possibility to run the CloudDataFusionStartPipelineOperator in an asynchronous way.

related: #9300
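As a rough usage sketch of the combination described above, assuming the async operator pushes the pipeline run id to XCom and the sensor is named CloudDataFusionPipelineStateSensor (both taken from the PR description rather than verified against the final code), with placeholder constants:

```python
# Hedged sketch: start the pipeline without blocking, then wait for it
# with the new sensor. Meant to live inside an example DAG.
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)
from airflow.providers.google.cloud.sensors.datafusion import (
    CloudDataFusionPipelineStateSensor,
)

LOCATION = "europe-west1"           # placeholder
INSTANCE_NAME = "example-instance"  # placeholder
PIPELINE_NAME = "example-pipeline"  # placeholder

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    task_id="start_pipeline_async",
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    location=LOCATION,
    asynchronous=True,  # return as soon as the pipeline run is submitted
)

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="start_pipeline_sensor",
    pipeline_name=PIPELINE_NAME,
    # the async operator is expected to push the pipeline run id to XCom
    pipeline_id="{{ task_instance.xcom_pull(task_ids='start_pipeline_async') }}",
    expected_statuses=["COMPLETED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

start_pipeline_async >> start_pipeline_sensor
```

The idea behind the split is that the operator returns as soon as the run is submitted, leaving the waiting to the sensor.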
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.