Add filter by state in DagRun REST API (List Dag Runs) #20485
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Can you add a test to filter by none state? I believe this is the state that causes the most problems (because the internal representation is not str). And there is actually a util function somewhere (for task instances? @ephraimbuddy may remember better) for this problem.
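To illustrate why a none state can be awkward to filter on, here is a minimal sketch using the standard-library sqlite3 module rather than Airflow's models. The table layout and the helper `run_ids_in_states` are hypothetical and exist only for this example; the point is that a SQL `IN (...)` list never matches a NULL column value, so a none state needs its own `IS NULL` branch.

```python
import sqlite3

# Hypothetical stand-in table, not Airflow's real schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (run_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [("run_1", "running"), ("run_2", "failed"), ("run_3", None)],
)

# Passing None inside IN (...) silently skips the row whose state is NULL.
naive = [
    row[0]
    for row in conn.execute(
        "SELECT run_id FROM dag_run WHERE state IN (?, ?) ORDER BY run_id",
        ("running", None),
    )
]


def run_ids_in_states(states):
    """Return run ids whose state is in `states`, matching None via IS NULL."""
    named = [s for s in states if s is not None]
    clauses, params = [], []
    if named:
        placeholders = ", ".join("?" for _ in named)
        clauses.append(f"state IN ({placeholders})")
        params.extend(named)
    if None in states:
        clauses.append("state IS NULL")
    if not clauses:
        return []
    sql = (
        "SELECT run_id FROM dag_run WHERE "
        + " OR ".join(clauses)
        + " ORDER BY run_id"
    )
    return [row[0] for row in conn.execute(sql, params)]
```

With this data, `naive` contains only `run_1`, while `run_ids_in_states(["running", None])` also returns `run_3`.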
result = session.query(DagRun).all()
assert len(result) == 2
- result = session.query(DagRun).all()
- assert len(result) == 2
+ assert session.query(DagRun).count() == 2
if state:
    query = query.filter(DagRun.state == state)
- if state:
-     query = query.filter(DagRun.state == state)
+ if state:
+     query = query.filter(DagRun.state.in_(state))
The FilterState query component is an array.
I think we are fine, because we are dealing with dagruns, which have only 3 states.
@uranusjr @ephraimbuddy thank you very much for your recommendations, I just applied them in a new commit. In the end, I did not add a test for filtering by state None, as @ephraimbuddy mentioned. Thanks
self._create_test_dag_run()
assert session.query(DagRun).count() == 2
response = self.client.get(
    "api/v1/dags/TEST_DAG_ID/dagRuns?state=running,failed", environ_overrides={'REMOTE_USER': "test"}
The failed state is not tested here. Can we fail one dagrun, or set the state to queued, and adjust the test to match?
@ephraimbuddy @uranusjr I also got a failed result on the static checks with these details, which I do not understand:
Indeed, I did not touch any hook in this PR. Could you please help me understand this issue? Thanks!
You have trailing whitespace in your code.
@uranusjr, @ephraimbuddy I have applied all your suggestions. Is everything OK now, or should I do something else? Thanks
Hi @mik-laj, it looks like this PR is pending your review. Is there anything else I should check or change? This is my first PR, so I am a little bit lost. Thank you for your help!!
@@ -149,6 +149,7 @@ def get_dag_runs(
     execution_date_lte: Optional[str] = None,
     end_date_gte: Optional[str] = None,
     end_date_lte: Optional[str] = None,
+    state: Optional[str] = None,
- state: Optional[str] = None,
+ states: Optional[List[str]] = None,
We have state in the OpenAPI doc instead of states, so I don't think it will work to change it to states.
if state:
    query = query.filter(DagRun.state.in_(state))
- if state:
-     query = query.filter(DagRun.state.in_(state))
+ if states:
+     query = query.filter(DagRun.state.in_(states))
Hi @kaxil, thanks for your recommendations. I will change the datatype to Optional[List[str]]. However, I am not sure it is a good idea to change the name to plural, as the FilterState query component is defined in singular in the specification v1.yml, even though it is an array:
FilterState:
  in: query
  name: state
  schema:
    type: array
    items:
      type: string
  required: false
  description:
    The value can be repeated to retrieve multiple matching values (OR condition).
To change that to plural I would need to create a new FilterStates query component....
On the other hand in the batch method I did use plural to match with the other existing fields (for instance dag_ids):
{
  "order_by": "string",
  "page_offset": 0,
  "page_limit": 100,
  "dag_ids": [
    "string"
  ],
  ...
  "states": [
    "string"
  ]
}
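As a rough illustration of the "value can be repeated (OR condition)" behaviour described for the state parameter, the sketch below parses a query string with the standard library and applies the filter to plain dictionaries. It is not the code path the API uses: how the real framework decodes the array parameter is not shown in this thread, so accepting both repeated and comma-separated values is an assumption, and `parse_state_filter` and `filter_runs` are hypothetical names.

```python
from urllib.parse import parse_qs


def parse_state_filter(query_string):
    """Collect `state` values, accepting repeated and comma-separated forms."""
    values = parse_qs(query_string).get("state", [])
    states = []
    for value in values:
        # Assumption: a single value may carry several states joined by commas.
        states.extend(part for part in value.split(",") if part)
    return states


def filter_runs(runs, states):
    """Keep runs whose state matches ANY requested state (OR condition)."""
    if not states:
        return list(runs)  # no filter requested: return everything
    return [run for run in runs if run["state"] in states]


# Hypothetical sample data for the example.
runs = [
    {"run_id": "run_1", "state": "running"},
    {"run_id": "run_2", "state": "queued"},
    {"run_id": "run_3", "state": "failed"},
]

repeated = parse_state_filter("state=running&state=queued")
comma_separated = parse_state_filter("state=running,queued")
selected = filter_runs(runs, repeated)
```

Both query-string forms yield the same list of states here, and `selected` keeps only the running and queued runs.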
Hi @kaxil, I just changed the datatype to Optional[List[str]] as you suggested, but now when I try to run the static checks I get hundreds of mypy errors. Is this normal? What am I doing wrong? Maybe it has nothing to do with my change? Thanks so much for any help.
As long as CI passes it should be OK (not optimal, but OK if you don’t want to fix those).
Overall lgtm, 2 minor suggestions
if data.get("states"):
    states = set(data["states"])
    query = query.filter(DagRun.state.in_(states))
- if data.get("states"):
-     states = set(data["states"])
-     query = query.filter(DagRun.state.in_(states))
+ states = data.get("states")
+ if states:
+     query = query.filter(DagRun.state.in_(states))
Python variables are function-scoped, not block-scoped, so it’s not useful to put the variable inside the if block. And moving the variable out avoids one unnecessary member access.
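The scoping point can be checked with a small sketch. The functions below are hypothetical and operate on plain dictionaries instead of a SQLAlchemy query: `scope_demo` shows that a name assigned inside an `if` block belongs to the whole function, and `select_runs` mirrors the suggested shape with a single `data.get("states")` lookup.

```python
def scope_demo(flag):
    if flag:
        inside = "assigned inside the if block"
    # `inside` is a function-level name: readable here when the branch ran,
    # and an UnboundLocalError when it did not.
    return inside


def select_runs(runs, data):
    """Filter runs by the optional `states` entry of a request payload."""
    states = data.get("states")  # one lookup, reused by the check and the filter
    if states:
        return [run for run in runs if run["state"] in states]
    return list(runs)


# Hypothetical sample data for the example.
runs = [
    {"run_id": "run_1", "state": "running"},
    {"run_id": "run_2", "state": "failed"},
]
only_failed = select_runs(runs, {"states": ["failed"]})
unfiltered = select_runs(runs, {})
```

Hoisting the lookup does not change behaviour; it only removes the second `data["states"]` access.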
One minor change, otherwise lgtm.
We can avoid the duplication of the create dagruns method
with create_session() as session:
    session.add_all(dag_runs)
    session.add_all(dags)
return dag_runs
I don't think we need this duplication; we can use _create_test_dag_run instead. I understand you want to limit it to 2 dagruns, but that's the essence of the PR: we can filter down to 2. _create_test_dag_run takes a state argument. You can call it twice or more with different states, thereby creating more dagruns, and then test the added filtering.
Hi @ephraimbuddy, if we follow that approach we get this error:
E MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'TEST_DAG_ID-2020-06-11 18:00:00.000000' for key 'dag_run_dag_id_execution_date_key'")
The problem is that the method _create_test_dag_run uses self.default_time and self.default_time_2 statically as the execution dates, so calling the method twice violates the unique index.
To make this dynamic, we would also need to change the way the execution dates are assigned...
How do you advise proceeding?
Thanks.
What do you think about changing the method _create_test_dag_run to avoid that? If it's possible, let's do that instead.
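One way such a change could look is sketched below with plain dictionaries in place of DagRun objects. This is not the refactoring that ended up in the PR: `DagRunFactory` and its `create` method are hypothetical, and the only idea shown is that deriving each execution date from a running counter keeps the (dag_id, execution_date) pair unique across repeated calls.

```python
import itertools
from datetime import datetime, timedelta, timezone


class DagRunFactory:
    """Hypothetical test helper: every created run gets a fresh execution date."""

    def __init__(self, dag_id, base_time):
        self.dag_id = dag_id
        self.base_time = base_time
        self._index = itertools.count()  # shared across calls to create()

    def create(self, state="running", count=2):
        runs = []
        for _ in range(count):
            i = next(self._index)
            runs.append(
                {
                    "dag_id": self.dag_id,
                    "run_id": f"TEST_DAG_RUN_ID_{i + 1}",
                    # Offset by the counter so no two runs share a date.
                    "execution_date": self.base_time + timedelta(days=i),
                    "state": state,
                }
            )
        return runs


factory = DagRunFactory(
    "TEST_DAG_ID", datetime(2020, 6, 11, 18, 0, tzinfo=timezone.utc)
)
running_runs = factory.create()
queued_runs = factory.create(state="queued")
```

Calling `create` twice now yields four runs with four distinct execution dates, which is the property the unique index requires.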
Ok @ephraimbuddy, let's try that. Thanks! I will try to minimize changes on the other parts of the code...
Dear @ephraimbuddy, I just made a new commit with the changes. Please let me know if you agree with them. Thanks so much.
@@ -307,6 +336,41 @@ def test_should_respond_200(self, session):
             "total_entries": 2,
         }

+    def test_filter_by_state(self, session):
+        self._create_test_dag_run_with_queued()
- self._create_test_dag_run_with_queued()
+ self._create_test_dag_run()
+ self._create_test_dag_run(state='queued')
Hi @ephraimbuddy, understood, but we need to clarify the previous point before going further with this. Thx.
    "api/v1/dags/TEST_DAG_ID/dagRuns?state=running,queued", environ_overrides={'REMOTE_USER': "test"}
)
assert response.status_code == 200
assert response.json == {
You may not need to check and list every field. You can check the important things, like the total returned and the states of the items returned.
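A minimal sketch of that lighter style of assertion is shown below. The helper `assert_only_states` is hypothetical, and the payload shape is an assumption: `total_entries` appears in the test diff above, while the `dag_runs` key and the sample values are made up for the example.

```python
def assert_only_states(payload, expected_total, allowed_states):
    """Check the total and the returned states without comparing every field."""
    assert payload["total_entries"] == expected_total
    returned = {run["state"] for run in payload["dag_runs"]}
    assert returned <= set(allowed_states), f"unexpected states: {returned}"


# Hypothetical response body; only the fields used by the check are included.
sample_payload = {
    "dag_runs": [
        {"dag_run_id": "TEST_DAG_RUN_ID_1", "state": "running"},
        {"dag_run_id": "TEST_DAG_RUN_ID_2", "state": "queued"},
    ],
    "total_entries": 2,
}

assert_only_states(sample_payload, 2, {"running", "queued"})
```

In a test this could be called on `response.json`, which keeps the test stable when unrelated fields are added to the response.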
Hi @ephraimbuddy, OK, understood. I can change that according to your recommendation. Thanks.
…s recomendation with testing queued
…s recomendation with testing queued and removing trailing whitespaces
…s recomendation with testing queued and removing trailing whitespaces
…s recomendation List Optional
…s recomendation of refactoring _create_test_dag_run method
The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
Awesome work, congrats on your first merged pull request!