From 367f94b3cbcfd48a3a1f8ce84c13fe5fcd41e08d Mon Sep 17 00:00:00 2001
From: Utkarsh Sharma
Date: Tue, 9 Aug 2022 17:45:38 +0530
Subject: [PATCH] Document for append operator (#619)

What is the current behavior?

In the past, we had a tutorial which illustrated how to use each of our
operators/decorators:
https://github.com/astronomer/astro-sdk/blob/be6280df00ccff0d7a1c0dfb099b2065303dbe88/REFERENCE.md

closes: #590

What is the new behavior?

Each operator/decorator now has its own reference page, similar to
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/ecs.html#howto-operator-ecsoperator,
in which we reference parts of an (automatically tested) example DAG that
illustrates the usage of that operator/decorator. Many of these use cases
already exist in our example DAGs - we should reference them.

Does this introduce a breaking change?

Nope
---
 docs/astro/sql/operators/append.rst           | 60 ++++++++++++++++++
 example_dags/example_append.py                | 53 +++++++++++++++++
 ...ple_snowflake_partial_table_with_append.py |  2 +
 tests/test_example_dags.py                    |  1 +
 4 files changed, 116 insertions(+)
 create mode 100644 docs/astro/sql/operators/append.rst
 create mode 100644 example_dags/example_append.py

diff --git a/docs/astro/sql/operators/append.rst b/docs/astro/sql/operators/append.rst
new file mode 100644
index 000000000..b109b57d5
--- /dev/null
+++ b/docs/astro/sql/operators/append.rst
@@ -0,0 +1,60 @@
+===============
+append operator
+===============
+
+.. _append_operator:
+
+When to use the ``append`` operator
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+We can use the ``append`` operator when we want to append the source table to the target table.
+
+.. literalinclude:: ../../../../example_dags/example_append.py
+   :language: python
+   :start-after: [START append_example]
+   :end-before: [END append_example]
+
+When used without the ``columns`` parameter, the Astro SDK assumes that both tables have the same schema.
+
+When tables have the same schema
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+#. **Case 1:** When the complete table needs to be appended, you can skip the ``columns`` parameter.
+
+   .. literalinclude:: ../../../../example_dags/example_append.py
+      :language: python
+      :start-after: [START append_example]
+      :end-before: [END append_example]
+
+#. **Case 2:** When a subset of columns needs to be appended to the target table, we pass a ``list`` of columns in the ``columns`` parameter.
+
+   .. literalinclude:: ../../../../example_dags/example_snowflake_partial_table_with_append.py
+      :language: python
+      :start-after: [START append_example_with_columns_list]
+      :end-before: [END append_example_with_columns_list]
+
+When tables have different schemas
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+When tables have different schemas, we can map column names by passing a ``dict`` of *source columns to target columns*.
+
+.. literalinclude:: ../../../../example_dags/example_append.py
+   :language: python
+   :start-after: [START append_example_col_dict]
+   :end-before: [END append_example_col_dict]
+
+Conflicts
+~~~~~~~~~
+The ``append`` operator doesn't handle conflicts that may arise while appending data.
+If you want to handle those scenarios, use the ``merge`` operator instead.
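+
+For instance, a sketch of resolving duplicate rows with ``merge`` might look like the
+following (the table, column, and connection names are illustrative placeholders, not
+part of the example DAG above; see the ``merge`` reference page for exact parameters):
+
+.. code-block:: python
+
+   aql.merge(
+       target_table=Table(name="homes_reporting", conn_id="postgres_conn"),
+       source_table=source_homes_table,
+       columns=["sell", "list"],
+       target_conflict_columns=["sell"],
+       if_conflicts="update",
+   )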
diff --git a/example_dags/example_append.py b/example_dags/example_append.py
new file mode 100644
index 000000000..ffcccc028
--- /dev/null
+++ b/example_dags/example_append.py
@@ -0,0 +1,53 @@
+import pathlib
+from datetime import datetime, timedelta
+
+from airflow.models import DAG
+
+from astro import sql as aql
+from astro.files import File
+from astro.sql.table import Table
+
+CWD = pathlib.Path(__file__).parent
+
+default_args = {
+    "owner": "airflow",
+    "retries": 1,
+    "retry_delay": 0,
+}
+
+dag = DAG(
+    dag_id="example_append",
+    start_date=datetime(2019, 1, 1),
+    max_active_runs=3,
+    schedule_interval=timedelta(minutes=30),
+    default_args=default_args,
+)
+
+DATA_DIR = str(CWD) + "/data/"
+
+with dag:
+    # Load the target table from a CSV file
+    load_main = aql.load_file(
+        input_file=File(path=DATA_DIR + "homes.csv"),
+        output_table=Table(conn_id="postgres_conn"),
+    )
+    # Load the source table that will be appended to the target
+    load_append = aql.load_file(
+        input_file=File(path=DATA_DIR + "homes2.csv"),
+        output_table=Table(conn_id="postgres_conn"),
+    )
+    # [START append_example]
+    aql.append(
+        target_table=load_main,
+        source_table=load_append,
+    )
+    # [END append_example]
+
+    # [START append_example_col_dict]
+    aql.append(
+        target_table=load_main, source_table=load_append, columns={"beds": "baths"}
+    )
+    # [END append_example_col_dict]
+
+    # Remove the temporary tables created by the tasks above
+    aql.cleanup()
diff --git a/example_dags/example_snowflake_partial_table_with_append.py b/example_dags/example_snowflake_partial_table_with_append.py
index 38c32ac51..6030081ec 100644
--- a/example_dags/example_snowflake_partial_table_with_append.py
+++ b/example_dags/example_snowflake_partial_table_with_append.py
@@ -126,11 +126,13 @@ def example_snowflake_partial_table_with_append():
 
     # Append transformed & filtered data to reporting table
     # Dependency is inferred by passing the previous `filtered_data` task to `append_table` param
+    # [START append_example_with_columns_list]
     record_results = append(
         source_table=filtered_data,
         target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
         columns=["sell", "list", "variable", "value"],
     )
+    # [END append_example_with_columns_list]
     record_results.set_upstream(create_results_table)
 
     # We truncate this table only to avoid wasting Snowflake resources
diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py
index 4eb4caef2..79811d342 100644
--- a/tests/test_example_dags.py
+++ b/tests/test_example_dags.py
@@ -32,6 +32,7 @@ def session():
         "example_snowflake_partial_table_with_append",
         "example_sqlite_load_transform",
         "example_dynamic_map_task",
+        "example_append",
         "example_load_file",
     ],
 )