From eee668769685024817078e2894ff9971f2ccb011 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 19 Aug 2019 09:13:22 -0400 Subject: [PATCH] [AIRFLOW-5256] Related pylint changes for common licences in python files --- .pre-commit-config.yaml | 14 +- CONTRIBUTING.md | 6 +- airflow/api/auth/backend/kerberos_auth.py | 17 ++ airflow/config_templates/default_celery.py | 2 +- airflow/contrib/hooks/grpc_hook.py | 38 +++-- airflow/contrib/hooks/qubole_hook.py | 58 ++++--- airflow/contrib/hooks/ssh_hook.py | 1 - .../operators/aws_sqs_publish_operator.py | 6 +- .../operators/docker_swarm_operator.py | 5 +- .../operators/kubernetes_pod_operator.py | 6 +- airflow/contrib/operators/qubole_operator.py | 8 +- airflow/example_dags/docker_copy_data.py | 1 + airflow/exceptions.py | 17 +- airflow/executors/kubernetes_executor.py | 57 +++++-- airflow/gcp/example_dags/example_bigtable.py | 6 +- airflow/gcp/hooks/mlengine.py | 26 +-- airflow/gcp/operators/mlengine.py | 26 +-- airflow/gcp/utils/mlengine_operator_utils.py | 27 +-- .../gcp/utils/mlengine_prediction_summary.py | 27 +-- airflow/kubernetes/kube_client.py | 15 +- airflow/kubernetes/pod.py | 23 +++ airflow/kubernetes/pod_generator.py | 9 +- airflow/kubernetes/pod_launcher.py | 50 ++++-- airflow/kubernetes/secret.py | 4 + airflow/kubernetes/volume.py | 19 ++- airflow/kubernetes/volume_mount.py | 36 ++-- airflow/kubernetes/worker_configuration.py | 10 +- airflow/lineage/backend/__init__.py | 2 + ...4_add_kubernetes_resource_checkpointing.py | 4 +- ...5c0_add_kubernetes_scheduler_uniqueness.py | 4 +- .../versions/dd25f486b8ea_add_idx_log_dag.py | 4 +- airflow/models/__init__.py | 2 +- airflow/models/baseoperator.py | 83 +++++++--- airflow/models/taskfail.py | 2 +- airflow/models/taskreschedule.py | 2 +- airflow/operators/docker_operator.py | 37 ++++- airflow/security/kerberos.py | 49 +++++- airflow/security/utils.py | 23 ++- airflow/utils/log/file_task_handler.py | 44 ++--- airflow/utils/strings.py | 1 - dev/airflow-jira | 28 ++-- dev/airflow-license | 27 +-- dev/airflow-pr | 28 ++-- docs/conf.py | 2 +- docs/exts/__init__.py | 1 - docs/exts/docroles.py | 52 ++++-- docs/exts/exampleinclude.py | 72 ++++++-- docs/exts/removemarktransform.py | 15 +- pylintrc | 1 + scripts/ci/_utils.sh | 15 ++ scripts/ci/ci_refresh_pylint_todo.sh | 37 +++++ .../ci/in_container/_in_container_utils.sh | 54 ++++++ .../ci/in_container/refresh_pylint_todo.sh | 33 ++++ scripts/ci/in_container/run_pylint_tests.sh | 6 +- scripts/ci/pylint_todo.txt | 154 ++---------------- setup.py | 12 +- tests/contrib/hooks/test_grpc_hook.py | 29 ++-- tests/contrib/hooks/test_salesforce_hook.py | 3 +- .../operators/test_aws_athena_operator.py | 9 +- tests/contrib/operators/test_grpc_operator.py | 25 +-- .../operators/test_sagemaker_base_operator.py | 6 +- .../test_sagemaker_training_operator.py | 9 +- tests/contrib/utils/gcp_authenticator.py | 16 +- tests/dags/test_clear_subdag.py | 3 +- tests/dags/test_example_bash_operator.py | 1 - tests/executors/test_kubernetes_executor.py | 40 ++--- tests/gcp/hooks/test_bigquery.py | 6 +- tests/kubernetes/test_pod_launcher.py | 5 +- tests/minikube/test_kubernetes_executor.py | 8 +- .../minikube/test_kubernetes_pod_operator.py | 13 +- tests/operators/test_docker_operator.py | 4 +- tests/utils/log/elasticmock/__init__.py | 29 +++- .../log/elasticmock/fake_elasticsearch.py | 79 ++++++--- .../log/elasticmock/utilities/__init__.py | 20 ++- 74 files changed, 1031 insertions(+), 582 deletions(-) create mode 100755 scripts/ci/ci_refresh_pylint_todo.sh create mode 
100755 scripts/ci/in_container/refresh_pylint_todo.sh diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9667263190293..09b48e3a4bbc6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -92,7 +92,7 @@ repos: - license-templates/LICENSE.txt - --fuzzy-match-generates-todo - id: insert-license - name: Add licence for shell files + name: Add licence for all shell files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ types: [shell] files: ^breeze$|^breeze-complete$|\.sh$ @@ -102,6 +102,16 @@ - --license-filepath - license-templates/LICENSE.txt - --fuzzy-match-generates-todo + - id: insert-license + name: Add licence for all python files + exclude: ^\.github/.*$"|^airflow/_vendor/.*$ + types: [python] + args: + - --comment-style + - "|#|" + - --license-filepath + - license-templates/LICENSE.txt + - --fuzzy-match-generates-todo - id: insert-license name: Add licence for all XML files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ @@ -113,7 +123,7 @@ - license-templates/LICENSE.txt - --fuzzy-match-generates-todo - id: insert-license - name: Add licence for yaml files + name: Add licence for all yaml files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ types: [yaml] args: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b501886f595cb..93306591d7185 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -199,8 +199,12 @@ as follows: 3) Fix all the issues reported by pylint 4) Re-run [scripts/ci/ci_pylint.sh](scripts/ci/ci_pylint.sh) 5) If you see "success" - submit PR following [Pull Request guidelines](#pull-request-guidelines) +6) You can periodically refresh the [scripts/ci/pylint_todo.txt](scripts/ci/pylint_todo.txt) file + by running + [scripts/ci/ci_refresh_pylint_todo.sh](scripts/ci/ci_refresh_pylint_todo.sh). + This can take quite some time (especially on MacOS)! -There are following guidelines when fixing pylint errors: +You can follow these guidelines when fixing pylint errors: * Ideally fix the errors rather than disable pylint checks - often you can easily refactor the code (IntelliJ/PyCharm might be helpful when extracting methods in complex code or moving methods around) diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py index 6db971383a34c..b1bff2eb106d5 100644 --- a/airflow/api/auth/backend/kerberos_auth.py +++ b/airflow/api/auth/backend/kerberos_auth.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # # Copyright (c) 2013, Michael Komitee # All rights reserved.
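The CONTRIBUTING guidelines above recommend fixing pylint errors rather than disabling checks. One recurring fix of that kind in this patch is pylint's dangerous-default-value warning (W0102), addressed below in GrpcHook.run by replacing the data={} default with data=None. A minimal standalone sketch of the idiom, using a hypothetical function that is not part of the patch:

    def collect(item, bucket=None):
        """Append item to bucket, creating a fresh list for each call."""
        if bucket is None:
            bucket = []  # new list per call; a bucket=[] default would be shared across calls
        bucket.append(item)
        return bucket

    assert collect(1) == [1]
    assert collect(2) == [2]  # with a mutable default this would wrongly be [1, 2]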
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 4a6da2a50728e..35a7c510ed810 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Default celery configuration.""" import ssl from airflow.configuration import conf diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py index a6ca8a7e5ca92..664ed70b7372a 100644 --- a/airflow/contrib/hooks/grpc_hook.py +++ b/airflow/contrib/hooks/grpc_hook.py @@ -1,17 +1,23 @@ # -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""GRPC Hook""" import grpc from google import auth as google_auth @@ -58,7 +64,7 @@ def get_conn(self): if auth_type == "NO_AUTH": channel = grpc.insecure_channel(base_url) - elif auth_type == "SSL" or auth_type == "TLS": + elif auth_type in {"SSL", "TLS"}: credential_file_name = self._get_field("credential_pem_file") creds = grpc.ssl_channel_credentials(open(credential_file_name).read()) channel = grpc.secure_channel(base_url, creds) @@ -91,7 +97,9 @@ def get_conn(self): return channel - def run(self, stub_class, call_func, streaming=False, data={}): + def run(self, stub_class, call_func, streaming=False, data=None): + if data is None: + data = {} with self.get_conn() as channel: stub = stub_class(channel) try: @@ -102,10 +110,14 @@ def run(self, stub_class, call_func, streaming=False, data={}): else: yield from response except grpc.RpcError as ex: + # noinspection PyUnresolvedReferences self.log.exception( "Error occurred when calling the grpc service: {0}, method: {1} \ status code: {2}, error details: {3}" - .format(stub.__class__.__name__, call_func, ex.code(), ex.details())) + .format(stub.__class__.__name__, + call_func, + ex.code(), # pylint: disable=no-member + ex.details())) # pylint: disable=no-member raise ex def _get_field(self, field_name, default=None): diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index c2177c19743e4..8a87c04aca22a 100644 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -17,13 +17,18 @@ # specific language governing permissions and limitations # under the License. # - +"""Qubole hook""" import os import pathlib import time import datetime import re +from qds_sdk.qubole import Qubole +from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \ + PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \ + DbImportCommand, SqlCommand + from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow.configuration import conf @@ -31,11 +36,6 @@ from airflow.utils.state import State from airflow.models import TaskInstance -from qds_sdk.qubole import Qubole -from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \ - PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \ - DbImportCommand, SqlCommand - COMMAND_CLASSES = { "hivecmd": HiveCommand, "prestocmd": PrestoCommand, @@ -57,20 +57,24 @@ def flatten_list(list_of_lists): + """Flatten the list""" return [element for array in list_of_lists for element in array] def filter_options(options): + """Remove options from the list""" options_to_remove = ["help", "print-logs-live", "print-logs"] return [option for option in options if option not in options_to_remove] def get_options_list(command_class): + """Get options list""" options_list = [option.get_opt_string().strip("--") for option in command_class.optparser.option_list] return filter_options(options_list) def build_command_args(): + """Build Command argument from command and options""" command_args, hyphen_args = {}, set() for cmd in COMMAND_CLASSES: @@ -95,7 +99,8 @@ def build_command_args(): class QuboleHook(BaseHook): - def __init__(self, *args, **kwargs): + """Hook for Qubole communication""" + def __init__(self, *args, **kwargs): # pylint: disable=unused-argument conn = self.get_connection(kwargs['qubole_conn_id']) Qubole.configure(api_token=conn.password, api_url=conn.host) self.task_id = kwargs['task_id'] @@ -107,6 +112,7 @@ def 
__init__(self, *args, **kwargs): @staticmethod def handle_failure_retry(context): + """Handle retries in case of failures""" ti = context['ti'] cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id) @@ -123,6 +129,7 @@ def handle_failure_retry(context): cmd.cancel() def execute(self, context): + """Execute call""" args = self.cls.parse(self.create_cmd_args(context)) self.cmd = self.cls.create(**args) self.task_instance = context['task_instance'] @@ -197,7 +204,7 @@ def get_log(self, ti): """ if self.cmd is None: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) - Command.get_log_id(self.cls, cmd_id) + Command.get_log_id(cmd_id) def get_jobs_id(self, ti): """ @@ -207,8 +214,9 @@ def get_jobs_id(self, ti): """ if self.cmd is None: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) - Command.get_jobs_id(self.cls, cmd_id) + Command.get_jobs_id(cmd_id) + # noinspection PyMethodMayBeStatic def get_extra_links(self, operator, dttm): """ Get link to qubole command result page. @@ -229,28 +237,25 @@ return url def create_cmd_args(self, context): + """Creates command arguments""" args = [] cmd_type = self.kwargs['command_type'] inplace_args = None tags = {self.dag_id, self.task_id, context['run_id']} positional_args_list = flatten_list(POSITIONAL_ARGS.values()) - for k, v in self.kwargs.items(): - if k in COMMAND_ARGS[cmd_type]: - if k in HYPHEN_ARGS: - args.append("--{0}={1}".format(k.replace('_', '-'), v)) - elif k in positional_args_list: - inplace_args = v - elif k == 'tags': - if isinstance(v, str): - tags.add(v) - elif isinstance(v, (list, tuple)): - for val in v: - tags.add(val) + for key, value in self.kwargs.items(): + if key in COMMAND_ARGS[cmd_type]: + if key in HYPHEN_ARGS: + args.append("--{0}={1}".format(key.replace('_', '-'), value)) + elif key in positional_args_list: + inplace_args = value + elif key == 'tags': + self._add_tags(tags, value) else: - args.append("--{0}={1}".format(k, v)) + args.append("--{0}={1}".format(key, value)) - if k == 'notify' and v is True: + if key == 'notify' and value is True: args.append("--notify") args.append("--tags={0}".format(','.join(filter(None, tags)))) @@ -259,3 +264,10 @@ args += inplace_args.split(' ') return args + + @staticmethod + def _add_tags(tags, value): + if isinstance(value, str): + tags.add(value) + elif isinstance(value, (list, tuple)): + tags.update(value) diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py index b431b9f703928..ab0ab085b87d7 100644 --- a/airflow/contrib/hooks/ssh_hook.py +++ b/airflow/contrib/hooks/ssh_hook.py @@ -29,7 +29,6 @@ from airflow.hooks.base_hook import BaseHook -# noinspection PyAbstractClass class SSHHook(BaseHook): """ Hook for ssh remote execution using Paramiko. diff --git a/airflow/contrib/operators/aws_sqs_publish_operator.py b/airflow/contrib/operators/aws_sqs_publish_operator.py index 072339838a68c..0bf2f7f84c660 100644 --- a/airflow/contrib/operators/aws_sqs_publish_operator.py +++ b/airflow/contrib/operators/aws_sqs_publish_operator.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# -# Licensed to the ApachMee Software Foundation (ASF) under one + +# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership.
The ASF licenses this file @@ -17,6 +17,7 @@ # specific language governing permissions and limitations # under the License. +"""Publish message to SQS queue""" from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.contrib.hooks.aws_sqs_hook import SQSHook @@ -38,7 +39,6 @@ class SQSPublishOperator(BaseOperator): :param aws_conn_id: AWS connection id (default: aws_default) :type aws_conn_id: str """ - template_fields = ('sqs_queue', 'message_content', 'delay_seconds') ui_color = '#6ad3fa' diff --git a/airflow/contrib/operators/docker_swarm_operator.py b/airflow/contrib/operators/docker_swarm_operator.py index 9ac3057a75fac..2f63ccd120f0b 100644 --- a/airflow/contrib/operators/docker_swarm_operator.py +++ b/airflow/contrib/operators/docker_swarm_operator.py @@ -1,8 +1,4 @@ -''' -Run ephemeral Docker Swarm services -''' # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -19,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""Run ephemeral Docker Swarm services""" from docker import types diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 4c09ce07c76bb..7616b2ca7f9aa 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Executes task in a Kubernetes POD""" from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -23,7 +23,7 @@ from airflow.utils.state import State -class KubernetesPodOperator(BaseOperator): +class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes """ Execute a task in a Kubernetes Pod @@ -172,7 +172,7 @@ def execute(self, context): raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) @apply_defaults - def __init__(self, + def __init__(self, # pylint: disable=too-many-arguments,too-many-locals namespace, image, name, diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index e591ed9d0894e..ddd7e80542f00 100644 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+"""Qubole operator""" from typing import Iterable from airflow.models.baseoperator import BaseOperator, BaseOperatorLink @@ -25,7 +26,7 @@ class QDSLink(BaseOperatorLink): - + """Link to QDS""" name = 'Go to QDS' def get_link(self, operator, dttm): @@ -191,16 +192,19 @@ def on_kill(self, ti=None): self.get_hook().kill(ti) def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): + """get_results from Qubole""" return self.get_hook().get_results(ti, fp, inline, delim, fetch) def get_log(self, ti): + """get_log from Qubole""" return self.get_hook().get_log(ti) def get_jobs_id(self, ti): + """get jobs_id from Qubole""" return self.get_hook().get_jobs_id(ti) def get_hook(self): - # Reinitiating the hook, as some template fields might have changed + """Reinitialising the hook, as some template fields might have changed""" return QuboleHook(*self.args, **self.kwargs) def __getattribute__(self, name): diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py index 484f82f683df3..b41131d883d94 100644 --- a/airflow/example_dags/docker_copy_data.py +++ b/airflow/example_dags/docker_copy_data.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 09abf79c4b380..710c15ccf38b0 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -19,6 +19,7 @@ # # Note: Any AirflowException raised is expected to cause the TaskInstance # to be marked in an ERROR state +"""Exceptions used by Airflow""" class AirflowException(Exception): @@ -40,11 +41,11 @@ class AirflowNotFoundException(AirflowException): class AirflowConfigException(AirflowException): - pass + """Raise when there is configuration problem""" class AirflowSensorTimeout(AirflowException): - pass + """Raise when there is a timeout on sensor polling""" class AirflowRescheduleException(AirflowException): @@ -52,30 +53,30 @@ class AirflowRescheduleException(AirflowException): Raise when the task should be re-scheduled at a later time. :param reschedule_date: The date when the task should be rescheduled - :type reschedule: datetime.datetime + :type reschedule_date: datetime.datetime """ def __init__(self, reschedule_date): self.reschedule_date = reschedule_date class InvalidStatsNameException(AirflowException): - pass + """Raise when name of the stats is invalid""" class AirflowTaskTimeout(AirflowException): - pass + """Raise when the task execution times-out""" class AirflowWebServerTimeout(AirflowException): - pass + """Raise when the web server times out""" class AirflowSkipException(AirflowException): - pass + """Raise when the task should be skipped""" class AirflowDagCycleException(AirflowException): - pass + """Raise when there is a cycle in Dag definition""" class DagNotFound(AirflowNotFoundException): diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index fd7a8d0e7afd4..676fbbca07116 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- +"""Kubernetes executor""" import base64 import hashlib from queue import Empty @@ -22,8 +22,10 @@ import re import json import multiprocessing -from dateutil import parser from uuid import uuid4 + +from dateutil import parser + import kubernetes from kubernetes import watch, client from kubernetes.client.rest import ApiException @@ -40,8 +42,12 @@ from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin +MAX_POD_ID_LEN = 253 +MAX_LABEL_LEN = 63 -class KubeConfig: + +class KubeConfig: # pylint: disable=too-many-instance-attributes + """Configuration for Kubernetes""" core_section = 'core' kubernetes_section = 'kubernetes' @@ -197,13 +203,14 @@ def __init__(self): # and only return a blank string if contexts are not set. def _get_security_context_val(self, scontext): val = conf.get(self.kubernetes_section, scontext) - if len(val) == 0: - return val + if not val: + return 0 else: return int(val) def _validate(self): # TODO: use XOR for dags_volume_claim and git_dags_folder_mount_point + # pylint: disable=too-many-boolean-expressions if not self.dags_volume_claim \ and not self.dags_volume_host \ and not self.dags_in_image \ @@ -223,9 +230,11 @@ def _validate(self): 'must be set for authentication through user credentials; ' 'or `git_ssh_key_secret_name` must be set for authentication ' 'through ssh key, but not both') + # pylint: enable=too-many-boolean-expressions -class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): +class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): + """Watches for Kubernetes jobs""" def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace @@ -235,6 +244,7 @@ def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube self.kube_config = kube_config def run(self): + """Performs watching""" kube_client = get_kube_client() while True: try: @@ -280,6 +290,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): return last_resource_version def process_error(self, event): + """Process error response""" self.log.error( 'Encountered Error response from k8s list namespaced pod stream => %s', event @@ -288,16 +299,17 @@ def process_error(self, event): if raw_object['code'] == 410: self.log.info( 'Kubernetes resource version is too old, must reset to 0 => %s', - raw_object['message'] + (raw_object['message'],) ) # Return resource version 0 return '0' raise AirflowException( - 'Kubernetes failure for %s with code %s and message: %s', - raw_object['reason'], raw_object['code'], raw_object['message'] + 'Kubernetes failure for %s with code %s and message: %s' % + (raw_object['reason'], raw_object['code'], raw_object['message']) ) def process_status(self, pod_id, status, labels, resource_version): + """Process status response""" if status == 'Pending': self.log.info('Event: %s Pending', pod_id) elif status == 'Failed': @@ -316,6 +328,7 @@ def process_status(self, pod_id, status, labels, resource_version): class AirflowKubernetesScheduler(LoggingMixin): + """Airflow Scheduler for Kubernetes""" def __init__(self, kube_config, task_queue, result_queue, kube_client, worker_uuid): self.log.debug("Creating Kubernetes executor") self.kube_config = kube_config @@ -349,7 +362,6 @@ def _health_check_kube_watcher(self): def run_next(self, next_job): """ - The run_next command will check the task_queue for any un-run jobs. 
It will then create a unique job-id, launch that job in the cluster, and store relevant info in the current_jobs map so we can track the job's @@ -380,6 +392,7 @@ self.log.debug("Kubernetes Job created!") def delete_pod(self, pod_id: str) -> None: + """Deletes POD""" try: self.kube_client.delete_namespaced_pod( pod_id, self.namespace, body=client.V1DeleteOptions(), @@ -410,6 +423,7 @@ break def process_watcher_task(self, task): + """Process the task by watcher.""" pod_id, state, labels, resource_version = task self.log.info( 'Attempting to finish pod; pod_id: %s; state: %s; labels: %s', @@ -447,8 +461,6 @@ def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid): :param random_uuid: a uuid :return: ``str`` valid Pod name of appropriate length """ - MAX_POD_ID_LEN = 253 - safe_key = safe_dag_id + safe_task_id safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid @@ -466,8 +478,6 @@ def _make_safe_label_value(string): way from the original value sent to this function, then we need to truncate to 53chars, and append it with a unique hash. """ - MAX_LABEL_LEN = 63 - safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string) if len(safe_label) > MAX_LABEL_LEN or string != safe_label: @@ -521,7 +531,7 @@ def _labels_to_key(self, labels): dag_id = labels['dag_id'] task_id = labels['task_id'] ex_time = self._label_safe_datestring_to_datetime(labels['execution_date']) - except Exception as e: + except Exception as e: # pylint: disable=broad-except self.log.warning( 'Error while retrieving labels; labels: %s; exception: %s', labels, e @@ -558,11 +568,13 @@ return None def terminate(self): + """Terminates the watcher.""" self.watcher_queue.join() self._manager.shutdown() class KubernetesExecutor(BaseExecutor, LoggingMixin): + """Executor for Kubernetes""" def __init__(self): self.kube_config = KubeConfig() self.task_queue = None @@ -599,6 +611,8 @@ def clear_not_launched_queued_tasks(self, session=None): ) for task in queued_tasks: + # noinspection PyProtectedMember + # pylint: disable=protected-access dict_string = ( "dag_id={},task_id={},execution_date={},airflow-worker={}".format( AirflowKubernetesScheduler._make_safe_label_value(task.dag_id), @@ -609,13 +623,14 @@ self.worker_uuid ) ) + # pylint: enable=protected-access kwargs = dict(label_selector=dict_string) if self.kube_config.kube_client_request_args: for key, value in self.kube_config.kube_client_request_args.iteritems(): kwargs[key] = value pod_list = self.kube_client.list_namespaced_pod( self.kube_config.kube_namespace, **kwargs) - if len(pod_list.items) == 0: + if not pod_list.items: self.log.info( 'TaskInstance: %s found in queued state but was not launched, ' 'rescheduling', task @@ -663,6 +678,7 @@ _create_or_update_secret(service_account['name'], service_account['path']) def start(self): + """Starts the executor""" self.log.info('Start Kubernetes executor') self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid() self.log.debug('Start with worker_uuid: %s', self.worker_uuid) @@ -682,6 +698,7 @@ self.clear_not_launched_queued_tasks() def execute_async(self, key, command, queue=None, executor_config=None): + """Executes task asynchronously""" self.log.info( 'Add task %s with command %s with executor_config %s', key, command, executor_config @@ -691,6
+708,7 @@ def execute_async(self, key, command, queue=None, executor_config=None): self.task_queue.put((key, command, kube_executor_config)) def sync(self): + """Synchronize task state.""" if self.running: self.log.debug('self.running: %s', self.running) if self.queued_tasks: @@ -698,7 +716,7 @@ def sync(self): self.kube_scheduler.sync() last_resource_version = None - while True: + while True: # pylint: disable=too-many-nested-blocks try: results = self.result_queue.get_nowait() try: @@ -707,7 +725,7 @@ def sync(self): self.log.info('Changing state of %s to %s', results, state) try: self._change_state(key, state, pod_id) - except Exception as e: + except Exception as e: # pylint: disable=broad-except self.log.exception('Exception: %s when attempting ' + 'to change state of %s to %s, re-queueing.', e, results, state) self.result_queue.put(results) @@ -718,6 +736,7 @@ def sync(self): KubeResourceVersion.checkpoint_resource_version(last_resource_version) + # pylint: disable=too-many-nested-blocks for _ in range(self.kube_config.worker_pods_creation_batch_size): try: task = self.task_queue.get_nowait() @@ -731,6 +750,7 @@ def sync(self): self.task_queue.task_done() except Empty: break + # pylint: enable=too-many-nested-blocks def _change_state(self, key, state, pod_id: str) -> None: if state != State.RUNNING: @@ -744,6 +764,7 @@ def _change_state(self, key, state, pod_id: str) -> None: self.event_buffer[key] = state def end(self): + """Called when the executor shuts down""" self.log.info('Shutting down Kubernetes executor') self.task_queue.join() self.result_queue.join() diff --git a/airflow/gcp/example_dags/example_bigtable.py b/airflow/gcp/example_dags/example_bigtable.py index cecc57c86f39d..e8bbc8e665fab 100644 --- a/airflow/gcp/example_dags/example_bigtable.py +++ b/airflow/gcp/example_dags/example_bigtable.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. diff --git a/airflow/gcp/hooks/mlengine.py b/airflow/gcp/hooks/mlengine.py index 0358ffbffc14f..6ef8987abd888 100644 --- a/airflow/gcp/hooks/mlengine.py +++ b/airflow/gcp/hooks/mlengine.py @@ -1,18 +1,20 @@ # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. """ This module contains a Google ML Engine Hook. """ diff --git a/airflow/gcp/operators/mlengine.py b/airflow/gcp/operators/mlengine.py index 97166c6a2f0d0..35f912115d5d6 100644 --- a/airflow/gcp/operators/mlengine.py +++ b/airflow/gcp/operators/mlengine.py @@ -1,18 +1,20 @@ # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. """ This module contains GCP MLEngine operators. """ diff --git a/airflow/gcp/utils/mlengine_operator_utils.py b/airflow/gcp/utils/mlengine_operator_utils.py index 4eb642c3e9cb9..bc68e8deb2f67 100644 --- a/airflow/gcp/utils/mlengine_operator_utils.py +++ b/airflow/gcp/utils/mlengine_operator_utils.py @@ -1,18 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. """ This module contains helper functions for MLEngine operators. """ diff --git a/airflow/gcp/utils/mlengine_prediction_summary.py b/airflow/gcp/utils/mlengine_prediction_summary.py index beca4b4962124..1a0853160133c 100644 --- a/airflow/gcp/utils/mlengine_prediction_summary.py +++ b/airflow/gcp/utils/mlengine_prediction_summary.py @@ -1,20 +1,21 @@ # flake8: noqa: F841 # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. """A template called by DataFlowPythonOperator to summarize BatchPrediction. 
It accepts a user function to calculate the metric(s) per instance in diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py index 7a00be751f09f..52d68f1324d4f 100644 --- a/airflow/kubernetes/kube_client.py +++ b/airflow/kubernetes/kube_client.py @@ -14,11 +14,12 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""Client for kubernetes communication""" from airflow.configuration import conf try: from kubernetes import config, client - from kubernetes.client.rest import ApiException + from kubernetes.client.rest import ApiException # pylint: disable=unused-import has_kubernetes = True except ImportError as e: # We need an exception class to be able to use it in ``except`` elsewhere @@ -41,6 +42,18 @@ def _load_kube_config(in_cluster, cluster_context, config_file): def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'), cluster_context=None, config_file=None): + """ + Retrieves Kubernetes client + + :param in_cluster: whether we are in cluster + :type in_cluster: bool + :param cluster_context: context of the cluster + :type cluster_context: str + :param config_file: configuration file + :type config_file: str + :return kubernetes client + :rtype client.CoreV1Api + """ if not in_cluster: if cluster_context is None: cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 3765ab24eeac0..bdce1ce50a042 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -24,6 +24,20 @@ class Resources(K8SModel): + """ + Stores information about resources used by the Pod. + + :param request_memory: requested memory + :type request_memory: str + :param request_cpu: requested CPU number + :type request_cpu: float | str + :param limit_memory: limit for memory usage + :type limit_memory: str + :param limit_cpu: Limit for CPU used + :type limit_cpu: float | str + :param limit_gpu: Limits for GPU used + :type limit_gpu: int + """ def __init__( self, request_memory=None, @@ -38,21 +52,26 @@ def __init__( self.limit_gpu = limit_gpu def is_empty_resource_request(self): + """Whether resource is empty""" return not self.has_limits() and not self.has_requests() def has_limits(self): + """Whether resource has limits""" return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None def has_requests(self): + """Whether resource has requests""" return self.request_cpu is not None or self.request_memory is not None def to_k8s_client_obj(self) -> k8s.V1ResourceRequirements: + """Converts to k8s client object""" return k8s.V1ResourceRequirements( limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu}, requests={'cpu': self.request_cpu, 'memory': self.request_memory} ) def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: + """Attaches to pod""" cp_pod = copy.deepcopy(pod) resources = self.to_k8s_client_obj() cp_pod.spec.containers[0].resources = resources @@ -60,17 +79,21 @@ def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: class Port(K8SModel): + """POD port""" def __init__( self, name=None, container_port=None): + """Creates port""" self.name = name self.container_port = container_port def to_k8s_client_obj(self) -> k8s.V1ContainerPort: + """Converts to k8s client object""" return k8s.V1ContainerPort(name=self.name, container_port=self.container_port) def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: 
+ """Attaches to pod""" cp_pod = copy.deepcopy(pod) port = self.to_k8s_client_obj() cp_pod.spec.containers[0].ports = cp_pod.spec.containers[0].ports or [] diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index cc821af2edc8a..d12b7525785df 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -22,9 +22,10 @@ """ import copy +import uuid + import kubernetes.client.models as k8s from airflow.executors import Executors -import uuid class PodDefaults: @@ -91,8 +92,7 @@ class PodGenerator: :param pod: The fully specified pod. :type pod: kubernetes.client.models.V1Pod """ - - def __init__( + def __init__( # pylint: disable=too-many-arguments,too-many-locals self, image, name=None, @@ -189,6 +189,7 @@ def __init__( self.extract_xcom = extract_xcom def gen_pod(self) -> k8s.V1Pod: + """Generates pod""" result = self.ud_pod if result is None: @@ -204,6 +205,7 @@ def gen_pod(self) -> k8s.V1Pod: @staticmethod def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: + """Adds sidecar""" pod_cp = copy.deepcopy(pod) pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME) @@ -214,6 +216,7 @@ def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod: @staticmethod def from_obj(obj) -> k8s.V1Pod: + """Converts to pod from obj""" if obj is None: return k8s.V1Pod() diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index 7dffa9f144249..3e97112d6dc7c 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -14,27 +14,32 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Launches PODs""" import json import time -import tenacity +from datetime import datetime as dt from typing import Tuple, Optional -from airflow.settings import pod_mutation_hook -from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.state import State -from datetime import datetime as dt -from kubernetes.client.models.v1_pod import V1Pod +from requests.exceptions import BaseHTTPError + +import tenacity + from kubernetes import watch, client from kubernetes.client.rest import ApiException from kubernetes.stream import stream as kubernetes_stream +from kubernetes.client.models.v1_pod import V1Pod + +from airflow.settings import pod_mutation_hook +from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.state import State from airflow import AirflowException -from requests.exceptions import BaseHTTPError -from .kube_client import get_kube_client from airflow.kubernetes.pod_generator import PodDefaults +from .kube_client import get_kube_client + class PodStatus: + """Status of the PODs""" PENDING = 'pending' RUNNING = 'running' FAILED = 'failed' @@ -42,8 +47,20 @@ class PodStatus: class PodLauncher(LoggingMixin): - def __init__(self, kube_client=None, in_cluster=True, cluster_context=None, - extract_xcom=False): + """Launches PODS""" + def __init__(self, + kube_client: client.CoreV1Api = None, + in_cluster: bool = True, + cluster_context: str = None, + extract_xcom: bool = False): + """ + Creates the launcher. 
+ + :param kube_client: kubernetes client + :param in_cluster: whether we are in cluster + :param cluster_context: context of the cluster + :param extract_xcom: whether we should extract xcom + """ super().__init__() self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context) @@ -51,6 +68,7 @@ def __init__(self, kube_client=None, in_cluster=True, cluster_context=None, self.extract_xcom = extract_xcom def run_pod_async(self, pod: V1Pod, **kwargs): + """Runs POD asynchronously""" pod_mutation_hook(pod) sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) @@ -68,6 +86,7 @@ return resp def delete_pod(self, pod: V1Pod): + """Deletes POD""" try: self._client.delete_namespaced_pod( pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()) @@ -127,14 +146,17 @@ return status def pod_not_started(self, pod: V1Pod): + """Tests if pod has not started""" state = self._task_status(self.read_pod(pod)) return state == State.QUEUED def pod_is_running(self, pod: V1Pod): + """Tests if pod is running""" state = self._task_status(self.read_pod(pod)) - return state != State.SUCCESS and state != State.FAILED + return state not in (State.SUCCESS, State.FAILED) def base_container_is_running(self, pod: V1Pod): + """Tests if base container is running""" event = self.read_pod(pod) status = next(iter(filter(lambda s: s.name == 'base', event.status.container_statuses)), None) @@ -148,6 +170,7 @@ reraise=True ) def read_pod_logs(self, pod: V1Pod): + """Reads log from the POD""" try: return self._client.read_namespaced_pod_log( name=pod.metadata.name, @@ -168,6 +191,7 @@ reraise=True ) def read_pod(self, pod: V1Pod): + """Read POD information""" try: return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace) except BaseHTTPError as e: @@ -203,8 +227,10 @@ def _exec_pod_command(self, resp, command): if resp.peek_stderr(): self.log.info(resp.read_stderr()) break + return None def process_status(self, job_id, status): + """Process status information for the JOB""" status = status.lower() if status == PodStatus.PENDING: return State.QUEUED diff --git a/airflow/kubernetes/secret.py b/airflow/kubernetes/secret.py index aca64d64f7e56..3d33739cbd672 100644 --- a/airflow/kubernetes/secret.py +++ b/airflow/kubernetes/secret.py @@ -64,6 +64,7 @@ self.key = key def to_env_secret(self) -> k8s.V1EnvVar: + """Stores the secret as an environment variable""" return k8s.V1EnvVar( name=self.deploy_target, value_from=k8s.V1EnvVarSource( @@ -75,11 +76,13 @@ ) def to_env_from_secret(self) -> k8s.V1EnvFromSource: + """Exposes the secret as environment variables via envFrom""" return k8s.V1EnvFromSource( secret_ref=k8s.V1SecretEnvSource(name=self.secret) ) def to_volume_secret(self) -> Tuple[k8s.V1Volume, k8s.V1VolumeMount]: + """Converts to volume secret""" vol_id = 'secretvol{}'.format(uuid.uuid4()) return ( k8s.V1Volume( @@ -96,6 +99,7 @@ ) def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: + """Attaches to pod""" cp_pod = copy.deepcopy(pod) if self.deploy_type == 'volume': volume, volume_mount = self.to_volume_secret() diff --git a/airflow/kubernetes/volume.py b/airflow/kubernetes/volume.py index f18cebdc5e2e9..679671cc87b33 100644 ---
a/airflow/kubernetes/volume.py +++ b/airflow/kubernetes/volume.py @@ -25,20 +25,23 @@ class Volume(K8SModel): - def __init__(self, name, configs): - """ Adds Kubernetes Volume to pod. allows pod to access features like ConfigMaps - and Persistent Volumes - :param name: the name of the volume mount - :type name: str - :param configs: dictionary of any features needed for volume. + """ + Adds Kubernetes Volume to pod. Allows pod to access features like ConfigMaps + and Persistent Volumes + + :param name: the name of the volume mount + :type name: str + :param configs: dictionary of any features needed for volume. We purposely keep this vague since there are multiple volume types with changing configs. - :type configs: dict - """ + :type configs: dict + """ + def __init__(self, name, configs): self.name = name self.configs = configs def to_k8s_client_obj(self) -> Dict[str, str]: + """Converts to k8s object""" return { 'name': self.name, **self.configs diff --git a/airflow/kubernetes/volume_mount.py b/airflow/kubernetes/volume_mount.py index 804428216dec4..3f7b2b5bef5ec 100644 --- a/airflow/kubernetes/volume_mount.py +++ b/airflow/kubernetes/volume_mount.py @@ -24,26 +24,32 @@ class VolumeMount(K8SModel): - """Defines Kubernetes Volume Mount""" + """ + Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to + a running container. + :param name: the name of the volume mount + :type name: str + :param mount_path: the path where the volume will be mounted in the container + :type mount_path: str + :param sub_path: subpath within the volume mount + :type sub_path: str + :param read_only: whether to access pod with read-only mode + :type read_only: bool + """ def __init__(self, name, mount_path, sub_path, read_only): - """Initialize a Kubernetes Volume Mount. Used to mount pod level volumes to - running container. - :param name: the name of the volume mount - :type name: str - :param mount_path: - :type mount_path: str - :param sub_path: subpath within the volume mount - :type sub_path: str - :param read_only: whether to access pod with read-only mode - :type read_only: bool - """ self.name = name self.mount_path = mount_path self.sub_path = sub_path self.read_only = read_only def to_k8s_client_obj(self) -> k8s.V1VolumeMount: + """ + Converts to k8s object. + + :return: Volume Mount k8s object + + """ return k8s.V1VolumeMount( name=self.name, mount_path=self.mount_path, @@ -52,6 +58,12 @@ ) def attach_to_pod(self, pod: k8s.V1Pod) -> k8s.V1Pod: + """ + Attaches to pod + + :return: Copy of the Pod object + + """ cp_pod = copy.deepcopy(pod) volume_mount = self.to_k8s_client_obj() cp_pod.spec.containers[0].volume_mounts = pod.spec.containers[0].volume_mounts or [] diff --git a/airflow/kubernetes/worker_configuration.py b/airflow/kubernetes/worker_configuration.py index 2fc0e484297ef..90e36f206ac98 100644 --- a/airflow/kubernetes/worker_configuration.py +++ b/airflow/kubernetes/worker_configuration.py @@ -14,16 +14,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License.
- +"""Configuration of the worker""" import os +from typing import List, Dict +import kubernetes.client.models as k8s from airflow.configuration import conf -import kubernetes.client.models as k8s from airflow.kubernetes.pod_generator import PodGenerator from airflow.utils.log.logging_mixin import LoggingMixin from airflow.kubernetes.secret import Secret from airflow.kubernetes.k8s_model import append_to_pod -from typing import List, Dict class WorkerConfiguration(LoggingMixin): @@ -224,7 +224,7 @@ def _get_image_pull_secrets(self) -> List[k8s.V1LocalObjectReference]: if not self.kube_config.image_pull_secrets: return [] pull_secrets = self.kube_config.image_pull_secrets.split(',') - return list(map(lambda name: k8s.V1LocalObjectReference(name), pull_secrets)) + return list(map(k8s.V1LocalObjectReference, pull_secrets)) def _get_security_context(self) -> k8s.V1PodSecurityContext: """Defines the security context""" @@ -354,6 +354,7 @@ def _construct_volume(name, claim, host) -> k8s.V1Volume: return list(volumes.values()) def generate_dag_volume_mount_path(self) -> str: + """Generate path for DAG volume""" if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host: return self.worker_airflow_dags @@ -361,6 +362,7 @@ def generate_dag_volume_mount_path(self) -> str: def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_date, try_number, airflow_command) -> k8s.V1Pod: + """Creates POD.""" pod_generator = PodGenerator( namespace=namespace, name=pod_id, diff --git a/airflow/lineage/backend/__init__.py b/airflow/lineage/backend/__init__.py index 243b86973f324..53a9b6751d70f 100644 --- a/airflow/lineage/backend/__init__.py +++ b/airflow/lineage/backend/__init__.py @@ -16,9 +16,11 @@ # specific language governing permissions and limitations # under the License. 
# +"""Sends lineage metadata to a backend""" class LineageBackend: + """Sends lineage metadata to a backend""" def send_lineage(self, operator=None, inlets=None, outlets=None, context=None): """ diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py index b77e49d6574f1..c1e62a2cc2a5f 100644 --- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py +++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py @@ -44,11 +44,11 @@ def upgrade(): conn = op.get_bind() # alembic creates an invalid SQL for mssql and mysql dialects - if conn.dialect.name in ("mysql"): + if conn.dialect.name in {"mysql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id<>0", name="kube_resource_version_one_row_id") ) - elif conn.dialect.name not in ("mssql"): + elif conn.dialect.name not in {"mssql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id") ) diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py index 7e9e7fecaf34b..86ad47131e4a1 100644 --- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py +++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py @@ -45,11 +45,11 @@ def upgrade(): conn = op.get_bind() # alembic creates an invalid SQL for mssql and mysql dialects - if conn.dialect.name in ("mysql"): + if conn.dialect.name in {"mysql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id<>0", name="kube_worker_one_row_id") ) - elif conn.dialect.name not in ("mssql"): + elif conn.dialect.name not in {"mssql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id") ) diff --git a/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py index 3249a2e0589cb..560b763963dd6 100644 --- a/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py +++ b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py @@ -15,9 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - -from alembic import op - """add idx_log_dag Revision ID: dd25f486b8ea @@ -25,6 +22,7 @@ Create Date: 2018-08-07 06:41:41.028249 """ +from alembic import op # revision identifiers, used by Alembic. revision = 'dd25f486b8ea' diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 43b13f596c54b..2dc56116233dc 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Airflow models""" from airflow.models.base import Base, ID_LEN # noqa: F401 from airflow.models.baseoperator import BaseOperator # noqa: F401 from airflow.models.connection import Connection # noqa: F401 diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 65fe8a2fad626..bdac7ab90dec1 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -16,18 +16,22 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- +""" +Base operator for all operators. +""" from abc import ABCMeta, abstractmethod -from cached_property import cached_property import copy import functools import logging import sys import warnings from datetime import timedelta, datetime -from dateutil.relativedelta import relativedelta from typing import Callable, Dict, Iterable, List, Optional, Set, Any, Union +from dateutil.relativedelta import relativedelta + +from cached_property import cached_property + import jinja2 from airflow import settings @@ -53,6 +57,7 @@ ScheduleInterval = Union[str, timedelta, relativedelta] +# pylint: disable=too-many-instance-attributes,too-many-public-methods @functools.total_ordering class BaseOperator(LoggingMixin): """ @@ -260,6 +265,8 @@ class derived from this one results in the creation of a task object, 'do_xcom_push', } + # noinspection PyUnusedLocal + # pylint: disable=too-many-arguments,too-many-locals @apply_defaults def __init__( self, @@ -278,7 +285,7 @@ def __init__( wait_for_downstream: bool = False, dag: Optional[DAG] = None, params: Optional[Dict] = None, - default_args: Optional[Dict] = None, + default_args: Optional[Dict] = None, # pylint: disable=unused-argument priority_weight: int = 1, weight_rule: str = WeightRule.DOWNSTREAM, queue: str = conf.get('celery', 'default_queue'), @@ -407,7 +414,7 @@ def __init__( self._outlets.update(outlets) def __eq__(self, other): - if (type(self) == type(other) and + if (type(self) == type(other) and # pylint: disable=unidiomatic-typecheck self.task_id == other.task_id): return all(self.__dict__.get(c, None) == other.__dict__.get(c, None) for c in self._comps) return False @@ -420,8 +427,8 @@ def __lt__(self, other): def __hash__(self): hash_components = [type(self)] - for c in self._comps: - val = getattr(self, c, None) + for component in self._comps: + val = getattr(self, component, None) try: hash(val) hash_components.append(val) @@ -505,7 +512,7 @@ def dag(self, dag): elif self.task_id not in dag.task_dict: dag.add_task(self) - self._dag = dag + self._dag = dag # pylint: disable=attribute-defined-outside-init def has_dag(self): """ @@ -515,6 +522,7 @@ def has_dag(self): @property def dag_id(self): + """Returns dag id if it has one or an adhoc + owner""" if self.has_dag(): return self.dag.dag_id else: @@ -535,6 +543,15 @@ def deps(self): @property def priority_weight_total(self): + """ + Total priority weight for the task. It might include all upstream or downstream tasks. + depending on the weight rule. 
+ + - WeightRule.ABSOLUTE - only own weight + - WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks + - WeightRule.UPSTREAM - adds priority weight of all upstream tasks + + """ if self.weight_rule == WeightRule.ABSOLUTE: return self.priority_weight elif self.weight_rule == WeightRule.DOWNSTREAM: @@ -551,10 +568,12 @@ def priority_weight_total(self): @cached_property def operator_extra_link_dict(self): + """Returns dictionary of all extra links for the operator""" return {link.name: link for link in self.operator_extra_links} @cached_property def global_operator_extra_link_dict(self): + """Returns dictionary of all global extra links""" from airflow.plugins_manager import global_operator_extra_links return {link.name: link for link in global_operator_extra_links} @@ -599,7 +618,9 @@ def __deepcopy__(self, memo): result = cls.__new__(cls) memo[id(self)] = result - shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs + # noinspection PyProtectedMember + shallow_copy = cls.shallow_copy_attrs + \ + cls._base_operator_shallow_copy_attrs # pylint: disable=protected-access for k, v in self.__dict__.items(): if k not in shallow_copy: @@ -615,7 +636,7 @@ def __getstate__(self): return state def __setstate__(self, state): - self.__dict__ = state + self.__dict__ = state # pylint: disable=attribute-defined-outside-init self._log = logging.getLogger("airflow.task.operators") def render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Environment] = None) -> None: @@ -637,7 +658,7 @@ def render_template_fields(self, context: Dict, jinja_env: Optional[jinja2.Envir rendered_content = self.render_template(content, context, jinja_env) setattr(self, attr_name, rendered_content) - def render_template( + def render_template( # pylint: disable=too-many-return-statements self, content: Any, context: Dict, jinja_env: Optional[jinja2.Environment] = None ) -> Any: """ @@ -665,7 +686,7 @@ def render_template( return jinja_env.from_string(content).render(**context) if isinstance(content, tuple): - if type(content) is not tuple: + if type(content) is not tuple: # pylint: disable=unidiomatic-typecheck # Special case for named tuples return content.__class__( *(self.render_template(element, context, jinja_env) for element in content) @@ -698,8 +719,8 @@ def prepare_template(self): """ def resolve_template_files(self): - # Getting the content of files for template_field / template_ext - if self.template_ext: + """Getting the content of files for template_field / template_ext""" + if self.template_ext: # pylint: disable=too-many-nested-blocks for attr in self.template_fields: content = getattr(self, attr, None) if content is None: @@ -709,16 +730,16 @@ def resolve_template_files(self): env = self.get_template_env() try: setattr(self, attr, env.loader.get_source(env, content)[0]) - except Exception as e: + except Exception as e: # pylint: disable=broad-except self.log.exception(e) elif isinstance(content, list): env = self.dag.get_template_env() - for i in range(len(content)): + for i in range(len(content)): # pylint: disable=consider-using-enumerate if isinstance(content[i], str) and \ any([content[i].endswith(ext) for ext in self.template_ext]): try: content[i] = env.loader.get_source(env, content[i])[0] - except Exception as e: + except Exception as e: # pylint: disable=broad-except self.log.exception(e) self.prepare_template() @@ -729,6 +750,7 @@ def upstream_list(self): @property def upstream_task_ids(self): + """@property: list of ids of tasks directly 
upstream""" return self._upstream_task_ids @property @@ -738,6 +760,7 @@ def downstream_list(self): @property def downstream_task_ids(self): + """@property: list of ids of tasks directly downstream""" return self._downstream_task_ids @provide_session @@ -832,14 +855,15 @@ def run( start_date = start_date or self.start_date end_date = end_date or self.end_date or timezone.utcnow() - for dt in self.dag.date_range(start_date, end_date=end_date): - TaskInstance(self, dt).run( + for execution_date in self.dag.date_range(start_date, end_date=end_date): + TaskInstance(self, execution_date).run( mark_success=mark_success, ignore_depends_on_past=( - dt == start_date and ignore_first_depends_on_past), + execution_date == start_date and ignore_first_depends_on_past), ignore_ti_state=ignore_ti_state) def dry_run(self): + """Performs dry run for the operator - just render template fields.""" self.log.info('Dry run') for attr in self.template_fields: content = getattr(self, attr) @@ -873,31 +897,35 @@ def __repr__(self): @property def task_type(self): + """@property: type of the task""" return self.__class__.__name__ def add_only_new(self, item_set, item): + """Adds only new items to item set""" if item in item_set: self.log.warning( - 'Dependency {self}, {item} already registered' - ''.format(self=self, item=item)) + 'Dependency %s, %s already registered', self, item) else: item_set.add(item) def _set_relatives(self, task_or_task_list, upstream=False): + """Sets relatives for the task.""" try: task_list = list(task_or_task_list) except TypeError: task_list = [task_or_task_list] - for t in task_list: - if not isinstance(t, BaseOperator): + for task in task_list: + if not isinstance(task, BaseOperator): raise AirflowException( "Relationships can only be set between " - "Operators; received {}".format(t.__class__.__name__)) + "Operators; received {}".format(task.__class__.__name__)) # relationships can only be set if the tasks share a single DAG. Tasks # without a DAG are assigned to that DAG. - dags = {t._dag.dag_id: t._dag for t in [self] + task_list if t.has_dag()} + dags = { + task._dag.dag_id: task._dag # pylint: disable=protected-access + for task in [self] + task_list if task.has_dag()} if len(dags) > 1: raise AirflowException( @@ -970,6 +998,7 @@ def xcom_pull( @cached_property def extra_links(self) -> Iterable[str]: + """@property: extra links for the task. """ return list(set(self.operator_extra_link_dict.keys()) .union(self.global_operator_extra_link_dict.keys())) @@ -988,6 +1017,8 @@ def get_extra_links(self, dttm, link_name): return self.operator_extra_link_dict[link_name].get_link(self, dttm) elif link_name in self.global_operator_extra_link_dict: return self.global_operator_extra_link_dict[link_name].get_link(self, dttm) + else: + return None class BaseOperatorLink(metaclass=ABCMeta): diff --git a/airflow/models/taskfail.py b/airflow/models/taskfail.py index ad0861a0485a4..3fb08b3cee000 100644 --- a/airflow/models/taskfail.py +++ b/airflow/models/taskfail.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- +"""Taskfail tracks the failed run durations of each task instance""" from sqlalchemy import Column, Index, Integer, String from airflow.models.base import Base, ID_LEN diff --git a/airflow/models/taskreschedule.py b/airflow/models/taskreschedule.py index 4bdb6431f1ae4..ebca6ee4e4ee6 100644 --- a/airflow/models/taskreschedule.py +++ b/airflow/models/taskreschedule.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""TaskReschedule tracks rescheduled task instances.""" from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc from airflow.models.base import Base, ID_LEN diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index 005671c260dd0..d4adf3c6bea17 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -16,19 +16,23 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +Implements Docker operator +""" import json from typing import Union, List, Dict, Iterable +import ast +from docker import APIClient, tls + from airflow.hooks.docker_hook import DockerHook from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -from docker import APIClient, tls -import ast +# pylint: disable=too-many-instance-attributes class DockerOperator(BaseOperator): """ Execute a command inside a docker container. @@ -119,6 +123,7 @@ class DockerOperator(BaseOperator): template_fields = ('command', 'environment', 'container_name') template_ext = ('.sh', '.bash',) + # pylint: disable=too-many-arguments,too-many-locals @apply_defaults def __init__( self, @@ -184,7 +189,12 @@ def __init__( self.cli = None self.container = None - def get_hook(self): + def get_hook(self) -> DockerHook: + """ + Retrieves hook for the operator. + + :return: The Docker Hook + """ return DockerHook( docker_conn_id=self.docker_conn_id, base_url=self.docker_url, @@ -238,6 +248,8 @@ def _run_image(self): if self.do_xcom_push: return self.cli.logs(container=self.container['Id']) \ if self.xcom_all else line.encode('utf-8') + else: + return None def execute(self, context): @@ -253,10 +265,10 @@ def execute(self, context): ) # Pull the docker image if `force_pull` is set or image does not exist locally - if self.force_pull or len(self.cli.images(name=self.image)) == 0: + if self.force_pull or not self.cli.images(name=self.image): self.log.info('Pulling docker image %s', self.image) - for l in self.cli.pull(self.image, stream=True): - output = json.loads(l.decode('utf-8').strip()) + for line in self.cli.pull(self.image, stream=True): + output = json.loads(line.decode('utf-8').strip()) if 'status' in output: self.log.info("%s", output['status']) @@ -265,6 +277,12 @@ def execute(self, context): self._run_image() def get_command(self): + """ + Retrieve command(s). 
If the command string starts with '[', it returns the command list. + + :return: the command (or commands) + :rtype: str | List[str] + """ if isinstance(self.command, str) and self.command.strip().find('[') == 0: commands = ast.literal_eval(self.command) else: commands = self.command @@ -279,11 +297,14 @@ def on_kill(self): def __get_tls_config(self): tls_config = None if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key: + # Ignore the type error on SSL version here - it is deprecated and the type annotation is wrong; + # it should be a string + # noinspection PyTypeChecker tls_config = tls.TLSConfig( ca_cert=self.tls_ca_cert, client_cert=(self.tls_client_cert, self.tls_client_key), verify=True, - ssl_version=self.tls_ssl_version, + ssl_version=self.tls_ssl_version, # type: ignore assert_hostname=self.tls_hostname ) self.docker_url = self.docker_url.replace('tcp://', 'https://') diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index 1c891bef3867b..18f52c3834c43 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -1,4 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,6 +31,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +"""Kerberos security provider""" from typing import Optional import socket @@ -29,10 +47,16 @@ log = LoggingMixin().log -def renew_from_kt(principal, keytab): +def renew_from_kt(principal: str, keytab: str): + """ + Renew Kerberos token from keytab + + :param principal: principal + :param keytab: keytab file + :return: None + """ # The config is specified in seconds. But we ask for that same amount in # minutes to give ourselves a large renewal buffer.
- renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency') cmd_principal = principal or conf.get('kerberos', 'principal').replace( @@ -47,7 +71,7 @@ def renew_from_kt(principal, keytab): "-c", conf.get('kerberos', 'ccache'), # specify credentials cache cmd_principal ] - log.info("Reinitting kerberos from keytab: %s", " ".join(cmdv)) + log.info("Re-initialising kerberos from keytab: %s", " ".join(cmdv)) subp = subprocess.Popen(cmdv, stdout=subprocess.PIPE, @@ -63,7 +87,7 @@ ) sys.exit(subp.returncode) - global NEED_KRB181_WORKAROUND + global NEED_KRB181_WORKAROUND # pylint: disable=global-statement if NEED_KRB181_WORKAROUND is None: NEED_KRB181_WORKAROUND = detect_conf_var() if NEED_KRB181_WORKAROUND: @@ -73,7 +97,13 @@ perform_krb181_workaround(principal) -def perform_krb181_workaround(principal): +def perform_krb181_workaround(principal: str): + """ + Workaround for Kerberos 1.8.1. + + :param principal: principal name + :return: None + """ cmdv = [conf.get('kerberos', 'kinit_path'), "-c", conf.get('kerberos', 'ccache'), "-R"] # Renew ticket_cache @@ -112,7 +142,14 @@ def detect_conf_var() -> bool: return b'X-CACHECONF:' in file.read() -def run(principal, keytab): +def run(principal: str, keytab: str): + """ + Run the Kerberos renewer. + + :param principal: principal name + :param keytab: keytab file + :return: None + """ if not keytab: log.debug("Keytab renewer not starting, no keytab configured") sys.exit(0) diff --git a/airflow/security/utils.py b/airflow/security/utils.py index 33550107b5bd2..85e2cedba712e 100644 --- a/airflow/security/utils.py +++ b/airflow/security/utils.py @@ -1,4 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,7 +32,7 @@ # See the License for the specific language governing permissions and # limitations under the License.
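The renewal functions above share one subprocess pattern: build cmdv as a list, run it with subprocess.Popen capturing both streams, and exit with the child's return code after surfacing its output. A hypothetical run_command() helper sketching that pattern, demonstrated with a harmless echo in place of kinit:

import subprocess
import sys

def run_command(cmdv):
    # Capture stdout and stderr so a failure can be reported in full.
    subp = subprocess.Popen(cmdv,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            close_fds=True)
    out, err = subp.communicate()
    if subp.returncode != 0:
        print("Couldn't run {}:\n{}\n{}".format(" ".join(cmdv), out.decode(), err.decode()))
        sys.exit(subp.returncode)
    return out.decode()

print(run_command(["echo", "ticket renewed"]))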
# - +"""Various security-related utils.""" import re import socket @@ -34,6 +51,7 @@ def get_components(principal): def replace_hostname_pattern(components, host=None): + """Replaces hostname with the right pattern including lowercase of the name.""" fqdn = host if not fqdn or fqdn == '0.0.0.0': fqdn = get_hostname() @@ -41,7 +59,7 @@ def replace_hostname_pattern(components, host=None): def get_fqdn(hostname_or_ip=None): - # Get hostname + """Retrieves FQDN - hostname for the IP or hostname.""" try: if hostname_or_ip: fqdn = socket.gethostbyaddr(hostname_or_ip)[0] @@ -56,6 +74,7 @@ def get_fqdn(hostname_or_ip=None): def principal_from_username(username, realm): + """Retrieves principal from the user name and realm.""" if ('@' not in username) and realm: username = "{}@{}".format(username, realm) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 1e65539fb1ec5..b389dee4e18f2 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -16,13 +16,16 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""File logging handler for tasks.""" import logging import os +from typing import Optional + import requests from airflow.configuration import conf from airflow.configuration import AirflowConfigException +from airflow.models import TaskInstance from airflow.utils.file import mkdirs from airflow.utils.helpers import parse_template_string @@ -33,39 +36,38 @@ class FileTaskHandler(logging.Handler): task instance logs. It creates and delegates log handling to `logging.FileHandler` after receiving task instance context. It reads logs from task instance's host machine. + :param base_log_folder: Base log folder to place logs. + :param filename_template: template filename string """ - - def __init__(self, base_log_folder, filename_template): - """ - :param base_log_folder: Base log folder to place logs. - :param filename_template: template filename string - """ + def __init__(self, base_log_folder: str, filename_template: str): super().__init__() - self.handler = None + self.handler = None # type: Optional[logging.FileHandler] self.local_base = base_log_folder self.filename_template, self.filename_jinja_template = \ parse_template_string(filename_template) - def set_context(self, ti): + def set_context(self, ti: TaskInstance): """ Provide task_instance context to airflow task handler. + :param ti: task instance object """ local_loc = self._init_file(ti) self.handler = logging.FileHandler(local_loc) - self.handler.setFormatter(self.formatter) + if self.formatter: + self.handler.setFormatter(self.formatter) self.handler.setLevel(self.level) def emit(self, record): - if self.handler is not None: + if self.handler: self.handler.emit(record) def flush(self): - if self.handler is not None: + if self.handler: self.handler.flush() def close(self): - if self.handler is not None: + if self.handler: self.handler.close() def _render_filename(self, ti, try_number): @@ -79,7 +81,7 @@ def _render_filename(self, ti, try_number): execution_date=ti.execution_date.isoformat(), try_number=try_number) - def _read(self, ti, try_number, metadata=None): + def _read(self, ti, try_number, metadata=None): # pylint: disable=unused-argument """ Template method that contains custom logic of reading logs given the try_number. 
@@ -102,7 +104,7 @@ def _read(self, ti, try_number, metadata=None): with open(location) as file: log += "*** Reading local file: {}\n".format(location) log += "".join(file.readlines()) - except Exception as e: + except Exception as e: # pylint: disable=broad-except log = "*** Failed to load local log file: {}\n".format(location) log += "*** {}\n".format(str(e)) else: @@ -127,7 +129,7 @@ def _read(self, ti, try_number, metadata=None): response.raise_for_status() log += '\n' + response.text - except Exception as e: + except Exception as e: # pylint: disable=broad-except log += "*** Failed to fetch log file from worker. {}\n".format(str(e)) return log, {'end_of_log': True} @@ -159,13 +161,13 @@ def read(self, task_instance, try_number=None, metadata=None): try_numbers = [try_number] logs = [''] * len(try_numbers) - metadatas = [{}] * len(try_numbers) - for i, try_number in enumerate(try_numbers): - log, metadata = self._read(task_instance, try_number, metadata) + metadata_array = [{}] * len(try_numbers) + for i, try_number_element in enumerate(try_numbers): + log, metadata = self._read(task_instance, try_number_element, metadata) logs[i] += log - metadatas[i] = metadata + metadata_array[i] = metadata - return logs, metadatas + return logs, metadata_array def _init_file(self, ti): """ diff --git a/airflow/utils/strings.py b/airflow/utils/strings.py index babc375df4913..63c44b821259b 100644 --- a/airflow/utils/strings.py +++ b/airflow/utils/strings.py @@ -2,7 +2,6 @@ Common utility functions with strings ''' # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/dev/airflow-jira b/dev/airflow-jira index 79eb82f142eba..36d09d3017394 100755 --- a/dev/airflow-jira +++ b/dev/airflow-jira @@ -1,21 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
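_read() above falls back to fetching the log over HTTP from the worker, converting any failure into log text rather than an exception. A sketch of that fallback (the function name and URL are hypothetical):

import requests

def fetch_remote_log(url, timeout=5):
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        return '\n' + response.text, {'end_of_log': True}
    except Exception as err:  # broad on purpose, mirroring the handler's behaviour
        return "*** Failed to fetch log file from worker. {}\n".format(err), {'end_of_log': True}

# Port 1 is almost certainly closed, so this exercises the failure branch.
print(fetch_remote_log("http://localhost:1/log")[0])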
# Utility for creating well-formed pull request merges and pushing them to # Apache. diff --git a/dev/airflow-license b/dev/airflow-license index 6b0b7eb8782da..91a283c78f3eb 100755 --- a/dev/airflow-license +++ b/dev/airflow-license @@ -1,20 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. import os import re diff --git a/dev/airflow-pr b/dev/airflow-pr index e520589fd2b50..8ab95ef3f82de 100755 --- a/dev/airflow-pr +++ b/dev/airflow-pr @@ -1,21 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. # Utility for creating well-formed pull request merges and pushing them to # Apache. diff --git a/docs/conf.py b/docs/conf.py index cf7b8a11eca42..f369614a2a5d2 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # @@ -20,6 +19,7 @@ # specific language governing permissions and limitations # under the License. + # Airflow documentation build configuration file, created by # sphinx-quickstart on Thu Oct 9 20:50:01 2014. # diff --git a/docs/exts/__init__.py b/docs/exts/__init__.py index 645848f26c3b3..c718e012dfa4d 100644 --- a/docs/exts/__init__.py +++ b/docs/exts/__init__.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # diff --git a/docs/exts/docroles.py b/docs/exts/docroles.py index 00f88f1eba2b2..14150f9a80c3c 100644 --- a/docs/exts/docroles.py +++ b/docs/exts/docroles.py @@ -1,27 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. # +"""Document roles""" +from functools import partial from docutils import nodes, utils from sphinx.ext.autodoc.importer import import_module, mock -from functools import partial class RoleException(Exception): - pass + """Exception for roles extension """ def get_template_field(env, fullname): @@ -55,7 +59,15 @@ def get_template_field(env, fullname): return list(template_fields) -def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, content=[]): +# noinspection PyUnusedLocal +def template_field_role(app, + typ, # pylint: disable=unused-argument + rawtext, + text, + lineno, + inliner, + options=None, # pylint: disable=unused-argument + content=None): # pylint: disable=unused-argument """ A role that allows you to include a list of template fields in the middle of the text. This is especially useful when writing guides describing how to use the operator. 
@@ -63,13 +75,18 @@ def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, co Sample usage:: - :template-fields:`airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeSentimentOperator` + :template-fields: + `airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeSentimentOperator` For further information look at: * [http://docutils.sourceforge.net/docs/howto/rst-roles.html](Creating reStructuredText Interpreted Text Roles) """ + if options is None: + options = {} + if content is None: + content = [] text = utils.unescape(text) try: @@ -89,7 +106,8 @@ def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, co def setup(app): - from docutils.parsers.rst import roles + """Sets the extension up""" + from docutils.parsers.rst import roles # pylint: disable=wrong-import-order roles.register_local_role("template-fields", partial(template_field_role, app)) return {"version": "builtin", "parallel_read_safe": True, "parallel_write_safe": True} diff --git a/docs/exts/exampleinclude.py b/docs/exts/exampleinclude.py index 03becd3277607..3f802411ac737 100644 --- a/docs/exts/exampleinclude.py +++ b/docs/exts/exampleinclude.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # @@ -20,6 +19,8 @@ # specific language governing permissions and limitations # under the License. + +"""Nice formatted include for examples""" from os import path from docutils import nodes @@ -36,8 +37,10 @@ logger = logging.getLogger(__name__) -class example_header(nodes.reference, nodes.FixedTextElement): - pass +class ExampleHeader(nodes.reference, nodes.FixedTextElement): # pylint: disable=too-many-ancestors + """ + Header for examples. + """ class ExampleInclude(SphinxDirective): @@ -79,10 +82,10 @@ def run(self): document = self.state.document if not document.settings.file_insertion_enabled: return [document.reporter.warning("File insertion disabled", line=self.lineno)] - # convert options['diff'] to absolute path + # convert options['diff'] to absolute a_path if "diff" in self.options: - _, path = self.env.relfn2path(self.options["diff"]) - self.options["diff"] = path + _, a_path = self.env.relfn2path(self.options["diff"]) + self.options["diff"] = a_path try: location = self.state_machine.get_source_and_line(self.lineno) @@ -113,27 +116,39 @@ def run(self): extra_args["linenostart"] = reader.lineno_start container_node = nodes.container("", literal_block=True, classes=["example-block-wrapper"]) - container_node += example_header(filename=filename) + container_node += ExampleHeader(filename=filename) container_node += retnode retnode = container_node return [retnode] - except Exception as exc: + except Exception as exc: # pylint: disable=broad-except return [document.reporter.warning(str(exc), line=self.lineno)] +# pylint: disable=protected-access +# noinspection PyProtectedMember def register_source(app, env, modname): + """ + Registers source code. 
+ + :param app: application + :param env: environment of the plugin + :param modname: name of the module to load + :return: True if the code is registered successfully, False otherwise + """ entry = env._viewcode_modules.get(modname, None) # type: ignore if entry is False: print("[%s] Entry is false for " % modname) - return + return False code_tags = app.emit_firstresult("viewcode-find-source", modname) if code_tags is None: + # noinspection PyBroadException try: analyzer = ModuleAnalyzer.for_module(modname) - except Exception as ex: - logger.info("Module \"%s\" could not be loaded. Full source will not be available.", modname) + except Exception as ex: # pylint: disable=broad-except + logger.info("Module \"%s\" could not be loaded. Full source will not be available. \"%s\"", + modname, ex) env._viewcode_modules[modname] = False # type: ignore return False @@ -149,15 +164,22 @@ def register_source(app, env, modname): code, tags = code_tags if entry is None or entry[0] != code: - # print("Registeted", entry) - entry = code, tags, {}, "" env._viewcode_modules[modname] = entry # type: ignore return True +# pylint: enable=protected-access + +def create_node(env, relative_path, show_button): + """ + Creates documentation node for example include. -def create_node(app, env, relative_path, show_button): + :param env: environment of the documentation + :param relative_path: path of the code + :param show_button: whether to show "view code" button + :return paragraph with the node + """ pagename = "_modules/" + relative_path[:-3] header_classes = ["example-header"] @@ -182,7 +204,18 @@ def create_node(app, env, relative_path, show_button): return paragraph +# noinspection PyProtectedMember +# pylint: disable=protected-access def doctree_read(app, doctree): + """ + Reads documentation tree for the application and register sources in the generated documentation. + + :param app: application + :param doctree: documentation tree + + :return None + + """ env = app.builder.env if not hasattr(env, "_viewcode_modules"): env._viewcode_modules = {} # type: ignore @@ -190,19 +223,26 @@ def doctree_read(app, doctree): if app.builder.name == "singlehtml": return - for objnode in doctree.traverse(example_header): + for objnode in doctree.traverse(ExampleHeader): filepath = objnode.get("filename") relative_path = path.relpath( filepath, path.commonprefix([app.config.exampleinclude_sourceroot, filepath]) ) modname = relative_path.replace("/", ".")[:-3] show_button = register_source(app, env, modname) - onlynode = create_node(app, env, relative_path, show_button) + onlynode = create_node(env, relative_path, show_button) objnode.replace_self(onlynode) +# pylint: enable=protected-access def setup(app): + """ + Sets the plugin up and returns configuration of the plugin. + + :param app: application. + :return json description of the configuration that is needed by the plugin. + """ directives.register_directive("exampleinclude", ExampleInclude) app.connect("doctree-read", doctree_read) app.add_config_value("exampleinclude_sourceroot", None, "env") diff --git a/docs/exts/removemarktransform.py b/docs/exts/removemarktransform.py index 01ec440d0e112..4b045af235c25 100644 --- a/docs/exts/removemarktransform.py +++ b/docs/exts/removemarktransform.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # @@ -20,10 +19,12 @@ # specific language governing permissions and limitations # under the License. 
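The template_field_role() signature change above (from options={} and content=[] to None defaults resolved inside the body) avoids Python's classic mutable-default-argument trap: default values are evaluated once, at definition time, and then shared across every call. A runnable illustration with hypothetical functions:

def bad(role, options={}):        # one dict created at definition time, shared forever
    options.setdefault("seen", []).append(role)
    return options

def good(role, options=None):     # the pattern adopted in docroles.py
    if options is None:
        options = {}
    options.setdefault("seen", []).append(role)
    return options

assert bad("a")["seen"] == ["a"]
assert bad("b")["seen"] == ["a", "b"]   # state leaked between calls
assert good("a")["seen"] == ["a"]
assert good("b")["seen"] == ["b"]       # fresh dict on every call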
+"""Remove Transform Mark for Sphinx""" import re from docutils import nodes -from pygments.lexers import guess_lexer, Python3Lexer, PythonLexer +# noinspection PyUnresolvedReferences +from pygments.lexers import guess_lexer, Python3Lexer, PythonLexer # pylint: disable=no-name-in-module from sphinx.transforms import SphinxTransform from sphinx.transforms.post_transforms.code import TrimDoctestFlagsTransform @@ -50,8 +51,8 @@ def apply(self, **kwargs): node[:] = [nodes.Text(source)] @staticmethod - def is_pycode(node): - # type: (nodes.literal_block) -> bool + def is_pycode(node: nodes.literal_block) -> bool: + """Checks if the node is literal block of python""" if node.rawsource != node.astext(): return False # skip parsed-literal node @@ -59,16 +60,18 @@ def is_pycode(node): if language in ("py", "py3", "python", "python3", "default"): return True elif language == "guess": + # noinspection PyBroadException try: lexer = guess_lexer(node.rawsource) - return isinstance(lexer, PythonLexer) or isinstance(lexer, Python3Lexer) - except Exception: + return isinstance(lexer, (PythonLexer, Python3Lexer)) + except Exception: # pylint: disable=broad-except pass return False def setup(app): + """Sets the transform up""" app.add_post_transform(TrimDocMarkerFlagsTransform) return {"version": "builtin", "parallel_read_safe": False, "parallel_write_safe": False} diff --git a/pylintrc b/pylintrc index b7227154f8fe3..2d4c5c67b7369 100644 --- a/pylintrc +++ b/pylintrc @@ -162,6 +162,7 @@ disable=print-statement, no-else-return, # deemed unnecessary no-else-raise, # deemed unnecessary too-many-format-args, # Pylint fails on multiline string format + too-many-lines, # Pylint fails on too many lines and we have several cases of those cell-var-from-loop, # Raises spurious errors super-init-not-called, # BasPH: ignored for now but should be fixed somewhere in the future arguments-differ, # Doesn't always raise valid messages diff --git a/scripts/ci/_utils.sh b/scripts/ci/_utils.sh index fa435099447f9..77353994db93f 100644 --- a/scripts/ci/_utils.sh +++ b/scripts/ci/_utils.sh @@ -751,6 +751,10 @@ function filter_out_files_from_pylint_todo_list() { set +e for FILE in "$@" do + if [[ ${FILE} == "airflow/migrations/versions/"* ]]; then + # Skip all generated migration scripts + continue + fi if ! grep -x "./${FILE}" <"${AIRFLOW_SOURCES}/scripts/ci/pylint_todo.txt" >/dev/null; then FILTERED_FILES+=("${FILE}") fi @@ -758,3 +762,14 @@ function filter_out_files_from_pylint_todo_list() { set -e export FILTERED_FILES } + +function refresh_pylint_todo() { + docker run "${AIRFLOW_CONTAINER_EXTRA_DOCKER_FLAGS[@]}" \ + --entrypoint /opt/airflow/scripts/ci/in_container/refresh_pylint_todo.sh \ + --env PYTHONDONTWRITEBYTECODE \ + --env AIRFLOW_CI_VERBOSE="${VERBOSE}" \ + --env AIRFLOW_CI_SILENT \ + --env HOST_USER_ID="$(id -ur)" \ + --env HOST_GROUP_ID="$(id -gr)" \ + "${AIRFLOW_SLIM_CI_IMAGE}" | tee -a "${OUTPUT_LOG}" +} diff --git a/scripts/ci/ci_refresh_pylint_todo.sh b/scripts/ci/ci_refresh_pylint_todo.sh new file mode 100755 index 0000000000000..e7e62ae6c6d6e --- /dev/null +++ b/scripts/ci/ci_refresh_pylint_todo.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -euo pipefail + +MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +export AIRFLOW_CI_SILENT=${AIRFLOW_CI_SILENT:="false"} +export ASSUME_QUIT_TO_ALL_QUESTIONS=${ASSUME_QUIT_TO_ALL_QUESTIONS:="true"} + +# shellcheck source=scripts/ci/_utils.sh +. "${MY_DIR}/_utils.sh" + +basic_sanity_checks + +script_start + +rebuild_ci_slim_image_if_needed + +refresh_pylint_todo + +script_end diff --git a/scripts/ci/in_container/_in_container_utils.sh b/scripts/ci/in_container/_in_container_utils.sh index 8a6ee7223f441..7eac280d2957b 100644 --- a/scripts/ci/in_container/_in_container_utils.sh +++ b/scripts/ci/in_container/_in_container_utils.sh @@ -121,4 +121,58 @@ function in_container_basic_sanity_check() { in_container_cleanup_pycache } +function in_container_refresh_pylint_todo() { + print_in_container_info + print_in_container_info "Refreshing list of all non-pylint compliant files. This can take some time." + print_in_container_info + + print_in_container_info + print_in_container_info "Finding list all non-pylint compliant files everywhere except 'tests' folder" + print_in_container_info + + # Using path -prune is much better in the local environment on OSX because we have host + # Files mounted and node_modules is a huge directory which takes many seconds to even scan + # -prune works better than -not path because it skips traversing the whole directory. -not path traverses + # the directory and only excludes it after all of it is scanned + find . 
\ + -path "./airflow/www/node_modules" -prune -o \ + -path "./airflow/www_rbac/node_modules" -prune -o \ + -path "./airflow/_vendor" -prune -o \ + -path "./airflow/migrations/versions" -prune -o \ + -path "./.eggs" -prune -o \ + -path "./docs/_build" -prune -o \ + -path "./build" -prune -o \ + -path "./tests" -prune -o \ + -name "*.py" \ + -not -name 'webserver_config.py' | \ + grep ".*.py$" | \ + xargs pylint | tee "${MY_DIR}/../pylint_todo_main.txt" + + grep -v "\*\*" < "${MY_DIR}/../pylint_todo_main.txt" | \ + grep -v "^$" | grep -v "\-\-\-" | grep -v "^Your code has been" | \ + awk 'FS=":" {print "./"$1}' | sort | uniq > "${MY_DIR}/../pylint_todo_new.txt" + + print_in_container_info + print_in_container_info "So far found $(wc -l <"${MY_DIR}/../pylint_todo_new.txt") files" + print_in_container_info + + print_in_container_info + print_in_container_info "Finding list of all non-pylint compliant files in 'tests' folder" + print_in_container_info + + find "./tests" -name "*.py" -print0 | \ + xargs -0 pylint --disable="${DISABLE_CHECKS_FOR_TESTS}" | tee "${MY_DIR}/../pylint_todo_tests.txt" + + grep -v "\*\*" < "${MY_DIR}/../pylint_todo_tests.txt" | \ + grep -v "^$" | grep -v "\-\-\-" | grep -v "^Your code has been" | \ + awk 'FS=":" {print "./"$1}' | sort | uniq >> "${MY_DIR}/../pylint_todo_new.txt" + + rm -fv "${MY_DIR}/../pylint_todo_main.txt" "${MY_DIR}/../pylint_todo_tests.txt" + mv -v "${MY_DIR}/../pylint_todo_new.txt" "${MY_DIR}/../pylint_todo.txt" + + print_in_container_info + print_in_container_info "Found $(wc -l <"${MY_DIR}/../pylint_todo.txt") files" + print_in_container_info +} + export DISABLE_CHECKS_FOR_TESTS="missing-docstring,no-self-use,too-many-public-methods,protected-access" diff --git a/scripts/ci/in_container/refresh_pylint_todo.sh b/scripts/ci/in_container/refresh_pylint_todo.sh new file mode 100755 index 0000000000000..f1e382455201a --- /dev/null +++ b/scripts/ci/in_container/refresh_pylint_todo.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Script to run Pylint on main code. Can be started from any working directory +set -uo pipefail + +MY_DIR=$(cd "$(dirname "$0")" || exit 1; pwd) + +# shellcheck source=scripts/ci/in_container/_in_container_utils.sh +. 
"${MY_DIR}/_in_container_utils.sh" + +in_container_basic_sanity_check + +in_container_script_start + +in_container_refresh_pylint_todo + +in_container_script_end diff --git a/scripts/ci/in_container/run_pylint_tests.sh b/scripts/ci/in_container/run_pylint_tests.sh index a7db72c4f36da..ab3e63b8097a7 100755 --- a/scripts/ci/in_container/run_pylint_tests.sh +++ b/scripts/ci/in_container/run_pylint_tests.sh @@ -28,21 +28,19 @@ in_container_basic_sanity_check in_container_script_start -DISABLE_CHECKS="missing-docstring,no-self-use,too-many-public-methods,protected-access" - if [[ ${#@} == "0" ]]; then echo echo "Running pylint for 'tests' folder" echo find "./tests" -name "*.py" | \ grep -vFf scripts/ci/pylint_todo.txt | \ - xargs pylint --disable="${DISABLE_CHECKS}" --output-format=colorized + xargs pylint --disable="${DISABLE_CHECKS_FOR_TESTS}" --output-format=colorized RES=$? else print_in_container_info print_in_container_info "Running Pylint for tests with parameters: $*" print_in_container_info - pylint --disable="${DISABLE_CHECKS}" --output-format=colorized "$@" + pylint --disable="${DISABLE_CHECKS_FOR_TESTS}" --output-format=colorized "$@" RES=$? fi diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt index f9f7ea783a49f..edba8d6badaa4 100644 --- a/scripts/ci/pylint_todo.txt +++ b/scripts/ci/pylint_todo.txt @@ -1,33 +1,28 @@ +./airflow/__init__.py ./airflow/bin/cli.py -./airflow/configuration.py ./airflow/config_templates/airflow_local_settings.py -./airflow/config_templates/default_celery.py +./airflow/configuration.py ./airflow/contrib/auth/backends/github_enterprise_auth.py ./airflow/contrib/auth/backends/google_auth.py ./airflow/contrib/auth/backends/kerberos_auth.py ./airflow/contrib/auth/backends/ldap_auth.py -./airflow/contrib/auth/backends/password_auth.py ./airflow/contrib/hooks/azure_container_instance_hook.py ./airflow/contrib/hooks/azure_container_volume_hook.py ./airflow/contrib/hooks/azure_cosmos_hook.py ./airflow/contrib/hooks/azure_data_lake_hook.py ./airflow/contrib/hooks/azure_fileshare_hook.py ./airflow/contrib/hooks/cassandra_hook.py -./airflow/contrib/hooks/databricks_hook.py ./airflow/contrib/hooks/datadog_hook.py -./airflow/contrib/hooks/datastore_hook.py ./airflow/contrib/hooks/dingding_hook.py ./airflow/contrib/hooks/discord_webhook_hook.py ./airflow/contrib/hooks/emr_hook.py ./airflow/contrib/hooks/fs_hook.py ./airflow/contrib/hooks/ftp_hook.py -./airflow/contrib/hooks/grpc_hook.py ./airflow/contrib/hooks/jenkins_hook.py ./airflow/contrib/hooks/openfaas_hook.py ./airflow/contrib/hooks/opsgenie_alert_hook.py ./airflow/contrib/hooks/pinot_hook.py ./airflow/contrib/hooks/qubole_check_hook.py -./airflow/contrib/hooks/qubole_hook.py ./airflow/contrib/hooks/redshift_hook.py ./airflow/contrib/hooks/sagemaker_hook.py ./airflow/contrib/hooks/segment_hook.py @@ -42,14 +37,11 @@ ./airflow/contrib/hooks/vertica_hook.py ./airflow/contrib/hooks/wasb_hook.py ./airflow/contrib/operators/adls_list_operator.py -./airflow/contrib/operators/awsbatch_operator.py ./airflow/contrib/operators/aws_athena_operator.py -./airflow/contrib/operators/aws_sqs_publish_operator.py +./airflow/contrib/operators/awsbatch_operator.py ./airflow/contrib/operators/azure_container_instances_operator.py ./airflow/contrib/operators/azure_cosmos_operator.py ./airflow/operators/cassandra_to_gcs.py -./airflow/contrib/operators/datastore_export_operator.py -./airflow/contrib/operators/datastore_import_operator.py ./airflow/contrib/operators/dingding_operator.py 
./airflow/contrib/operators/discord_webhook_operator.py ./airflow/contrib/operators/druid_operator.py @@ -63,19 +55,16 @@ ./airflow/contrib/operators/hive_to_dynamodb.py ./airflow/contrib/operators/jenkins_job_trigger_operator.py ./airflow/contrib/operators/jira_operator.py -./airflow/contrib/operators/kubernetes_pod_operator.py ./airflow/contrib/operators/mongo_to_s3.py ./airflow/contrib/operators/opsgenie_alert_operator.py ./airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py ./airflow/contrib/operators/oracle_to_oracle_transfer.py ./airflow/contrib/operators/qubole_check_operator.py -./airflow/contrib/operators/qubole_operator.py ./airflow/contrib/operators/redis_publish_operator.py ./airflow/contrib/operators/s3_copy_object_operator.py ./airflow/contrib/operators/s3_delete_objects_operator.py ./airflow/contrib/operators/s3_list_operator.py ./airflow/contrib/operators/s3_to_gcs_operator.py -./airflow/contrib/operators/s3_to_gcs_transfer_operator.py ./airflow/contrib/operators/s3_to_sftp_operator.py ./airflow/contrib/operators/sagemaker_base_operator.py ./airflow/contrib/operators/sagemaker_endpoint_config_operator.py @@ -130,20 +119,16 @@ ./airflow/contrib/sensors/sftp_sensor.py ./airflow/contrib/sensors/wasb_sensor.py ./airflow/contrib/sensors/weekday_sensor.py -./airflow/gcp/utils/mlengine_operator_utils.py -./airflow/gcp/utils/mlengine_prediction_summary. -./airflow/exceptions.py +./airflow/executors/__init__.py ./airflow/executors/base_executor.py ./airflow/executors/celery_executor.py ./airflow/executors/dask_executor.py -./airflow/executors/kubernetes_executor.py ./airflow/executors/local_executor.py ./airflow/executors/sequential_executor.py -./airflow/executors/__init__.py +./airflow/hooks/__init__.py ./airflow/hooks/dbapi_hook.py ./airflow/hooks/docker_hook.py ./airflow/hooks/druid_hook.py -./airflow/hooks/hdfs_hook.py ./airflow/hooks/hive_hooks.py ./airflow/hooks/http_hook.py ./airflow/hooks/jdbc_hook.py @@ -156,81 +141,20 @@ ./airflow/hooks/samba_hook.py ./airflow/hooks/sqlite_hook.py ./airflow/hooks/zendesk_hook.py -./airflow/hooks/__init__.py +./airflow/jobs/__init__.py ./airflow/jobs/backfill_job.py ./airflow/jobs/base_job.py ./airflow/jobs/local_task_job.py ./airflow/jobs/scheduler_job.py -./airflow/jobs/__init__.py -./airflow/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py -./airflow/kubernetes/kubernetes_request_factory/pod_request_factory.py -./airflow/kubernetes/kube_client.py -./airflow/kubernetes/pod.py -./airflow/kubernetes/pod_generator.py -./airflow/kubernetes/pod_launcher.py -./airflow/kubernetes/secret.py -./airflow/kubernetes/volume.py -./airflow/kubernetes/volume_mount.py -./airflow/kubernetes/worker_configuration.py -./airflow/lineage/backend/atlas/typedefs.py +./airflow/lineage/__init__.py ./airflow/lineage/backend/atlas/__init__.py -./airflow/lineage/backend/__init__.py +./airflow/lineage/backend/atlas/typedefs.py ./airflow/lineage/datasets.py -./airflow/lineage/__init__.py ./airflow/logging_config.py -./airflow/macros/hive.py ./airflow/macros/__init__.py +./airflow/macros/hive.py ./airflow/migrations/env.py -./airflow/migrations/versions/004c1210f153_increase_queue_name_size_limit.py -./airflow/migrations/versions/03bc53e68815_add_sm_dag_index.py -./airflow/migrations/versions/05f30312d566_merge_heads.py -./airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py -./airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py -./airflow/migrations/versions/004c1210f153_increase_queue_name_size_limit.py 
-./airflow/migrations/versions/127d2bf2dfa7_add_dag_id_state_index_on_dag_run_table.py -./airflow/migrations/versions/13eb55f81627_for_compatibility.py -./airflow/migrations/versions/1507a7289a2f_create_is_encrypted.py -./airflow/migrations/versions/1968acfc09e3_add_is_encrypted_column_to_variable_.py -./airflow/migrations/versions/1b38cef5b76e_add_dagrun.py -./airflow/migrations/versions/211e584da130_add_ti_state_index.py -./airflow/migrations/versions/27c6a30d7c24_add_executor_config_to_task_instance.py -./airflow/migrations/versions/2e541a1dcfed_task_duration.py -./airflow/migrations/versions/2e82aab8ef20_rename_user_table.py -./airflow/migrations/versions/338e90f54d61_more_logging_into_task_isntance.py -./airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py -./airflow/migrations/versions/40e67319e3a9_dagrun_config.py -./airflow/migrations/versions/41f5f12752f8_add_superuser_field.py -./airflow/migrations/versions/4446e08588_dagrun_start_end.py -./airflow/migrations/versions/4addfa1236f1_add_fractional_seconds_to_mysql_tables.py -./airflow/migrations/versions/4ebbffe0a39a_merge_heads.py -./airflow/migrations/versions/502898887f84_adding_extra_to_log.py -./airflow/migrations/versions/52d714495f0_job_id_indices.py -./airflow/migrations/versions/561833c1c74b_add_password_column_to_user.py -./airflow/migrations/versions/5e7d17757c7a_add_pid_field_to_taskinstance.py -./airflow/migrations/versions/64de9cddf6c9_add_task_fails_journal_table.py -./airflow/migrations/versions/8504051e801b_xcom_dag_task_indices.py -./airflow/migrations/versions/856955da8476_fix_sqlite_foreign_key.py -./airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py -./airflow/migrations/versions/939bb1e647c8_task_reschedule_fk_on_cascade_delete.py -./airflow/migrations/versions/947454bf1dff_add_ti_job_id_index.py -./airflow/migrations/versions/9635ae0956e7_index_faskfail.py -./airflow/migrations/versions/a56c9515abdc_remove_dag_stat_table.py -./airflow/migrations/versions/bba5a7cfc896_add_a_column_to_track_the_encryption_.py -./airflow/migrations/versions/bbc73705a13e_add_notification_sent_column_to_sla_miss.py -./airflow/migrations/versions/bdaa763e6c56_make_xcom_value_column_a_large_binary.py -./airflow/migrations/versions/bf00311e1990_add_index_to_taskinstance.py -./airflow/migrations/versions/c8ffec048a3b_add_fields_to_dag.py -./airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py -./airflow/migrations/versions/cf5dc11e79ad_drop_user_and_chart.py -./airflow/migrations/versions/d2ae31099d61_increase_text_size_for_mysql.py -./airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py -./airflow/migrations/versions/dd4ecb8fbee3_add_schedule_interval_to_dag.py -./airflow/migrations/versions/e3a246e0dc1_current_schema.py -./airflow/migrations/versions/f23433877c24_fix_mysql_not_null_constraint.py -./airflow/migrations/versions/f2ca10b85618_add_dag_stats_table.py -./airflow/migrations/versions/6e96a59344a4_make_taskinstance_pool_not_nullable.py ./airflow/models/base.py -./airflow/models/baseoperator.py ./airflow/models/connection.py ./airflow/models/crypto.py ./airflow/models/dag.py @@ -243,16 +167,13 @@ ./airflow/models/pool.py ./airflow/models/skipmixin.py ./airflow/models/slamiss.py -./airflow/models/taskfail.py ./airflow/models/taskinstance.py -./airflow/models/taskreschedule.py ./airflow/models/variable.py ./airflow/models/xcom.py -./airflow/models/__init__.py +./airflow/operators/__init__.py ./airflow/operators/bash_operator.py 
./airflow/operators/check_operator.py ./airflow/operators/dagrun_operator.py -./airflow/operators/docker_operator.py ./airflow/operators/druid_check_operator.py ./airflow/operators/dummy_operator.py ./airflow/operators/email_operator.py @@ -283,10 +204,8 @@ ./airflow/operators/slack_operator.py ./airflow/operators/sqlite_operator.py ./airflow/operators/subdag_operator.py -./airflow/operators/__init__.py ./airflow/plugins_manager.py -./airflow/security/kerberos.py -./airflow/security/utils.py +./airflow/sensors/__init__.py ./airflow/sensors/base_sensor_operator.py ./airflow/sensors/external_task_sensor.py ./airflow/sensors/hdfs_sensor.py @@ -300,27 +219,24 @@ ./airflow/sensors/time_delta_sensor.py ./airflow/sensors/time_sensor.py ./airflow/sensors/web_hdfs_sensor.py -./airflow/sensors/__init__.py ./airflow/settings.py ./airflow/stats.py +./airflow/task/task_runner/__init__.py ./airflow/task/task_runner/base_task_runner.py ./airflow/task/task_runner/standard_task_runner.py -./airflow/task/task_runner/__init__.py +./airflow/ti_deps/dep_context.py ./airflow/ti_deps/deps/base_ti_dep.py -./airflow/ti_deps/deps/dagrun_exists_dep.py ./airflow/ti_deps/deps/dag_ti_slots_available_dep.py ./airflow/ti_deps/deps/dag_unpaused_dep.py +./airflow/ti_deps/deps/dagrun_exists_dep.py ./airflow/ti_deps/deps/exec_date_after_start_date_dep.py ./airflow/ti_deps/deps/not_in_retry_period_dep.py -./airflow/ti_deps/deps/not_running_dep.py -./airflow/ti_deps/deps/not_skipped_dep.py ./airflow/ti_deps/deps/prev_dagrun_dep.py ./airflow/ti_deps/deps/ready_to_reschedule.py ./airflow/ti_deps/deps/runnable_exec_date_dep.py ./airflow/ti_deps/deps/task_concurrency_dep.py ./airflow/ti_deps/deps/trigger_rule_dep.py ./airflow/ti_deps/deps/valid_state_dep.py -./airflow/ti_deps/dep_context.py ./airflow/utils/asciiart.py ./airflow/utils/cli_action_loggers.py ./airflow/utils/compression.py @@ -335,7 +251,6 @@ ./airflow/utils/json.py ./airflow/utils/log/es_task_handler.py ./airflow/utils/log/file_processor_handler.py -./airflow/utils/log/file_task_handler.py ./airflow/utils/log/gcs_task_handler.py ./airflow/utils/log/logging_mixin.py ./airflow/utils/log/s3_task_handler.py @@ -364,11 +279,6 @@ ./airflow/www/validators.py ./airflow/www/views.py ./airflow/www/widgets.py -./airflow/__init__.py -./docs/conf.py -./docs/exts/docroles.py -./docs/exts/exampleinclude.py -./docs/exts/removemarktransform.py ./scripts/perf/scheduler_ops_metrics.py ./tests/cli/test_cli.py ./tests/cli/test_worker_initialisation.py @@ -376,20 +286,14 @@ ./tests/contrib/hooks/test_aws_lambda_hook.py ./tests/contrib/hooks/test_azure_container_instance_hook.py ./tests/contrib/hooks/test_azure_cosmos_hook.py -./tests/gcp/hooks/test_bigquery.py ./tests/contrib/hooks/test_cassandra_hook.py ./tests/contrib/hooks/test_databricks_hook.py -./tests/contrib/hooks/test_datastore_hook.py -./tests/contrib/hooks/test_discord_webhook_hook.py -./tests/contrib/hooks/test_ftp_hook.py +./tests/contrib/hooks/test_imap_hook.py ./tests/contrib/hooks/test_jira_hook.py ./tests/contrib/hooks/test_mongo_hook.py ./tests/contrib/hooks/test_openfaas_hook.py ./tests/contrib/hooks/test_opsgenie_alert_hook.py ./tests/contrib/hooks/test_redshift_hook.py -./tests/contrib/hooks/test_salesforce_hook.py -./tests/contrib/hooks/test_segment_hook.py -./tests/contrib/hooks/test_sftp_hook.py ./tests/contrib/hooks/test_slack_webhook_hook.py ./tests/contrib/hooks/test_snowflake_hook.py ./tests/contrib/hooks/test_spark_sql_hook.py @@ -404,14 +308,11 @@ ./tests/contrib/operators/test_ecs_operator.py 
./tests/contrib/operators/test_emr_add_steps_operator.py ./tests/contrib/operators/test_emr_create_job_flow_operator.py -./tests/operators/test_gcs_to_s3.py -./tests/contrib/operators/test_grpc_operator.py ./tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py ./tests/contrib/operators/test_qubole_check_operator.py ./tests/contrib/operators/test_redis_publish_operator.py ./tests/contrib/operators/test_s3_copy_object_operator.py ./tests/contrib/operators/test_s3_delete_objects_operator.py -./tests/contrib/operators/test_s3_to_gcs_operator.py ./tests/contrib/operators/test_s3_to_sftp_operator.py ./tests/contrib/operators/test_sftp_operator.py ./tests/contrib/operators/test_snowflake_operator.py @@ -425,16 +326,10 @@ ./tests/contrib/sensors/test_redis_pub_sub_sensor.py ./tests/contrib/sensors/test_sqs_sensor.py ./tests/contrib/sensors/test_weekday_sensor.py -./tests/contrib/utils/logging_command_executor.py -./tests/contrib/utils/run_once_decorator.py -./tests/contrib/utils/test_sendgrid.py -./tests/contrib/utils/test_weekday.py ./tests/core.py ./tests/executors/test_base_executor.py ./tests/executors/test_celery_executor.py ./tests/executors/test_dask_executor.py -./tests/executors/test_executor.py -./tests/executors/test_kubernetes_executor.py ./tests/hooks/test_docker_hook.py ./tests/hooks/test_druid_hook.py ./tests/hooks/test_hive_hook.py @@ -445,16 +340,10 @@ ./tests/hooks/test_webhdfs_hook.py ./tests/jobs/test_backfill_job.py ./tests/jobs/test_base_job.py -./tests/jobs/test_local_task_job.py ./tests/jobs/test_scheduler_job.py -./tests/kubernetes/kubernetes_request_factory/test_kubernetes_request_factory.py -./tests/kubernetes/kubernetes_request_factory/test_pod_request_factory.py -./tests/kubernetes/test_pod_launcher.py ./tests/lineage/backend/test_atlas.py ./tests/lineage/test_lineage.py ./tests/macros/test_hive.py -./tests/minikube/test_kubernetes_executor.py -./tests/minikube/test_kubernetes_pod_operator.py ./tests/models/test_cleartasks.py ./tests/models/test_dag.py ./tests/models/test_dagbag.py @@ -462,10 +351,8 @@ ./tests/models/test_pool.py ./tests/models/test_skipmixin.py ./tests/models/test_taskinstance.py -./tests/models/test_variable.py ./tests/operators/test_bash_operator.py -./tests/operators/test_check_operator.py -./tests/operators/test_docker_operator.py +./tests/operators/test_gcs_to_s3.py ./tests/operators/test_hive_operator.py ./tests/operators/test_hive_to_druid.py ./tests/operators/test_mssql_to_hive.py @@ -488,20 +375,17 @@ ./tests/sensors/test_sql_sensor.py ./tests/sensors/test_timedelta_sensor.py ./tests/sensors/test_timeout_sensor.py -./tests/task/task_runner/test_standard_task_runner.py ./tests/task/__init__.py +./tests/task/task_runner/test_standard_task_runner.py ./tests/test_configuration.py ./tests/test_local_settings.py ./tests/test_logging_config.py ./tests/test_stats.py -./tests/test_utils/db.py -./tests/test_utils/decorators.py ./tests/test_utils/reset_warning_registry.py ./tests/ti_deps/deps/fake_models.py ./tests/ti_deps/deps/test_dagrun_exists_dep.py ./tests/ti_deps/deps/test_ready_to_reschedule_dep.py ./tests/ti_deps/deps/test_runnable_exec_date_dep.py -./tests/utils/log/elasticmock/fake_elasticsearch.py ./tests/utils/log/test_es_task_handler.py ./tests/utils/log/test_file_processor_handler.py ./tests/utils/log/test_s3_task_handler.py @@ -512,15 +396,11 @@ ./tests/utils/test_db.py ./tests/utils/test_decorators.py ./tests/utils/test_helpers.py -./tests/utils/test_json.py -./tests/utils/test_logging_mixin.py 
./tests/utils/test_log_handlers.py -./tests/utils/test_module_loading.py ./tests/utils/test_tests.py ./tests/utils/test_timezone.py ./tests/www/api/experimental/test_endpoints.py ./tests/www/api/experimental/test_kerberos_endpoints.py ./tests/www/test_security.py ./tests/www/test_utils.py -./tests/www/test_validators.py ./tests/www/test_views.py diff --git a/setup.py b/setup.py index 056ae6c6fa795..9b988a4bc1fec 100644 --- a/setup.py +++ b/setup.py @@ -16,9 +16,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Setup for the Airflow library.""" +"""Setup.py for the Airflow project.""" -import importlib +from importlib import util import io import logging import os @@ -33,11 +33,11 @@ # Kept manually in sync with airflow.__version__ # noinspection PyUnresolvedReferences -spec = importlib.util.spec_from_file_location("airflow.version", os.path.join('airflow', 'version.py')) +spec = util.spec_from_file_location("airflow.version", os.path.join('airflow', 'version.py')) # noinspection PyUnresolvedReferences -mod = importlib.util.module_from_spec(spec) -spec.loader.exec_module(mod) -version = mod.version +mod = util.module_from_spec(spec) +spec.loader.exec_module(mod) # type: ignore +version = mod.version # type: ignore PY3 = sys.version_info[0] == 3 diff --git a/tests/contrib/hooks/test_grpc_hook.py b/tests/contrib/hooks/test_grpc_hook.py index 476c43cb891a8..5da9b681c3ee4 100644 --- a/tests/contrib/hooks/test_grpc_hook.py +++ b/tests/contrib/hooks/test_grpc_hook.py @@ -1,16 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ import unittest from io import StringIO @@ -47,8 +52,9 @@ def get_airflow_connection_with_port(): ) +# noinspection PyMethodMayBeStatic,PyUnusedLocal class StubClass: - def __init__(self, _): + def __init__(self, _): # pylint: disable=unused-argument pass def single_call(self, data): @@ -62,6 +68,7 @@ class TestGrpcHook(unittest.TestCase): def setUp(self): self.channel_mock = mock.patch('grpc.Channel').start() + # noinspection PyUnusedLocal def custom_conn_func(self, _): mocked_channel = self.channel_mock.return_value return mocked_channel diff --git a/tests/contrib/hooks/test_salesforce_hook.py b/tests/contrib/hooks/test_salesforce_hook.py index 5637e2f54b306..a0067e13904ee 100644 --- a/tests/contrib/hooks/test_salesforce_hook.py +++ b/tests/contrib/hooks/test_salesforce_hook.py @@ -20,8 +20,9 @@ import unittest -import pandas as pd from unittest.mock import patch, Mock + +import pandas as pd from simple_salesforce import Salesforce from airflow.contrib.hooks.salesforce_hook import SalesforceHook diff --git a/tests/contrib/operators/test_aws_athena_operator.py b/tests/contrib/operators/test_aws_athena_operator.py index c3d98e25db97d..9a6124ae04f4a 100644 --- a/tests/contrib/operators/test_aws_athena_operator.py +++ b/tests/contrib/operators/test_aws_athena_operator.py @@ -1,21 +1,21 @@ # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + # import unittest @@ -47,6 +47,8 @@ } +# noinspection PyUnusedLocal +# pylint: disable=unused-argument class TestAWSAthenaOperator(unittest.TestCase): def setUp(self): @@ -141,6 +143,7 @@ def test_xcom_push_and_pull(self, mock_conn, mock_run_query, mock_check_query_st self.assertEqual(ti.xcom_pull(task_ids='test_aws_athena_operator'), ATHENA_QUERY_ID) +# pylint: enable=unused-argument if __name__ == '__main__': diff --git a/tests/contrib/operators/test_grpc_operator.py b/tests/contrib/operators/test_grpc_operator.py index 77c0747369c18..91648e2dbf521 100644 --- a/tests/contrib/operators/test_grpc_operator.py +++ b/tests/contrib/operators/test_grpc_operator.py @@ -1,16 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + import unittest from airflow.contrib.operators.grpc_operator import GrpcOperator diff --git a/tests/contrib/operators/test_sagemaker_base_operator.py b/tests/contrib/operators/test_sagemaker_base_operator.py index af2d64e81b695..76af826bc83cf 100644 --- a/tests/contrib/operators/test_sagemaker_base_operator.py +++ b/tests/contrib/operators/test_sagemaker_base_operator.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. diff --git a/tests/contrib/operators/test_sagemaker_training_operator.py b/tests/contrib/operators/test_sagemaker_training_operator.py index d12758060d0f8..919b3f1bcac0c 100644 --- a/tests/contrib/operators/test_sagemaker_training_operator.py +++ b/tests/contrib/operators/test_sagemaker_training_operator.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
@@ -79,6 +79,8 @@ } +# noinspection PyUnusedLocal +# pylint: disable=unused-argument class TestSageMakerTrainingOperator(unittest.TestCase): def setUp(self): @@ -120,6 +122,7 @@ def test_execute_with_failure(self, mock_training, mock_client): 'ResponseMetadata': {'HTTPStatusCode': 404}} self.assertRaises(AirflowException, self.sagemaker.execute, None) +# pylint: enable=unused-argument if __name__ == '__main__': diff --git a/tests/contrib/utils/gcp_authenticator.py b/tests/contrib/utils/gcp_authenticator.py index b6134ab43f630..9ccc6ab2de25b 100644 --- a/tests/contrib/utils/gcp_authenticator.py +++ b/tests/contrib/utils/gcp_authenticator.py @@ -60,20 +60,16 @@ class GcpAuthenticator(LoggingCommandExecutor): """ - Manages authentication to Google Cloud Platform. It helps to manage - connection - it can authenticate with the gcp key name specified + Initialises the authenticator. + + :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values) + :param project_extra: optional extra project parameter passed to google cloud + connection """ original_account = None # type: Optional[str] - def __init__(self, gcp_key, project_extra=None): - """ - Initialises the authenticator. - - :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values) - :param project_extra: optional extra project parameter passed to google cloud - connection - """ + def __init__(self, gcp_key: str, project_extra: str = None): super().__init__() self.gcp_key = gcp_key self.project_extra = project_extra diff --git a/tests/dags/test_clear_subdag.py b/tests/dags/test_clear_subdag.py index 467ed10a22e45..7385b686f676b 100644 --- a/tests/dags/test_clear_subdag.py +++ b/tests/dags/test_clear_subdag.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,7 +14,9 @@ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations +# under the License. +# import datetime diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py index 12ad1bc0305c4..ae965c9260606 100644 --- a/tests/dags/test_example_bash_operator.py +++ b/tests/dags/test_example_bash_operator.py @@ -16,7 +16,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - from datetime import timedelta import airflow diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index f258a69b612f0..c4c6af25e0f29 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -16,12 +16,12 @@ # under the License. 
# +from datetime import datetime import unittest import re import string import random from urllib3 import HTTPResponse -from datetime import datetime from tests.compat import mock try: @@ -33,6 +33,7 @@ AirflowKubernetesScheduler = None # type: ignore +# pylint: disable=unused-argument class TestAirflowKubernetesScheduler(unittest.TestCase): @staticmethod def _gen_random_string(seed, str_len): @@ -86,15 +87,15 @@ def test_make_safe_label_value(self): self.assertTrue(self._is_safe_label_value(safe_dag_id)) safe_task_id = AirflowKubernetesScheduler._make_safe_label_value(task_id) self.assertTrue(self._is_safe_label_value(safe_task_id)) - id = "my_dag_id" + dag_id = "my_dag_id" self.assertEqual( - id, - AirflowKubernetesScheduler._make_safe_label_value(id) + dag_id, + AirflowKubernetesScheduler._make_safe_label_value(dag_id) ) - id = "my_dag_id_" + "a" * 64 + dag_id = "my_dag_id_" + "a" * 64 self.assertEqual( "my_dag_id_" + "a" * 43 + "-0ce114c45", - AirflowKubernetesScheduler._make_safe_label_value(id) + AirflowKubernetesScheduler._make_safe_label_value(dag_id) ) @unittest.skipIf(AirflowKubernetesScheduler is None, @@ -122,43 +123,43 @@ class TestKubernetesExecutor(unittest.TestCase): def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watcher): # When a quota is exceeded this is the ApiException we get - r = HTTPResponse( + response = HTTPResponse( body='{"kind": "Status", "apiVersion": "v1", "metadata": {}, "status": "Failure", ' '"message": "pods \\"podname\\" is forbidden: exceeded quota: compute-resources, ' 'requested: limits.memory=4Gi, used: limits.memory=6508Mi, limited: limits.memory=10Gi", ' '"reason": "Forbidden", "details": {"name": "podname", "kind": "pods"}, "code": 403}') - r.status = 403 - r.reason = "Forbidden" + response.status = 403 + response.reason = "Forbidden" # A mock kube_client that throws errors when making a pod mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True) mock_kube_client.create_namespaced_pod = mock.MagicMock( - side_effect=ApiException(http_resp=r)) + side_effect=ApiException(http_resp=response)) mock_get_kube_client.return_value = mock_kube_client mock_api_client = mock.MagicMock() mock_api_client.sanitize_for_serialization.return_value = {} mock_kube_client.api_client = mock_api_client - kubernetesExecutor = KubernetesExecutor() - kubernetesExecutor.start() + kubernetes_executor = KubernetesExecutor() + kubernetes_executor.start() # Execute a task while the Api Throws errors try_number = 1 - kubernetesExecutor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number), - command='command', executor_config={}) - kubernetesExecutor.sync() - kubernetesExecutor.sync() + kubernetes_executor.execute_async(key=('dag', 'task', datetime.utcnow(), try_number), + command='command', executor_config={}) + kubernetes_executor.sync() + kubernetes_executor.sync() assert mock_kube_client.create_namespaced_pod.called - self.assertFalse(kubernetesExecutor.task_queue.empty()) + self.assertFalse(kubernetes_executor.task_queue.empty()) # Disable the ApiException mock_kube_client.create_namespaced_pod.side_effect = None # Execute the task without errors should empty the queue - kubernetesExecutor.sync() + kubernetes_executor.sync() assert mock_kube_client.create_namespaced_pod.called - self.assertTrue(kubernetesExecutor.task_queue.empty()) + self.assertTrue(kubernetes_executor.task_queue.empty()) @mock.patch('airflow.executors.kubernetes_executor.KubeConfig') 
@mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.sync') @@ -207,6 +208,7 @@ def test_change_state_failed(self, mock_delete_pod, mock_get_kube_client, mock_k executor._change_state(key, State.FAILED, 'pod_id') self.assertTrue(executor.event_buffer[key] == State.FAILED) mock_delete_pod.assert_called_once_with('pod_id') +# pylint: enable=unused-argument @mock.patch('airflow.executors.kubernetes_executor.KubeConfig') @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') diff --git a/tests/gcp/hooks/test_bigquery.py b/tests/gcp/hooks/test_bigquery.py index 4253ae43841ff..6701228f90c97 100644 --- a/tests/gcp/hooks/test_bigquery.py +++ b/tests/gcp/hooks/test_bigquery.py @@ -68,8 +68,8 @@ class TestPandasGbqCredentials(unittest.TestCase): return_value=("CREDENTIALS", "PROJECT_ID",) ) @mock.patch('airflow.gcp.hooks.bigquery.read_gbq') - def test_credentials_provided(self, mock_read_gbq, mock_get_credentials_and_project_id): - self.instance = hook.BigQueryHook() + def test_credentials_provided(self, mock_read_gbq, _): + self.instance = hook.BigQueryHook() # pylint: disable=attribute-defined-outside-init self.instance.get_pandas_df('select 1') @@ -268,6 +268,8 @@ def mock_poll_job_complete(job_id): return job_id in mock_canceled_jobs +# pylint: disable=invalid-name +# noinspection PyUnusedLocal def mock_job_cancel(projectId, jobId): # pylint: disable=unused-argument mock_canceled_jobs.append(jobId) return mock.Mock() diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py index bb8c16da8d7ca..9e0c288dd1d4f 100644 --- a/tests/kubernetes/test_pod_launcher.py +++ b/tests/kubernetes/test_pod_launcher.py @@ -14,15 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import unittest +import mock from requests.exceptions import BaseHTTPError from airflow import AirflowException from airflow.kubernetes.pod_launcher import PodLauncher -import unittest -import mock - class TestPodLauncher(unittest.TestCase): diff --git a/tests/minikube/test_kubernetes_executor.py b/tests/minikube/test_kubernetes_executor.py index 96e2284325466..b595243a28696 100644 --- a/tests/minikube/test_kubernetes_executor.py +++ b/tests/minikube/test_kubernetes_executor.py @@ -17,19 +17,19 @@ import os +import re +import time import unittest + from subprocess import check_call, check_output import requests.exceptions import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -import time - -import re try: check_call(["/usr/local/bin/kubectl", "get", "pods"]) -except Exception as e: +except Exception as e: # pylint: disable=broad-except if os.environ.get('KUBERNETES_VERSION'): raise e else: diff --git a/tests/minikube/test_kubernetes_pod_operator.py b/tests/minikube/test_kubernetes_pod_operator.py index a5984a63ca96d..d88779ab0adfb 100644 --- a/tests/minikube/test_kubernetes_pod_operator.py +++ b/tests/minikube/test_kubernetes_pod_operator.py @@ -16,14 +16,18 @@ # under the License. 
import unittest +from unittest import mock +from unittest.mock import ANY + import os import shutil import json -from unittest.mock import ANY from subprocess import check_call + from kubernetes.client.rest import ApiException import kubernetes.client.models as k8s from kubernetes.client.api_client import ApiClient + from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.kubernetes.secret import Secret from airflow import AirflowException @@ -32,11 +36,10 @@ from airflow.kubernetes.pod_generator import PodDefaults from airflow.kubernetes.volume_mount import VolumeMount from airflow.kubernetes.volume import Volume -from tests.compat import mock try: check_call(["/usr/local/bin/kubectl", "get", "pods"]) -except Exception as e: +except Exception as e: # pylint: disable=broad-except if os.environ.get('KUBERNETES_VERSION'): raise e else: @@ -46,10 +49,11 @@ ) +# pylint: disable=unused-argument class TestKubernetesPodOperator(unittest.TestCase): def setUp(self): - self.maxDiff = None + self.maxDiff = None # pylint: disable=invalid-name self.api_client = ApiClient() self.expected_pod = { 'apiVersion': 'v1', @@ -584,5 +588,6 @@ def test_envs_from_secrets(self, mock_client, launcher_mock): ) +# pylint: enable=unused-argument if __name__ == '__main__': unittest.main() diff --git a/tests/operators/test_docker_operator.py b/tests/operators/test_docker_operator.py index ef348eabfdd88..68b8450d9dad5 100644 --- a/tests/operators/test_docker_operator.py +++ b/tests/operators/test_docker_operator.py @@ -20,6 +20,7 @@ import unittest import logging +from airflow.exceptions import AirflowException try: from airflow.operators.docker_operator import DockerOperator from airflow.hooks.docker_hook import DockerHook @@ -27,7 +28,6 @@ except ImportError: pass -from airflow.exceptions import AirflowException from tests.compat import mock @@ -125,7 +125,7 @@ def test_execute_unicode_logs(self, client_class_mock): client_class_mock.return_value = client_mock - originalRaiseExceptions = logging.raiseExceptions + originalRaiseExceptions = logging.raiseExceptions # pylint: disable=invalid-name logging.raiseExceptions = True operator = DockerOperator(image='ubuntu', owner='unittest', task_id='unittest') diff --git a/tests/utils/log/elasticmock/__init__.py b/tests/utils/log/elasticmock/__init__.py index 6d4ce51fa26ba..287963368c19c 100644 --- a/tests/utils/log/elasticmock/__init__.py +++ b/tests/utils/log/elasticmock/__init__.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # # The MIT License (MIT) # @@ -21,7 +38,7 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
- +"""Elastic mock module used for testing""" from functools import wraps from typing import Dict @@ -33,7 +50,8 @@ ELASTIC_INSTANCES = {} # type: Dict[str, FakeElasticsearch] -def _get_elasticmock(hosts=None, *args, **kwargs): +# noinspection PyUnusedLocal +def _get_elasticmock(hosts=None, *args, **kwargs): # pylint: disable=unused-argument host = _normalize_hosts(hosts)[0] elastic_key = '{0}:{1}'.format( host.get('host', 'localhost'), host.get('port', 9200) @@ -47,11 +65,12 @@ def _get_elasticmock(hosts=None, *args, **kwargs): return connection -def elasticmock(f): - @wraps(f) +def elasticmock(function): + """Elasticmock decorator""" + @wraps(function) def decorated(*args, **kwargs): ELASTIC_INSTANCES.clear() with patch('elasticsearch.Elasticsearch', _get_elasticmock): - result = f(*args, **kwargs) + result = function(*args, **kwargs) return result return decorated diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py b/tests/utils/log/elasticmock/fake_elasticsearch.py index f69a1ce0c20b3..a4c4646bc4ab8 100644 --- a/tests/utils/log/elasticmock/fake_elasticsearch.py +++ b/tests/utils/log/elasticmock/fake_elasticsearch.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ # # The MIT License (MIT) # @@ -31,6 +48,8 @@ from .utilities import get_random_id +# pylint: disable=redefined-builtin +# noinspection PyShadowingBuiltins class FakeElasticsearch(Elasticsearch): __documents_dict = None @@ -101,15 +120,7 @@ def exists(self, index, doc_type, id, params=None): def get(self, index, id, doc_type='_all', params=None): result = None if index in self.__documents_dict: - for document in self.__documents_dict[index]: - if document.get('_id') == id: - if doc_type == '_all': - result = document - break - else: - if document.get('_type') == doc_type: - result = document - break + result = self.find_document(doc_type, id, index, result) if result: result['found'] = True @@ -124,6 +135,17 @@ def get(self, index, id, doc_type='_all', params=None): return result + def find_document(self, doc_type, id, index, result): + for document in self.__documents_dict[index]: + if document.get('_id') == id: + if doc_type == '_all': + result = document + break + elif document.get('_type') == doc_type: + result = document + break + return result + @query_params('_source', '_source_exclude', '_source_include', 'parent', 'preference', 'realtime', 'refresh', 'routing', 'version', 'version_type') @@ -172,7 +194,7 @@ def count(self, index=None, doc_type=None, body=None, params=None): def search(self, index=None, doc_type=None, body=None, params=None): searchable_indexes = self._normalize_index_to_list(index) - matches = self._find_match(index, doc_type, body, params) + matches = self._find_match(index, doc_type, body) result = { 'hits': { @@ -250,7 +272,7 @@ def suggest(self, body, index=None, params=None): ] return result_dict - def _find_match(self, index, doc_type, body, params=None): # pylint: disable=unused-argument + def _find_match(self, index, doc_type, body): # pylint: disable=unused-argument searchable_indexes = self._normalize_index_to_list(index) searchable_doc_types = self._normalize_doc_type_to_list(doc_type) @@ -258,23 +280,29 @@ def _find_match(self, index, doc_type, body, params=None): # pylint: disable=un matches = [] for searchable_index in searchable_indexes: - for document in self.__documents_dict[searchable_index]: - if searchable_doc_types\ - and document.get('_type') not in searchable_doc_types: - continue - - if 'match_phrase' in must: - for query_id in must['match_phrase']: - query_val = must['match_phrase'][query_id] - if query_id in document['_source']: - if query_val in document['_source'][query_id]: - # use in as a proxy for match_phrase - matches.append(document) - else: - matches.append(document) + self.find_document_in_searchable_index(matches, must, searchable_doc_types, searchable_index) return matches + def find_document_in_searchable_index(self, matches, must, searchable_doc_types, searchable_index): + for document in self.__documents_dict[searchable_index]: + if searchable_doc_types and document.get('_type') not in searchable_doc_types: + continue + + if 'match_phrase' in must: + self.match_must_phrase(document, matches, must) + else: + matches.append(document) + + @staticmethod + def match_must_phrase(document, matches, must): + for query_id in must['match_phrase']: + query_val = must['match_phrase'][query_id] + if query_id in document['_source']: + if query_val in document['_source'][query_id]: + # use in as a proxy for match_phrase + matches.append(document) + def _normalize_index_to_list(self, index): # Ensure to have a list of index if index is None: @@ -310,3 +338,4 @@ def _normalize_doc_type_to_list(doc_type): raise ValueError("Invalid param 
'index'") return searchable_doc_types +# pylint: enable=redefined-builtin diff --git a/tests/utils/log/elasticmock/utilities/__init__.py b/tests/utils/log/elasticmock/utilities/__init__.py index 19438ba1c5e46..d9abb4839fcc3 100644 --- a/tests/utils/log/elasticmock/utilities/__init__.py +++ b/tests/utils/log/elasticmock/utilities/__init__.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # # The MIT License (MIT) # @@ -21,7 +38,7 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. - +"""Utilities for Elastic mock""" import random import string @@ -30,4 +47,5 @@ def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE): + """Returns random if for elasticsearch""" return ''.join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in range(size))