-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ingestion/airflow-plugin): airflow remove old tasks #10485
fix(ingestion/airflow-plugin): airflow remove old tasks #10485
Conversation
ebd7762
to
49c689c
Compare
logger.debug("Initiating the cleanup of obsselete data from datahub") | ||
|
||
ingested_dataflow_urns = list( | ||
self.graph.get_urns_by_filter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should filter for cluster
as well; otherwise if user has multiple Airflow instance you will delete dags which you shouldn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am filtering the entire URN which is already having the cluster
i.e. urn:li:dataFlow:(airflow,simple_dag,prod)
So, still we need to match/filter cluster
explicitly? or my understanding is wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you check here we use cluster or env to generate the DataFlow Urns, so it is part of the urn. ->
self.urn = DataFlowUrn.create_from_ids( |
This means if the env or cluster is set and has multiple Airflow environments like DEV
and PROD
, then your query will return the urns for both PROD and DEV, which we don't want in this case as these are different Airflow environment.
You should add cluster/env as a filter parameter.
airflow_job_urns: List = [] | ||
|
||
for dag in all_airflow_dags: | ||
flow_urn = builder.make_data_flow_urn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cluster should be passed in if exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as other comment
logger.debug("Initiating the cleanup of obsselete data from datahub") | ||
|
||
ingested_dataflow_urns = list( | ||
self.graph.get_urns_by_filter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you check here we use cluster or env to generate the DataFlow Urns, so it is part of the urn. ->
self.urn = DataFlowUrn.create_from_ids( |
This means if the env or cluster is set and has multiple Airflow environments like DEV
and PROD
, then your query will return the urns for both PROD and DEV, which we don't want in this case as these are different Airflow environment.
You should add cluster/env as a filter parameter.
…e tasks and pipelines based on the cluster
Checklist