From 6b6129b91b5eabe875833a60476455c6505aa942 Mon Sep 17 00:00:00 2001 From: Chethan UK Date: Sun, 5 Jun 2022 00:55:46 +0100 Subject: [PATCH] AIP-47 - Migrate kylin DAGs to new design #22439 --- .../apache/kylin/example_dags/__init__.py | 16 --- .../kylin/example_dags/example_kylin_dag.py | 114 ----------------- .../index.rst | 2 +- .../apache/kylin/example_kylin_dag.py | 120 ++++++++++++++++++ 4 files changed, 121 insertions(+), 131 deletions(-) delete mode 100644 airflow/providers/apache/kylin/example_dags/__init__.py delete mode 100644 airflow/providers/apache/kylin/example_dags/example_kylin_dag.py create mode 100644 tests/system/providers/apache/kylin/example_kylin_dag.py diff --git a/airflow/providers/apache/kylin/example_dags/__init__.py b/airflow/providers/apache/kylin/example_dags/__init__.py deleted file mode 100644 index 13a83393a9124..0000000000000 --- a/airflow/providers/apache/kylin/example_dags/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py deleted file mode 100644 index 0d68b36d6534e..0000000000000 --- a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -""" -This is an example DAG which uses the KylinCubeOperator. -The tasks below include kylin build, refresh, merge operation. -""" -from datetime import datetime - -from airflow import DAG -from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator - -dag = DAG( - dag_id='example_kylin_operator', - schedule_interval=None, - start_date=datetime(2021, 1, 1), - catchup=False, - default_args={'project': 'learn_kylin', 'cube': 'kylin_sales_cube'}, - tags=['example'], -) - - -@dag.task -def gen_build_time(): - """ - Gen build time and push to XCom (with key of "return_value") - :return: A dict with build time values. - """ - return {'date_start': '1325347200000', 'date_end': '1325433600000'} - - -gen_build_time_task = gen_build_time() -gen_build_time_output_date_start = gen_build_time_task['date_start'] -gen_build_time_output_date_end = gen_build_time_task['date_end'] - -build_task1 = KylinCubeOperator( - task_id="kylin_build_1", - command='build', - start_time=gen_build_time_output_date_start, - end_time=gen_build_time_output_date_end, - is_track_job=True, - dag=dag, -) - -build_task2 = KylinCubeOperator( - task_id="kylin_build_2", - command='build', - start_time=gen_build_time_output_date_end, - end_time='1325520000000', - is_track_job=True, - dag=dag, -) - -refresh_task1 = KylinCubeOperator( - task_id="kylin_refresh_1", - command='refresh', - start_time=gen_build_time_output_date_start, - end_time=gen_build_time_output_date_end, - is_track_job=True, - dag=dag, -) - -merge_task = KylinCubeOperator( - task_id="kylin_merge", - command='merge', - start_time=gen_build_time_output_date_start, - end_time='1325520000000', - is_track_job=True, - dag=dag, -) - -disable_task = KylinCubeOperator( - task_id="kylin_disable", - command='disable', - dag=dag, -) - -purge_task = KylinCubeOperator( - task_id="kylin_purge", - command='purge', - dag=dag, -) - -build_task3 = KylinCubeOperator( - task_id="kylin_build_3", - command='build', - start_time=gen_build_time_output_date_end, - end_time='1328730000000', - dag=dag, -) - -build_task1 >> build_task2 >> refresh_task1 >> merge_task >> disable_task >> purge_task >> build_task3 - -# Task dependency created via `XComArgs`: -# gen_build_time >> build_task1 -# gen_build_time >> build_task2 -# gen_build_time >> refresh_task1 -# gen_build_time >> merge_task -# gen_build_time >> build_task3 diff --git a/docs/apache-airflow-providers-apache-kylin/index.rst b/docs/apache-airflow-providers-apache-kylin/index.rst index 02fb13b255947..e5b0d53bd5050 100644 --- a/docs/apache-airflow-providers-apache-kylin/index.rst +++ b/docs/apache-airflow-providers-apache-kylin/index.rst @@ -32,7 +32,7 @@ Content :maxdepth: 1 :caption: Resources - Example DAGs + Example DAGs PyPI Repository Installing from sources diff --git a/tests/system/providers/apache/kylin/example_kylin_dag.py b/tests/system/providers/apache/kylin/example_kylin_dag.py new file mode 100644 index 0000000000000..9b562b039b5ca --- /dev/null +++ b/tests/system/providers/apache/kylin/example_kylin_dag.py @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +This is an example DAG which uses the KylinCubeOperator. +The tasks below include kylin build, refresh, merge operation. +""" + +import os +from datetime import datetime + +from airflow import DAG +from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_kylin_operator" + +with DAG( + dag_id=DAG_ID, + schedule_interval=None, + start_date=datetime(2021, 1, 1), + catchup=False, + default_args={'project': 'learn_kylin', 'cube': 'kylin_sales_cube'}, + tags=['example'], +) as dag: + + @dag.task + def gen_build_time(): + """ + Gen build time and push to XCom (with key of "return_value") + :return: A dict with build time values. + """ + return {'date_start': '1325347200000', 'date_end': '1325433600000'} + + gen_build_time_task = gen_build_time() + gen_build_time_output_date_start = gen_build_time_task['date_start'] + gen_build_time_output_date_end = gen_build_time_task['date_end'] + + build_task1 = KylinCubeOperator( + task_id="kylin_build_1", + command='build', + start_time=gen_build_time_output_date_start, + end_time=gen_build_time_output_date_end, + is_track_job=True, + ) + + build_task2 = KylinCubeOperator( + task_id="kylin_build_2", + command='build', + start_time=gen_build_time_output_date_end, + end_time='1325520000000', + is_track_job=True, + ) + + refresh_task1 = KylinCubeOperator( + task_id="kylin_refresh_1", + command='refresh', + start_time=gen_build_time_output_date_start, + end_time=gen_build_time_output_date_end, + is_track_job=True, + ) + + merge_task = KylinCubeOperator( + task_id="kylin_merge", + command='merge', + start_time=gen_build_time_output_date_start, + end_time='1325520000000', + is_track_job=True, + ) + + disable_task = KylinCubeOperator( + task_id="kylin_disable", + command='disable', + ) + + purge_task = KylinCubeOperator( + task_id="kylin_purge", + command='purge', + ) + + build_task3 = KylinCubeOperator( + task_id="kylin_build_3", + command='build', + start_time=gen_build_time_output_date_end, + end_time='1328730000000', + ) + + build_task1 >> build_task2 >> refresh_task1 >> merge_task >> disable_task >> purge_task >> build_task3 + + # Task dependency created via `XComArgs`: + # gen_build_time >> build_task1 + # gen_build_time >> build_task2 + # gen_build_time >> refresh_task1 + # gen_build_time >> merge_task + # gen_build_time >> build_task3 + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)