diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index 229a114fc9cfb..055712529f835 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -19,7 +19,7 @@ import inspect import re from inspect import signature -from typing import Any, Callable, Dict, Optional, Tuple, TypeVar, cast +from typing import Any, Callable, Dict, Iterable, Optional, Tuple, Type, TypeVar, cast from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -101,7 +101,7 @@ class DecoratedOperator(BaseOperator): :type kwargs_to_upstream: dict """ - template_fields = ('op_args', 'op_kwargs') + template_fields: Iterable[str] = ('op_args', 'op_kwargs') template_fields_renderers = {"op_args": "py", "op_kwargs": "py"} # since we won't mutate the arguments, we should just do the shallow copy @@ -180,7 +180,7 @@ def _hook_apply_defaults(self, *args, **kwargs): def task_decorator_factory( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, - decorated_operator_class: BaseOperator = None, + decorated_operator_class: Type[BaseOperator] = None, **kwargs, ) -> Callable[[T], T]: """ diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py index 7421cda1cef9f..e1830ea7bd8d7 100644 --- a/airflow/providers/docker/decorators/docker.py +++ b/airflow/providers/docker/decorators/docker.py @@ -62,7 +62,7 @@ class _DockerDecoratedOperator(DecoratedOperator, DockerOperator): :type multiple_outputs: bool """ - template_fields = ('op_args', 'op_kwargs') + template_fields: Iterable[str] = ('op_args', 'op_kwargs') # since we won't mutate the arguments, we should just do the shallow copy # there are some cases we can't deepcopy the objects (e.g protobuf). diff --git a/airflow/providers/docker/hooks/docker.py b/airflow/providers/docker/hooks/docker.py index 49c76c71ad1e8..76db2d84505d2 100644 --- a/airflow/providers/docker/hooks/docker.py +++ b/airflow/providers/docker/hooks/docker.py @@ -52,7 +52,7 @@ def get_ui_field_behaviour() -> Dict: def __init__( self, - docker_conn_id: str = default_conn_name, + docker_conn_id: Optional[str] = default_conn_name, base_url: Optional[str] = None, version: Optional[str] = None, tls: Optional[str] = None, @@ -63,7 +63,11 @@ def __init__( if not version: raise AirflowException('No Docker API version provided') + if not docker_conn_id: + raise AirflowException('No Docker connection id provided') + conn = self.get_connection(docker_conn_id) + if not conn.host: raise AirflowException('No Docker URL provided') if not conn.login: @@ -87,7 +91,7 @@ def get_conn(self) -> APIClient: self.__login(client) return client - def __login(self, client) -> int: + def __login(self, client) -> None: self.log.debug('Logging into Docker') try: client.login( diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index 506e89ae4d91f..652424fac4aa5 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -152,7 +152,7 @@ class DockerOperator(BaseOperator): :type retrieve_output_path: Optional[str] """ - template_fields = ('image', 'command', 'environment', 'container_name') + template_fields: Iterable[str] = ('image', 'command', 'environment', 'container_name') template_ext = ( '.sh', '.bash', @@ -281,6 +281,8 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir else: self.environment.pop('AIRFLOW_TMP_DIR', None) + if not self.cli: + raise Exception("The 'cli' should be initialized before!") self.container = self.cli.create_container( command=self.format_command(self.command), name=self.container_name,