diff --git a/airflow/providers/apache/spark/hooks/spark_sql.py b/airflow/providers/apache/spark/hooks/spark_sql.py index bb28e57850054..0fce68c3294af 100644 --- a/airflow/providers/apache/spark/hooks/spark_sql.py +++ b/airflow/providers/apache/spark/hooks/spark_sql.py @@ -83,9 +83,11 @@ def __init__( yarn_queue: Optional[str] = None, ) -> None: super().__init__() + options: Dict = {} + conn: Optional[Connection] = None try: - conn: "Optional[Connection]" = self.get_connection(conn_id) + conn = self.get_connection(conn_id) except AirflowNotFoundException: conn = None options: Dict = {} diff --git a/airflow/providers/asana/example_dags/example_asana.py b/airflow/providers/asana/example_dags/example_asana.py index 092a3b9ff37ae..5a4dbb61cbbd2 100644 --- a/airflow/providers/asana/example_dags/example_asana.py +++ b/airflow/providers/asana/example_dags/example_asana.py @@ -28,12 +28,12 @@ AsanaUpdateTaskOperator, ) -ASANA_TASK_TO_UPDATE = os.environ.get("ASANA_TASK_TO_UPDATE") -ASANA_TASK_TO_DELETE = os.environ.get("ASANA_TASK_TO_DELETE") +ASANA_TASK_TO_UPDATE = os.environ.get("ASANA_TASK_TO_UPDATE", "update_task") +ASANA_TASK_TO_DELETE = os.environ.get("ASANA_TASK_TO_DELETE", "delete_task") # This example assumes a default project ID has been specified in the connection. If you # provide a different id in ASANA_PROJECT_ID_OVERRIDE, it will override this default # project ID in the AsanaFindTaskOperator example below -ASANA_PROJECT_ID_OVERRIDE = os.environ.get("ASANA_PROJECT_ID_OVERRIDE") +ASANA_PROJECT_ID_OVERRIDE = os.environ.get("ASANA_PROJECT_ID_OVERRIDE", "test_project") # This connection should specify a personal access token and a default project ID CONN_ID = os.environ.get("ASANA_CONNECTION_ID") diff --git a/airflow/providers/asana/hooks/asana.py b/airflow/providers/asana/hooks/asana.py index 44367bd00bca6..5b97fd0b6f410 100644 --- a/airflow/providers/asana/hooks/asana.py +++ b/airflow/providers/asana/hooks/asana.py @@ -17,14 +17,15 @@ # under the License. """Connect to Asana.""" -from typing import Any, Dict +import sys +from typing import Any, Dict, Optional from asana import Client from asana.error import NotFoundError -try: +if sys.version_info >= (3, 8): from functools import cached_property -except ImportError: +else: from cached_property import cached_property from airflow.hooks.base import BaseHook @@ -84,7 +85,7 @@ def client(self) -> Client: return Client.access_token(self.connection.password) - def create_task(self, task_name: str, params: dict) -> dict: + def create_task(self, task_name: str, params: Optional[dict]) -> dict: """ Creates an Asana task. @@ -98,7 +99,7 @@ def create_task(self, task_name: str, params: dict) -> dict: response = self.client.tasks.create(params=merged_params) return response - def _merge_create_task_parameters(self, task_name: str, task_params: dict) -> dict: + def _merge_create_task_parameters(self, task_name: str, task_params: Optional[dict]) -> dict: """ Merge create_task parameters with default params from the connection. @@ -144,7 +145,7 @@ def delete_task(self, task_id: str) -> dict: self.log.info("Asana task %s not found for deletion.", task_id) return {} - def find_task(self, params: dict) -> list: + def find_task(self, params: Optional[dict]) -> list: """ Retrieves a list of Asana tasks that match search parameters. @@ -157,7 +158,7 @@ def find_task(self, params: dict) -> list: response = self.client.tasks.find_all(params=merged_params) return list(response) - def _merge_find_task_parameters(self, search_parameters: dict) -> dict: + def _merge_find_task_parameters(self, search_parameters: Optional[dict]) -> dict: """ Merge find_task parameters with default params from the connection. diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py index 5d186b864682b..4f936fe5bba0f 100644 --- a/airflow/providers/jira/hooks/jira.py +++ b/airflow/providers/jira/hooks/jira.py @@ -42,7 +42,7 @@ def __init__(self, jira_conn_id: str = default_conn_name, proxies: Optional[Any] super().__init__() self.jira_conn_id = jira_conn_id self.proxies = proxies - self.client = None + self.client: Optional[JIRA] = None self.get_conn() def get_conn(self) -> JIRA: diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index b10809dbe1383..95abe67c77781 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -211,13 +211,13 @@ def get_iam_token(self, conn: Connection) -> Tuple[str, str, int]: token = aws_hook.conn.generate_db_auth_token(conn.host, port, conn.login) return login, token, port - def get_table_primary_key(self, table: str, schema: Optional[str] = "public") -> List[str]: + def get_table_primary_key(self, table: str, schema: Optional[str] = "public") -> Optional[List[str]]: """ Helper method that returns the table primary key :param table: Name of the target table :type table: str - :param table: Name of the target schema, public by default + :param schema: Name of the target schema, public by default :type table: str :return: Primary key columns list :rtype: List[str] diff --git a/airflow/providers/slack/operators/slack.py b/airflow/providers/slack/operators/slack.py index 706e69c98c3d2..07438451fed3c 100644 --- a/airflow/providers/slack/operators/slack.py +++ b/airflow/providers/slack/operators/slack.py @@ -201,9 +201,9 @@ def __init__( self, channel: str = '#general', initial_comment: str = 'No message has been set!', - filename: str = None, - filetype: str = None, - content: str = None, + filename: Optional[str] = None, + filetype: Optional[str] = None, + content: Optional[str] = None, **kwargs, ) -> None: self.method = 'files.upload' @@ -212,7 +212,7 @@ def __init__( self.filename = filename self.filetype = filetype self.content = content - self.file_params = {} + self.file_params: Dict = {} super().__init__(method=self.method, **kwargs) def execute(self, **kwargs): diff --git a/airflow/providers/telegram/hooks/telegram.py b/airflow/providers/telegram/hooks/telegram.py index a4e2c83ce51f0..5b143868726c0 100644 --- a/airflow/providers/telegram/hooks/telegram.py +++ b/airflow/providers/telegram/hooks/telegram.py @@ -79,7 +79,7 @@ def get_conn(self) -> telegram.bot.Bot: """ return telegram.bot.Bot(token=self.token) - def __get_token(self, token: Optional[str], telegram_conn_id: str) -> str: + def __get_token(self, token: Optional[str], telegram_conn_id: Optional[str]) -> str: """ Returns the telegram API token @@ -103,7 +103,7 @@ def __get_token(self, token: Optional[str], telegram_conn_id: str) -> str: raise AirflowException("Cannot get token: No valid Telegram connection supplied.") - def __get_chat_id(self, chat_id: Optional[str], telegram_conn_id: str) -> Optional[str]: + def __get_chat_id(self, chat_id: Optional[str], telegram_conn_id: Optional[str]) -> Optional[str]: """ Returns the telegram chat ID for a chat/channel/group diff --git a/airflow/providers/telegram/operators/telegram.py b/airflow/providers/telegram/operators/telegram.py index 57f6288da063c..fea1934ffa0d1 100644 --- a/airflow/providers/telegram/operators/telegram.py +++ b/airflow/providers/telegram/operators/telegram.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """Operator for Telegram""" -from typing import Optional +from typing import Dict, Optional from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -70,7 +70,7 @@ def __init__( super().__init__(**kwargs) - def execute(self, **kwargs) -> None: + def execute(self, context: Dict) -> None: """Calls the TelegramHook to post the provided Telegram message""" if self.text: self.telegram_kwargs['text'] = self.text diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py index 4f0a2d5adcb4a..501ef585ad95e 100644 --- a/airflow/providers/trino/hooks/trino.py +++ b/airflow/providers/trino/hooks/trino.py @@ -138,12 +138,7 @@ def get_pandas_df(self, hql, parameters=None, **kwargs): df = pandas.DataFrame(**kwargs) return df - def run( - self, - hql, - autocommit: bool = False, - parameters: Optional[dict] = None, - ) -> None: + def run(self, hql, autocommit: bool = False, parameters: Optional[dict] = None, handler=None) -> None: """Execute the statement against Trino. Can be used to create views.""" return super().run(sql=self._strip_sql(hql), parameters=parameters) diff --git a/tests/providers/telegram/operators/test_telegram.py b/tests/providers/telegram/operators/test_telegram.py index d6341e2ab736f..8d4fde034185e 100644 --- a/tests/providers/telegram/operators/test_telegram.py +++ b/tests/providers/telegram/operators/test_telegram.py @@ -58,7 +58,7 @@ def test_should_send_message_when_all_parameters_are_provided(self, mock_telegra task_id='telegram', text="some non empty text", ) - hook.execute() + hook.execute(None) mock_telegram_hook.assert_called_once_with( telegram_conn_id='telegram_default', @@ -89,7 +89,7 @@ def side_effect(*args, **kwargs): task_id='telegram', text="some non empty text", ) - hook.execute() + hook.execute(None) assert "cosmic rays caused bit flips" == str(ctx.value) @@ -105,7 +105,7 @@ def test_should_forward_all_args_to_telegram(self, mock_telegram_hook): text="some non empty text", telegram_kwargs={"custom_arg": "value"}, ) - hook.execute() + hook.execute(None) mock_telegram_hook.assert_called_once_with( telegram_conn_id='telegram_default', @@ -128,7 +128,7 @@ def test_should_give_precedence_to_text_passed_in_constructor(self, mock_telegra text="some non empty text - higher precedence", telegram_kwargs={"custom_arg": "value", "text": "some text, that will be ignored"}, ) - hook.execute() + hook.execute(None) mock_telegram_hook.assert_called_once_with( telegram_conn_id='telegram_default', @@ -159,7 +159,8 @@ def test_should_return_templatized_text_field(self, mock_hook): telegram_kwargs={"custom_arg": "value", "text": "should be ignored"}, ) operator.render_template_fields({"ds": "2021-02-04"}) - operator.execute() + + operator.execute(None) assert operator.text == "execution date is 2021-02-04" assert 'text' in operator.telegram_kwargs assert operator.telegram_kwargs['text'] == "execution date is 2021-02-04"