diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index 9d0f0a9dfb824..ba9d20fcb5a2d 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -719,6 +719,10 @@ transfers: target-integration-name: Google Spreadsheet how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gcs_to_sheets.rst python-module: airflow.providers.google.suite.transfers.gcs_to_sheets + - source-integration-name: SQL + target-integration-name: Google Spreadsheet + how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/sql_to_sheets.rst + python-module: airflow.providers.google.suite.transfers.sql_to_sheets - source-integration-name: Local target-integration-name: Google Cloud Storage (GCS) how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/local_to_gcs.rst diff --git a/airflow/providers/google/suite/example_dags/example_sql_to_sheets.py b/airflow/providers/google/suite/example_dags/example_sql_to_sheets.py new file mode 100644 index 0000000000000..b1e5b97bfe93f --- /dev/null +++ b/airflow/providers/google/suite/example_dags/example_sql_to_sheets.py @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from airflow import models
from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
from airflow.utils.dates import days_ago

# Query whose result set is uploaded to the spreadsheet.
SQL = "select 1 as my_col"
# Placeholder spreadsheet ID; replace it with the ID of a real spreadsheet.
NEW_SPREADSHEET_ID = "123"

with models.DAG(
    "example_sql_to_sheets",
    start_date=days_ago(1),
    schedule_interval=None,  # Override to match your needs
    tags=["example"],
) as dag:

    # The START/END markers below delimit the snippet that the how-to guide
    # pulls in through ``exampleinclude``; keep them around the operator.
    # [START upload_sql_to_sheets]
    upload_sql_to_sheet = SQLToGoogleSheetsOperator(
        task_id="upload_sql_to_sheet",
        sql=SQL,
        sql_conn_id="database_conn_id",
        spreadsheet_id=NEW_SPREADSHEET_ID,
    )
    # [END upload_sql_to_sheets]
import datetime
import numbers
from contextlib import closing
from typing import Any, Iterable, Mapping, Optional, Sequence, Union

from airflow.operators.sql import BaseSQLOperator
from airflow.providers.google.suite.hooks.sheets import GSheetsHook


class SQLToGoogleSheetsOperator(BaseSQLOperator):
    """
    Copy data from SQL results to provided Google Spreadsheet.

    The uploaded values start with a header row built from the cursor
    description, followed by one row per record returned by the query.

    :param sql: The SQL to execute.
    :type sql: str
    :param spreadsheet_id: The Google Sheet ID to interact with.
    :type spreadsheet_id: str
    :param sql_conn_id: the connection ID used to connect to the database.
    :type sql_conn_id: str
    :param parameters: The parameters to render the SQL query with.
    :type parameters: dict or iterable
    :param database: name of database which overwrite the defined one in connection
    :type database: str
    :param spreadsheet_range: The A1 notation of the range to write the values to.
    :type spreadsheet_range: str
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
        if any. For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :type impersonation_chain: Union[str, Sequence[str]]
    """

    template_fields = (
        "sql",
        "spreadsheet_id",
        "spreadsheet_range",
        "impersonation_chain",
    )

    template_fields_renderers = {"sql": "sql"}
    template_ext = (".sql",)

    ui_color = "#a0e08c"

    def __init__(
        self,
        *,
        sql: str,
        spreadsheet_id: str,
        sql_conn_id: str,
        parameters: Optional[Union[Mapping, Iterable]] = None,
        database: Optional[str] = None,
        spreadsheet_range: str = "Sheet1",
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.sql = sql
        # The public argument is ``sql_conn_id``; it is stored as ``conn_id``.
        # NOTE(review): presumably ``get_db_hook()`` of the base class reads
        # ``conn_id`` and ``database`` - confirm against BaseSQLOperator.
        self.conn_id = sql_conn_id
        self.database = database
        self.parameters = parameters
        self.gcp_conn_id = gcp_conn_id
        self.spreadsheet_id = spreadsheet_id
        self.spreadsheet_range = spreadsheet_range
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain

    def _data_prep(self, data: Iterable[Iterable[Any]]) -> Iterable[list]:
        """
        Convert database rows into lists of plain values, one list per row.

        Dates, datetimes and times are rendered as ISO 8601 strings, and
        non-integer numbers are cast to ``float``. Integers and every other
        value are passed through unchanged.

        :param data: the rows fetched from the cursor
        :return: a generator yielding one list of converted values per row
        """
        for row in data:
            item_list = []
            for item in row:
                # ``datetime.datetime`` subclasses ``datetime.date``, so it is
                # covered by the ``date`` check; ``datetime.time`` is listed
                # separately so time-only columns are converted as well.
                if isinstance(item, (datetime.date, datetime.time)):
                    item = item.isoformat()
                # Integers are kept exactly as they are; only the remaining
                # numeric types are cast to float.
                elif isinstance(item, numbers.Number) and not isinstance(item, int):
                    item = float(item)
                item_list.append(item)
            yield item_list

    def _get_data(self) -> Iterable[list]:
        """
        Run the query and yield the header row followed by the data rows.

        :return: a generator whose first item is the list of column names and
            whose remaining items are the converted rows
        :raises ValueError: if the statement did not produce a result set
        """
        hook = self.get_db_hook()
        with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cur:
            self.log.info("Executing query")
            cur.execute(self.sql, self.parameters or ())

            # DB-API cursors report ``description`` as None when the statement
            # returned no result set; fail with a clear message instead of an
            # opaque "NoneType is not iterable" error.
            description = cur.description
            if description is None:
                raise ValueError("The SQL statement did not return a result set, there is nothing to upload.")

            yield [field[0] for field in description]
            yield from self._data_prep(cur.fetchall())

    def execute(self, context: Any) -> None:
        """
        Fetch the query result and write it to the configured spreadsheet range.

        :param context: the task execution context (not used by this operator)
        """
        self.log.info("Getting data")
        # Materialise the generator so the whole result is read while the
        # database connection opened by ``_get_data`` is still in use.
        values = list(self._get_data())

        self.log.info("Connecting to Google")
        sheet_hook = GSheetsHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            impersonation_chain=self.impersonation_chain,
        )

        self.log.info(
            "Uploading data to https://docs.google.com/spreadsheets/d/%s",
            self.spreadsheet_id,
        )

        sheet_hook.update_values(
            spreadsheet_id=self.spreadsheet_id,
            range_=self.spreadsheet_range,
            values=values,
        )
See the License for the + specific language governing permissions and limitations + under the License. + +SQL to Google Sheets Transfer Operators +======================================================== + +With `Google Sheets <https://www.google.com/sheets/about/>`__, everyone can work together in the same +spreadsheet at the same time. Use formulas, functions, and formatting options to save time and simplify +common spreadsheet tasks. + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include::/operators/_partials/prerequisite_tasks.rst + +.. _howto/operator:SQLToGoogleSheets: + +Upload data from SQL to Google Sheets +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To upload data from a database using SQL to a Google Spreadsheet, you can use the +:class:`~airflow.providers.google.suite.transfers.sql_to_sheets.SQLToGoogleSheetsOperator`. + +.. exampleinclude:: /../../airflow/providers/google/suite/example_dags/example_sql_to_sheets.py + :language: python + :dedent: 4 + :start-after: [START upload_sql_to_sheets] + :end-before: [END upload_sql_to_sheets] + +You can use :ref:`Jinja templating <concepts:jinja-templating>` with +:template-fields:`airflow.providers.google.suite.transfers.sql_to_sheets.SQLToGoogleSheetsOperator`. diff --git a/tests/providers/google/suite/transfers/test_sql_to_sheets.py b/tests/providers/google/suite/transfers/test_sql_to_sheets.py new file mode 100644 index 0000000000000..243805080c23b --- /dev/null +++ b/tests/providers/google/suite/transfers/test_sql_to_sheets.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
import unittest
from unittest.mock import Mock, patch

from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator


class TestSQLToGoogleSheets(unittest.TestCase):
    """Unit tests for ``SQLToGoogleSheetsOperator``."""

    def setUp(self):
        """Define the connection IDs, query, spreadsheet ID and rows shared by the tests."""
        self.values = [[1, 2, 3]]
        self.spreadsheet_id = "1234567890"
        self.sql = "select 1 as my_col"
        self.sql_conn_id = "test"
        self.gcp_conn_id = "test"

    @patch("airflow.providers.google.suite.transfers.sql_to_sheets.GSheetsHook")
    def test_execute(self, mock_sheet_hook):
        """``execute`` creates the Sheets hook once and uploads the fetched rows."""
        operator = SQLToGoogleSheetsOperator(
            sql=self.sql,
            sql_conn_id=self.sql_conn_id,
            gcp_conn_id=self.gcp_conn_id,
            spreadsheet_id=self.spreadsheet_id,
            task_id="test_task",
        )
        # Replace the database access with canned rows so no connection is needed.
        operator._get_data = Mock(return_value=self.values)

        operator.execute(None)

        # The hook must be built from the operator's Google settings; the
        # optional impersonation arguments were not given, so they are None.
        expected_hook_kwargs = {
            "gcp_conn_id": self.gcp_conn_id,
            "delegate_to": None,
            "impersonation_chain": None,
        }
        mock_sheet_hook.assert_called_once_with(**expected_hook_kwargs)

        # No range was passed to the operator, so "Sheet1" is expected.
        hook_instance = mock_sheet_hook.return_value
        hook_instance.update_values.assert_called_once_with(
            spreadsheet_id=self.spreadsheet_id,
            range_="Sheet1",
            values=self.values,
        )