diff --git a/airflow/providers/presto/CHANGELOG.rst b/airflow/providers/presto/CHANGELOG.rst index 1d7af0a87e110..125bf57f4b98e 100644 --- a/airflow/providers/presto/CHANGELOG.rst +++ b/airflow/providers/presto/CHANGELOG.rst @@ -30,6 +30,8 @@ Breaking changes Deprecated ``hql`` parameter has been removed in ``get_records``, ``get_first``, ``get_pandas_df`` and ``run`` methods of the ``PrestoHook``. +Remove ``PrestoToSlackOperator`` in favor of Slack provider ``SqlToSlackOperator``. + 3.1.0 ..... diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/provider.yaml index 3294f1cc75737..b4279bef004a8 100644 --- a/airflow/providers/presto/provider.yaml +++ b/airflow/providers/presto/provider.yaml @@ -58,10 +58,6 @@ transfers: how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst python-module: airflow.providers.presto.transfers.gcs_to_presto - - source-integration-name: Presto - target-integration-name: Slack - how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst - python-module: airflow.providers.presto.transfers.presto_to_slack connection-types: - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py deleted file mode 100644 index bcf2c3177a6a5..0000000000000 --- a/airflow/providers/presto/transfers/presto_to_slack.py +++ /dev/null @@ -1,96 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -import warnings -from typing import Iterable, Mapping, Optional, Sequence, Union - -from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator - - -class PrestoToSlackOperator(SqlToSlackOperator): - """ - Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are - rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{ - results_df }}'. The 'results_df' variable name can be changed by specifying a different - 'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to - allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df | - tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:PrestoToSlackOperator` - - :param sql: The SQL statement to execute on Presto (templated) - :param slack_message: The templated Slack message to send with the data returned from Presto. - You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the - SQL results - :param presto_conn_id: destination presto connection - :param slack_conn_id: The connection id for Slack - :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df' - :param parameters: The parameters to pass to the SQL query - :param slack_token: The token to use to authenticate to Slack. If this is not provided, the - 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id - :param slack_channel: The channel to send message. Override default from Slack connection. - """ - - template_fields: Sequence[str] = ('sql', 'slack_message', 'slack_channel') - template_ext: Sequence[str] = ('.sql', '.jinja', '.j2') - template_fields_renderers = {"sql": "sql", "slack_message": "jinja"} - times_rendered = 0 - - def __init__( - self, - *, - sql: str, - slack_message: str, - presto_conn_id: str = 'presto_default', - slack_conn_id: str = 'slack_default', - results_df_name: str = 'results_df', - parameters: Optional[Union[Iterable, Mapping]] = None, - slack_token: Optional[str] = None, - slack_channel: Optional[str] = None, - **kwargs, - ) -> None: - self.presto_conn_id = presto_conn_id - self.sql = sql - self.parameters = parameters - self.slack_conn_id = slack_conn_id - self.slack_token = slack_token - self.slack_message = slack_message - self.results_df_name = results_df_name - self.slack_channel = slack_channel - - warnings.warn( - """ - PrestoToSlackOperator is deprecated. - Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`. - """, - DeprecationWarning, - stacklevel=2, - ) - - super().__init__( - sql=self.sql, - sql_conn_id=self.presto_conn_id, - slack_conn_id=self.slack_conn_id, - slack_webhook_token=self.slack_token, - slack_message=self.slack_message, - slack_channel=self.slack_channel, - results_df_name=self.results_df_name, - parameters=self.parameters, - **kwargs, - ) diff --git a/docs/apache-airflow-providers-presto/index.rst b/docs/apache-airflow-providers-presto/index.rst index f1f6188b6a8b2..33b748bf4c4fc 100644 --- a/docs/apache-airflow-providers-presto/index.rst +++ b/docs/apache-airflow-providers-presto/index.rst @@ -28,12 +28,6 @@ Content PrestoTransferOperator types -.. toctree:: - :maxdepth: 1 - :caption: Guides - - PrestoToSlackOperator types - .. toctree:: :maxdepth: 1 :caption: References diff --git a/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst b/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst deleted file mode 100644 index 5dded6bd0e9ac..0000000000000 --- a/docs/apache-airflow-providers-presto/operators/transfer/presto_to_slack.rst +++ /dev/null @@ -1,38 +0,0 @@ - .. Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - .. http://www.apache.org/licenses/LICENSE-2.0 - - .. Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - -.. _howto/operator:PrestoToSlackOperator: - -PrestoToSlackOperator -======================== - -Use the :class:`~airflow.providers.presto.transfers.presto_to_slack.presto_to_slack` to post messages to predefined Slack -channels. - -Using the Operator -^^^^^^^^^^^^^^^^^^ - -This operator will execute a custom query in Presto and publish a Slack message that can be formatted -and contain the resulting dataset (e.g. ASCII formatted dataframe). - -An example usage of the PrestoToSlackOperator is as follows: - -.. exampleinclude:: /../../tests/system/providers/presto/example_presto_to_slack.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_presto_to_slack] - :end-before: [END howto_operator_presto_to_slack] diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index bf1b18a598b4a..dd06841df925c 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -570,8 +570,7 @@ ], "cross-providers-deps": [ "common.sql", - "google", - "slack" + "google" ] }, "qubole": { diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py deleted file mode 100644 index bcc5e82a8a9d2..0000000000000 --- a/tests/providers/presto/transfers/test_presto_to_slack.py +++ /dev/null @@ -1,105 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from unittest import mock - -from airflow.models import DAG -from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator -from airflow.utils import timezone -from tests.test_utils.db import clear_db_runs - -TEST_DAG_ID = 'presto_to_slack_unit_test' -DEFAULT_DATE = timezone.datetime(2022, 1, 1) - - -class TestPrestoToSlackOperator: - def setup_class(self): - clear_db_runs() - - def setup_method(self): - self.example_dag = DAG('unit_test_dag_presto_to_slack', start_date=DEFAULT_DATE) - - def teardown_method(self): - clear_db_runs() - - @staticmethod - def _construct_operator(**kwargs): - operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs) - return operator - - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') - def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class): - operator_args = { - 'presto_conn_id': 'presto_connection', - 'slack_conn_id': 'slack_connection', - 'sql': "sql {{ ds }}", - 'results_df_name': 'xxxx', - 'parameters': ['1', '2', '3'], - 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_channel': 'my_channel', - 'dag': self.example_dag, - } - presto_to_slack_operator = self._construct_operator(**operator_args) - mock_dbapi_hook = mock.Mock() - presto_to_slack_operator._get_hook = mock_dbapi_hook - - get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df - get_pandas_df_mock.return_value = '1234' - - slack_webhook_hook = mock_slack_hook_class.return_value - presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', - message='message: 2022-01-01, 1234', - channel='my_channel', - webhook_token=None, - ) - - slack_webhook_hook.execute.assert_called_once() - - @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook') - def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_class): - operator_args = { - 'presto_conn_id': 'presto_connection', - 'slack_conn_id': 'slack_connection', - 'slack_token': 'test_token', - 'sql': "sql {{ ds }}", - 'results_df_name': 'xxxx', - 'parameters': ['1', '2', '3'], - 'slack_message': 'message: {{ ds }}, {{ xxxx }}', - 'slack_channel': 'my_channel', - 'dag': self.example_dag, - } - presto_to_slack_operator = self._construct_operator(**operator_args) - mock_dbapi_hook = mock.Mock() - presto_to_slack_operator._get_hook = mock_dbapi_hook - - get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df - get_pandas_df_mock.return_value = '1234' - - slack_webhook_hook = mock_slack_hook_class.return_value - presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - - mock_slack_hook_class.assert_called_once_with( - http_conn_id='slack_connection', - message='message: 2022-01-01, 1234', - channel='my_channel', - webhook_token='test_token', - ) - - slack_webhook_hook.execute.assert_called_once() diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py deleted file mode 100644 index dc87c831b4983..0000000000000 --- a/tests/system/providers/presto/example_presto_to_slack.py +++ /dev/null @@ -1,54 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -Example DAG using PrestoToSlackOperator. -""" - -import os -from datetime import datetime - -from airflow import models -from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator - -PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table") -ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -DAG_ID = "example_presto_to_slack" -SLACK_CONN_WEBHOOK = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' - -with models.DAG( - dag_id=DAG_ID, - schedule_interval='@once', # Override to match your needs - start_date=datetime(2022, 1, 1), - catchup=False, - tags=["example"], -) as dag: - # [START howto_operator_presto_to_slack] - PrestoToSlackOperator( - task_id="presto_to_slack", - slack_token=SLACK_CONN_WEBHOOK, - sql=f"SELECT col FROM {PRESTO_TABLE}", - slack_channel="my_channel", - slack_message="message: {{ ds }}, {{ results_df }}", - ) - # [END howto_operator_presto_to_slack] - - -from tests.system.utils import get_test_run # noqa: E402 - -# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) -test_run = get_test_run(dag)