From 6d1d660fe9a2d80e412edb5636bdfd282a85e048 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Thu, 28 Apr 2022 11:25:03 +0530 Subject: [PATCH 1/2] Add is_mapped field to Task response. --- airflow/api_connexion/schemas/task_schema.py | 1 + .../endpoints/test_task_endpoint.py | 45 ++++++++++++++++++- .../api_connexion/schemas/test_task_schema.py | 2 + 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/airflow/api_connexion/schemas/task_schema.py b/airflow/api_connexion/schemas/task_schema.py index 600d28ac0d6c6..aa9a4703088b0 100644 --- a/airflow/api_connexion/schemas/task_schema.py +++ b/airflow/api_connexion/schemas/task_schema.py @@ -58,6 +58,7 @@ class TaskSchema(Schema): sub_dag = fields.Nested(DAGSchema, dump_only=True) downstream_task_ids = fields.List(fields.String(), dump_only=True) params = fields.Method('get_params', dump_only=True) + is_mapped = fields.Boolean(dump_only=True) def _get_class_reference(self, obj): result = ClassReferenceSchema().dump(obj) diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py index d61af25d52f1a..28326d3d71153 100644 --- a/tests/api_connexion/endpoints/test_task_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_endpoint.py @@ -52,8 +52,10 @@ def configured_app(minimal_app_for_api): class TestTaskEndpoint: dag_id = "test_dag" + mapped_dag_id = "test_mapped_task" task_id = "op1" task_id2 = 'op2' + mapped_task_id = "mapped_task" task1_start_date = datetime(2020, 6, 15) task2_start_date = datetime(2020, 6, 16) @@ -63,9 +65,12 @@ def setup_dag(self, configured_app): task1 = EmptyOperator(task_id=self.task_id, params={'foo': 'bar'}) task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date) + with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag: + mapped_task = EmptyOperator.partial(task_id=self.mapped_task_id).expand() # noqa + task1 >> task2 dag_bag = DagBag(os.devnull, include_examples=False) - dag_bag.dags = {dag.dag_id: dag} + dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag} configured_app.dag_bag = dag_bag # type:ignore @staticmethod @@ -120,6 +125,7 @@ def test_should_respond_200(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, } response = self.client.get( f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"} @@ -127,6 +133,40 @@ def test_should_respond_200(self): assert response.status_code == 200 assert response.json == expected + def test_mapped_task(self): + expected = { + "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"}, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "is_mapped": True, + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00+00:00", + "task_id": "mapped_task", + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + } + response = self.client.get( + f"/api/v1/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}", + environ_overrides={'REMOTE_USER': "test"}, + ) + assert response.status_code == 200 + assert response.json == expected + def test_should_respond_200_serialized(self): # Get the dag out of the dagbag before we patch it to an empty one @@ -170,6 +210,7 @@ def test_should_respond_200_serialized(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, } response = self.client.get( f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"} @@ -235,6 +276,7 @@ def test_should_respond_200(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, }, { "class_ref": { @@ -263,6 +305,7 @@ def test_should_respond_200(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, }, ], "total_entries": 2, diff --git a/tests/api_connexion/schemas/test_task_schema.py b/tests/api_connexion/schemas/test_task_schema.py index 6724ec7d0b73d..07f4b592dab02 100644 --- a/tests/api_connexion/schemas/test_task_schema.py +++ b/tests/api_connexion/schemas/test_task_schema.py @@ -56,6 +56,7 @@ def test_serialize(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, } assert expected == result @@ -101,6 +102,7 @@ def test_serialize(self): "ui_fgcolor": "#000", "wait_for_downstream": False, "weight_rule": "downstream", + "is_mapped": False, } ], "total_entries": 1, From 6285d7df58903179c14bf975c20f8ef0a8e61e54 Mon Sep 17 00:00:00 2001 From: Karthikeyan Singaravelan Date: Thu, 28 Apr 2022 16:03:08 +0530 Subject: [PATCH 2/2] Add is_mapped to schema file and add test for GetTasks. --- airflow/api_connexion/openapi/v1.yaml | 3 + .../endpoints/test_task_endpoint.py | 69 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index a0c3150be1edb..d19c11aeba94e 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -3013,6 +3013,9 @@ components: depends_on_past: type: boolean readOnly: true + is_mapped: + type: boolean + readOnly: true wait_for_downstream: type: boolean readOnly: true diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py index 28326d3d71153..9748305d8c0af 100644 --- a/tests/api_connexion/endpoints/test_task_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_endpoint.py @@ -55,6 +55,7 @@ class TestTaskEndpoint: mapped_dag_id = "test_mapped_task" task_id = "op1" task_id2 = 'op2' + task_id3 = "op3" mapped_task_id = "mapped_task" task1_start_date = datetime(2020, 6, 15) task2_start_date = datetime(2020, 6, 16) @@ -66,6 +67,7 @@ def setup_dag(self, configured_app): task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date) with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag: + task3 = EmptyOperator(task_id=self.task_id3) # noqa mapped_task = EmptyOperator.partial(task_id=self.mapped_task_id).expand() # noqa task1 >> task2 @@ -316,6 +318,73 @@ def test_should_respond_200(self): assert response.status_code == 200 assert response.json == expected + def test_get_tasks_mapped(self): + expected = { + "tasks": [ + { + "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"}, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "is_mapped": True, + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00+00:00", + "task_id": "mapped_task", + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + }, + { + "class_ref": { + "class_name": "EmptyOperator", + "module_path": "airflow.operators.empty", + }, + "depends_on_past": False, + "downstream_task_ids": [], + "end_date": None, + "execution_timeout": None, + "extra_links": [], + "owner": "airflow", + "params": {}, + "pool": "default_pool", + "pool_slots": 1.0, + "priority_weight": 1.0, + "queue": "default", + "retries": 0.0, + "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0}, + "retry_exponential_backoff": False, + "start_date": "2020-06-15T00:00:00+00:00", + "task_id": self.task_id3, + "template_fields": [], + "trigger_rule": "all_success", + "ui_color": "#e8f7e4", + "ui_fgcolor": "#000", + "wait_for_downstream": False, + "weight_rule": "downstream", + "is_mapped": False, + }, + ], + "total_entries": 2, + } + response = self.client.get( + f"/api/v1/dags/{self.mapped_dag_id}/tasks", environ_overrides={'REMOTE_USER': "test"} + ) + assert response.status_code == 200 + assert response.json == expected + def test_should_respond_200_ascending_order_by_start_date(self): response = self.client.get( f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",