From f8be014d9fa299e461acec85eac5276bd7a977d1 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Tue, 17 Sep 2019 13:16:32 +0200 Subject: [PATCH] [AIRFLOW-5256] Related pylint changes for common licences in python files (#5786) (cherry picked from commit 47801057989046dfcf7b424ce54afee103803815) --- .pre-commit-config.yaml | 14 +++- airflow/api/auth/backend/kerberos_auth.py | 17 ++++ airflow/config_templates/default_celery.py | 2 +- .../example_gcp_bigtable_operators.py | 6 +- .../contrib/executors/kubernetes_executor.py | 45 +++++++---- .../hooks/azure_container_instance_hook.py | 25 +++--- airflow/contrib/hooks/gcp_mlengine_hook.py | 28 +++---- airflow/contrib/hooks/grpc_hook.py | 38 +++++---- airflow/contrib/hooks/qubole_hook.py | 45 ++++++++--- airflow/contrib/kubernetes/kube_client.py | 15 +++- airflow/contrib/kubernetes/pod_launcher.py | 48 +++++++++--- .../operators/aws_sqs_publish_operator.py | 6 +- .../operators/kubernetes_pod_operator.py | 6 +- .../contrib/operators/mlengine_operator.py | 29 +++---- airflow/contrib/operators/qubole_operator.py | 8 +- .../contrib/utils/mlengine_operator_utils.py | 29 +++---- .../utils/mlengine_prediction_summary.py | 31 ++++---- airflow/example_dags/docker_copy_data.py | 1 + airflow/exceptions.py | 25 ++---- airflow/lineage/backend/__init__.py | 2 + ...4_add_kubernetes_resource_checkpointing.py | 4 +- ...5c0_add_kubernetes_scheduler_uniqueness.py | 4 +- .../versions/dd25f486b8ea_add_idx_log_dag.py | 4 +- airflow/models/__init__.py | 2 +- airflow/models/baseoperator.py | 64 ++++++++++----- airflow/models/taskfail.py | 2 +- airflow/models/taskreschedule.py | 2 +- airflow/operators/docker_operator.py | 20 ++++- airflow/security/kerberos.py | 40 +++++++++- airflow/security/utils.py | 23 +++++- airflow/utils/log/file_task_handler.py | 37 ++++----- dev/airflow-jira | 28 +++---- dev/airflow-license | 28 +++---- dev/airflow-pr | 28 +++---- docs/conf.py | 2 +- docs/exts/__init__.py | 1 - docs/exts/docroles.py | 52 +++++++++---- docs/exts/exampleinclude.py | 68 ++++++++++++---- docs/exts/removemarktransform.py | 13 ++-- scripts/ci/ci_refresh_pylint_todo.sh | 37 +++++++++ .../ci/in_container/_in_container_utils.sh | 54 +++++++++++++ .../ci/in_container/refresh_pylint_todo.sh | 33 ++++++++ setup.py | 7 +- tests/contrib/hooks/test_grpc_hook.py | 47 ++++++----- .../minikube/test_kubernetes_executor.py | 6 +- .../minikube/test_kubernetes_pod_operator.py | 3 +- .../operators/test_aws_athena_operator.py | 9 ++- .../operators/test_sagemaker_base_operator.py | 6 +- .../test_sagemaker_training_operator.py | 9 ++- .../sensors/test_gcs_upload_session_sensor.py | 8 +- tests/contrib/utils/gcp_authenticator.py | 17 ++-- tests/dags/test_clear_subdag.py | 3 +- tests/dags/test_example_bash_operator.py | 2 +- tests/kubernetes/test_pod_launcher.py | 5 +- tests/operators/test_docker_operator.py | 4 +- tests/utils/log/elasticmock/__init__.py | 29 +++++-- .../log/elasticmock/fake_elasticsearch.py | 77 +++++++++++++------ .../log/elasticmock/utilities/__init__.py | 20 ++++- 58 files changed, 848 insertions(+), 370 deletions(-) create mode 100755 scripts/ci/ci_refresh_pylint_todo.sh create mode 100755 scripts/ci/in_container/refresh_pylint_todo.sh diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b490421c0882a..62322b1e81aff 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -92,7 +92,7 @@ repos: - license-templates/LICENSE.txt - --fuzzy-match-generates-todo - id: insert-license - name: Add licence for shell files + name: Add licence for all 
shell files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ types: [shell] files: ^breeze$|^breeze-complete$|\.sh$ @@ -102,6 +102,16 @@ repos: - --license-filepath - license-templates/LICENSE.txt - --fuzzy-match-generates-todo + - id: insert-license + name: Add licence for all python files + exclude: ^\.github/.*$"|^airflow/_vendor/.*$ + types: [python] + args: + - --comment-style + - "|#|" + - --license-filepath + - license-templates/LICENSE.txt + - --fuzzy-match-generates-todo - id: insert-license name: Add licence for all XML files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ @@ -113,7 +123,7 @@ repos: - license-templates/LICENSE.txt - --fuzzy-match-generates-todo - id: insert-license - name: Add licence for yaml files + name: Add licence for all yaml files exclude: ^\.github/.*$"|^airflow/_vendor/.*$ types: [yaml] args: diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py index dc5ca1d05ea79..bf0b89710321d 100644 --- a/airflow/api/auth/backend/kerberos_auth.py +++ b/airflow/api/auth/backend/kerberos_auth.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # # Copyright (c) 2013, Michael Komitee # All rights reserved. diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 7a9fd25064b8a..3d8a767b0ea9c 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Default celery configuration.""" import ssl from airflow import configuration diff --git a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py index aca820a0ecb41..a4241ced94019 100644 --- a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py +++ b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. 
You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 2adb526337a44..b0e45cac3fcf9 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Kubernetes executor""" import base64 import hashlib from queue import Empty @@ -22,8 +22,10 @@ import re import json import multiprocessing -from dateutil import parser from uuid import uuid4 + +from dateutil import parser + import kubernetes from kubernetes import watch, client from kubernetes.client.rest import ApiException @@ -42,6 +44,9 @@ from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin +MAX_POD_ID_LEN = 253 +MAX_LABEL_LEN = 63 + class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, @@ -125,6 +130,7 @@ def as_dict(self): class KubeConfig: + """Configuration for Kubernetes""" core_section = 'core' kubernetes_section = 'kubernetes' @@ -280,8 +286,8 @@ def __init__(self): # and only return a blank string if contexts are not set. def _get_security_context_val(self, scontext): val = configuration.get(self.kubernetes_section, scontext) - if len(val) == 0: - return val + if not val: + return 0 else: return int(val) @@ -308,7 +314,8 @@ def _validate(self): 'through ssh key, but not both') -class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): +class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): + """Watches for Kubernetes jobs""" def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace @@ -318,6 +325,7 @@ def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube self.kube_config = kube_config def run(self): + """Performs watching""" kube_client = get_kube_client() while True: try: @@ -363,6 +371,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): return last_resource_version def process_error(self, event): + """Process error response""" self.log.error( 'Encountered Error response from k8s list namespaced pod stream => %s', event @@ -371,16 +380,17 @@ def process_error(self, event): if raw_object['code'] == 410: self.log.info( 'Kubernetes resource version is too old, must reset to 0 => %s', - raw_object['message'] + (raw_object['message'],) ) # Return resource version 0 return '0' raise AirflowException( - 'Kubernetes failure for %s with code %s and message: %s', - raw_object['reason'], raw_object['code'], raw_object['message'] + 'Kubernetes failure for %s with code %s and message: %s' % + (raw_object['reason'], raw_object['code'], raw_object['message']) ) def process_status(self, pod_id, status, labels, resource_version): + """Process status response""" if status == 'Pending': self.log.info('Event: %s 
Pending', pod_id)
         elif status == 'Failed':
@@ -399,6 +409,7 @@


 class AirflowKubernetesScheduler(LoggingMixin):
+    """Airflow Scheduler for Kubernetes"""
     def __init__(self, kube_config, task_queue, result_queue, kube_client, worker_uuid):
         self.log.debug("Creating Kubernetes executor")
         self.kube_config = kube_config
@@ -432,7 +443,6 @@ def _health_check_kube_watcher(self):

     def run_next(self, next_job):
         """
-
         The run_next command will check the task_queue for any un-run jobs.
         It will then create a unique job-id, launch that job in the cluster,
         and store relevant info in the current_jobs map so we can track the job's
@@ -457,6 +467,7 @@ def run_next(self, next_job):
         self.log.debug("Kubernetes Job created!")

     def delete_pod(self, pod_id):
+        """Deletes POD"""
         try:
             self.kube_client.delete_namespaced_pod(
                 pod_id, self.namespace, body=client.V1DeleteOptions(),
@@ -487,6 +498,7 @@ def sync(self):
                 break

     def process_watcher_task(self, task):
+        """Process the task by watcher."""
         pod_id, state, labels, resource_version = task
         self.log.info(
             'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
@@ -524,8 +536,6 @@ def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
         :param random_uuid: a uuid
         :return: ``str`` valid Pod name of appropriate length
         """
-        MAX_POD_ID_LEN = 253
-
         safe_key = safe_dag_id + safe_task_id

         safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
@@ -543,8 +553,6 @@ def _make_safe_label_value(string):
         way from the original value sent to this function, then we need to truncate to
         53chars, and append it with a unique hash.
         """
-        MAX_LABEL_LEN = 63
-
         safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string)

         if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
@@ -635,11 +643,13 @@ def _labels_to_key(self, labels):
         return None

     def terminate(self):
+        """Terminates the watcher."""
         self.watcher_queue.join()
         self._manager.shutdown()


 class KubernetesExecutor(BaseExecutor, LoggingMixin):
+    """Executor for Kubernetes"""
     def __init__(self):
         self.kube_config = KubeConfig()
         self.task_queue = None
@@ -676,6 +686,8 @@ def clear_not_launched_queued_tasks(self, session=None):
         )

         for task in queued_tasks:
+            # noinspection PyProtectedMember
+            # pylint: disable=protected-access
             dict_string = (
                 "dag_id={},task_id={},execution_date={},airflow-worker={}".format(
                     AirflowKubernetesScheduler._make_safe_label_value(task.dag_id),
@@ -686,13 +698,14 @@ def clear_not_launched_queued_tasks(self, session=None):
                     self.worker_uuid
                 )
             )
+            # pylint: enable=protected-access
             kwargs = dict(label_selector=dict_string)
             if self.kube_config.kube_client_request_args:
                 for key, value in self.kube_config.kube_client_request_args.items():
                     kwargs[key] = value
             pod_list = self.kube_client.list_namespaced_pod(
                 self.kube_config.kube_namespace, **kwargs)
-            if len(pod_list.items) == 0:
+            if not pod_list.items:
                 self.log.info(
                     'TaskInstance: %s found in queued state but was not launched, '
                     'rescheduling', task
@@ -740,6 +753,7 @@ def _create_or_update_secret(secret_name, secret_path):
             _create_or_update_secret(service_account['name'], service_account['path'])

     def start(self):
+        """Starts the executor"""
         self.log.info('Start Kubernetes executor')
         self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
         self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
@@ -759,6 +773,7 @@ def start(self):
         self.clear_not_launched_queued_tasks()

     def execute_async(self, key, command,
queue=None, executor_config=None): + """Executes task asynchronously""" self.log.info( 'Add task %s with command %s with executor_config %s', key, command, executor_config @@ -767,6 +782,7 @@ def execute_async(self, key, command, queue=None, executor_config=None): self.task_queue.put((key, command, kube_executor_config)) def sync(self): + """Synchronize task state.""" if self.running: self.log.debug('self.running: %s', self.running) if self.queued_tasks: @@ -825,6 +841,7 @@ def _change_state(self, key, state, pod_id): self.event_buffer[key] = state def end(self): + """Called when the executor shuts down""" self.log.info('Shutting down Kubernetes executor') self.task_queue.join() self.result_queue.join() diff --git a/airflow/contrib/hooks/azure_container_instance_hook.py b/airflow/contrib/hooks/azure_container_instance_hook.py index 04e4fbe8cdb05..0fa958875a762 100644 --- a/airflow/contrib/hooks/azure_container_instance_hook.py +++ b/airflow/contrib/hooks/azure_container_instance_hook.py @@ -1,17 +1,20 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. import os diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py index 715e82d47d5de..2ca0bf5502691 100644 --- a/airflow/contrib/hooks/gcp_mlengine_hook.py +++ b/airflow/contrib/hooks/gcp_mlengine_hook.py @@ -1,18 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + import random import time from googleapiclient.errors import HttpError diff --git a/airflow/contrib/hooks/grpc_hook.py b/airflow/contrib/hooks/grpc_hook.py index 18eaadd63167f..7ad4e596b7d44 100644 --- a/airflow/contrib/hooks/grpc_hook.py +++ b/airflow/contrib/hooks/grpc_hook.py @@ -1,17 +1,23 @@ # -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
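The Kubernetes executor hunks above hoist MAX_POD_ID_LEN and MAX_LABEL_LEN to module level and use them in `_make_safe_pod_id` / `_make_safe_label_value`. A minimal sketch of the truncate-and-hash pattern follows; the md5-based suffix is an assumption, since the hunk does not show those exact lines:

```python
import hashlib
import re

MAX_LABEL_LEN = 63  # Kubernetes limits label values to 63 characters


def make_safe_label_value(string):
    # Strip characters that are not valid in a Kubernetes label value.
    safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string)
    # If anything was stripped or the value is too long, truncate and append
    # a short hash so distinct inputs still map to distinct labels.
    if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
        digest = hashlib.md5(string.encode()).hexdigest()[:9]
        safe_label = safe_label[:MAX_LABEL_LEN - len(digest) - 1] + "-" + digest
    return safe_label


print(make_safe_label_value("my_dag"))        # short, valid input is unchanged
print(len(make_safe_label_value("x" * 200)))  # 63: truncated with a hash suffix
```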
+"""GRPC Hook""" import grpc from google import auth as google_auth @@ -58,7 +64,7 @@ def get_conn(self): if auth_type == "NO_AUTH": channel = grpc.insecure_channel(base_url) - elif auth_type == "SSL" or auth_type == "TLS": + elif auth_type in {"SSL", "TLS"}: credential_file_name = self._get_field("credential_pem_file") creds = grpc.ssl_channel_credentials(open(credential_file_name).read()) channel = grpc.secure_channel(base_url, creds) @@ -91,7 +97,9 @@ def get_conn(self): return channel - def run(self, stub_class, call_func, streaming=False, data={}): + def run(self, stub_class, call_func, streaming=False, data=None): + if data is None: + data = {} with self.get_conn() as channel: stub = stub_class(channel) try: @@ -103,10 +111,14 @@ def run(self, stub_class, call_func, streaming=False, data={}): for single_response in response: yield single_response except grpc.RpcError as ex: + # noinspection PyUnresolvedReferences self.log.exception( "Error occurred when calling the grpc service: {0}, method: {1} \ status code: {2}, error details: {3}" - .format(stub.__class__.__name__, call_func, ex.code(), ex.details())) + .format(stub.__class__.__name__, + call_func, + ex.code(), # pylint: disable=no-member + ex.details())) # pylint: disable=no-member raise ex def _get_field(self, field_name, default=None): diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py index e3f95138e99f4..ef3afb7527933 100644 --- a/airflow/contrib/hooks/qubole_hook.py +++ b/airflow/contrib/hooks/qubole_hook.py @@ -17,13 +17,18 @@ # specific language governing permissions and limitations # under the License. # - +"""Qubole hook""" import os import time import datetime import six import re +from qds_sdk.qubole import Qubole +from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \ + PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \ + DbImportCommand, SqlCommand + from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook from airflow import configuration @@ -31,11 +36,6 @@ from airflow.utils.state import State from airflow.models import TaskInstance -from qds_sdk.qubole import Qubole -from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \ - PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \ - DbImportCommand, SqlCommand - COMMAND_CLASSES = { "hivecmd": HiveCommand, "prestocmd": PrestoCommand, @@ -57,20 +57,24 @@ def flatten_list(list_of_lists): + """Flatten the list""" return [element for array in list_of_lists for element in array] def filter_options(options): + """Remove options from the list""" options_to_remove = ["help", "print-logs-live", "print-logs"] return [option for option in options if option not in options_to_remove] def get_options_list(command_class): + """Get options list""" options_list = [option.get_opt_string().strip("--") for option in command_class.optparser.option_list] return filter_options(options_list) def build_command_args(): + """Build Command argument from command and options""" command_args, hyphen_args = {}, set() for cmd in COMMAND_CLASSES: @@ -95,7 +99,8 @@ def build_command_args(): class QuboleHook(BaseHook): - def __init__(self, *args, **kwargs): + """Hook for Qubole communication""" + def __init__(self, *args, **kwargs): # pylint: disable=unused-argument conn = self.get_connection(kwargs['qubole_conn_id']) Qubole.configure(api_token=conn.password, api_url=conn.host) self.task_id = kwargs['task_id'] @@ -107,6 +112,7 
@@ def __init__(self, *args, **kwargs): @staticmethod def handle_failure_retry(context): + """Handle retries in case of failures""" ti = context['ti'] cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id) @@ -123,6 +129,7 @@ def handle_failure_retry(context): cmd.cancel() def execute(self, context): + """Execute call""" args = self.cls.parse(self.create_cmd_args(context)) self.cmd = self.cls.create(**args) self.task_instance = context['task_instance'] @@ -197,7 +204,7 @@ def get_log(self, ti): """ if self.cmd is None: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) - Command.get_log_id(self.cls, cmd_id) + Command.get_log_id(cmd_id) def get_jobs_id(self, ti): """ @@ -207,7 +214,7 @@ def get_jobs_id(self, ti): """ if self.cmd is None: cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id) - Command.get_jobs_id(self.cls, cmd_id) + Command.get_jobs_id(cmd_id) def get_extra_links(self, operator, dttm): """ @@ -229,6 +236,7 @@ def get_extra_links(self, operator, dttm): return url def create_cmd_args(self, context): + """Creates command arguments""" args = [] cmd_type = self.kwargs['command_type'] inplace_args = None @@ -247,10 +255,18 @@ def create_cmd_args(self, context): elif isinstance(v, (list, tuple)): for val in v: tags.add(val) + for key, value in self.kwargs.items(): + if key in COMMAND_ARGS[cmd_type]: + if key in HYPHEN_ARGS: + args.append("--{0}={1}".format(key.replace('_', '-'), value)) + elif key in positional_args_list: + inplace_args = value + elif key == 'tags': + self._add_tags(tags, value) else: - args.append("--{0}={1}".format(k, v)) + args.append("--{0}={1}".format(key, value)) - if k == 'notify' and v is True: + if key == 'notify' and value is True: args.append("--notify") args.append("--tags={0}".format(','.join(filter(None, tags)))) @@ -259,3 +275,10 @@ def create_cmd_args(self, context): args += inplace_args.split(' ') return args + + @staticmethod + def _add_tags(tags, value): + if isinstance(value, six.string_types): + tags.add(value) + elif isinstance(value, (list, tuple)): + tags.extend(value) diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index cc699e7104475..23eb95d59a1ce 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -14,12 +14,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
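The GrpcHook.run change above replaces the mutable default `data={}` with `data=None` plus an explicit check. A small self-contained sketch (function names invented) of why that matters:

```python
def broken(data={}):  # the old signature: one dict object shared across ALL calls
    data.setdefault("calls", 0)
    data["calls"] += 1
    return data["calls"]


def fixed(data=None):  # the pattern the hook adopts above
    if data is None:
        data = {}  # a fresh dict is created on every call
    data.setdefault("calls", 0)
    data["calls"] += 1
    return data["calls"]


print(broken(), broken())  # 1 2 -- state leaks between calls
print(fixed(), fixed())    # 1 1 -- each call starts clean
```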
+"""Client for kubernetes communication""" from airflow.configuration import conf from six import PY2 try: from kubernetes import config, client - from kubernetes.client.rest import ApiException + from kubernetes.client.rest import ApiException # pylint: disable=unused-import has_kubernetes = True except ImportError as e: # We need an exception class to be able to use it in ``except`` elsewhere @@ -48,6 +49,18 @@ def _load_kube_config(in_cluster, cluster_context, config_file): def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'), cluster_context=None, config_file=None): + """ + Retrieves Kubernetes client + + :param in_cluster: whether we are in cluster + :type in_cluster: bool + :param cluster_context: context of the cluster + :type cluster_context: str + :param config_file: configuration file + :type config_file: str + :return kubernetes client + :rtype client.CoreV1Api + """ if not in_cluster: if cluster_context is None: cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None) diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index ad59bcd560b19..6fc83ffaa5918 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -14,28 +14,34 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Launches PODs""" import json import time -import tenacity +from datetime import datetime as dt from typing import Tuple, Optional +from requests.exceptions import BaseHTTPError + +import tenacity + +from kubernetes import watch, client +from kubernetes.client.rest import ApiException +from kubernetes.stream import stream as kubernetes_stream + from airflow.settings import pod_mutation_hook from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State -from datetime import datetime as dt +from airflow import AirflowException + from airflow.contrib.kubernetes.pod import Pod from airflow.contrib.kubernetes.kubernetes_request_factory import \ pod_request_factory as pod_factory -from kubernetes import watch, client -from kubernetes.client.rest import ApiException -from kubernetes.stream import stream as kubernetes_stream -from airflow import AirflowException -from requests.exceptions import BaseHTTPError + from .kube_client import get_kube_client class PodStatus(object): + """Status of the PODs""" PENDING = 'pending' RUNNING = 'running' FAILED = 'failed' @@ -43,8 +49,20 @@ class PodStatus(object): class PodLauncher(LoggingMixin): - def __init__(self, kube_client=None, in_cluster=True, cluster_context=None, + """Launches PODS""" + def __init__(self, + kube_client=None, + in_cluster=True, + cluster_context=None, extract_xcom=False): + """ + Creates the launcher. 
+
+        :param kube_client: kubernetes client
+        :param in_cluster: whether we are in cluster
+        :param cluster_context: context of the cluster
+        :param extract_xcom: whether we should extract xcom
+        """
         super(PodLauncher, self).__init__()
         self._client = kube_client or get_kube_client(in_cluster=in_cluster,
                                                       cluster_context=cluster_context)
@@ -54,6 +72,7 @@ def __init__(self, kube_client=None, in_cluster=True, cluster_context=None,
         ) if extract_xcom else pod_factory.SimplePodRequestFactory()

     def run_pod_async(self, pod, **kwargs):
+        """Runs POD asynchronously"""
         pod_mutation_hook(pod)

         req = self.kube_req_factory.create(pod)
@@ -67,6 +86,7 @@ def run_pod_async(self, pod, **kwargs):
         return resp

     def delete_pod(self, pod):
+        """Deletes POD"""
         try:
             self._client.delete_namespaced_pod(
                 pod.name, pod.namespace, body=client.V1DeleteOptions())
@@ -124,14 +144,17 @@ def _task_status(self, event):
         return status

     def pod_not_started(self, pod):
+        """Tests if pod has not started"""
         state = self._task_status(self.read_pod(pod))
         return state == State.QUEUED

     def pod_is_running(self, pod):
+        """Tests if pod is running"""
         state = self._task_status(self.read_pod(pod))
-        return state != State.SUCCESS and state != State.FAILED
+        return state not in (State.SUCCESS, State.FAILED)

     def base_container_is_running(self, pod):
+        """Tests if base container is running"""
         event = self.read_pod(pod)
         status = next(iter(filter(lambda s: s.name == 'base',
                                   event.status.container_statuses)), None)
@@ -143,7 +166,7 @@ def base_container_is_running(self, pod):
         reraise=True
     )
     def read_pod_logs(self, pod):
-
+        """Reads log from the POD"""
         try:
             return self._client.read_namespaced_pod_log(
                 name=pod.name,
@@ -164,6 +187,7 @@ def read_pod_logs(self, pod):
         reraise=True
     )
     def read_pod(self, pod):
+        """Read POD information"""
         try:
             return self._client.read_namespaced_pod(pod.name, pod.namespace)
         except BaseHTTPError as e:
@@ -199,8 +223,10 @@ def _exec_pod_command(self, resp, command):
             if resp.peek_stderr():
                 self.log.info(resp.read_stderr())
                 break
+        return None

     def process_status(self, job_id, status):
+        """Process status information for the JOB"""
         status = status.lower()
         if status == PodStatus.PENDING:
             return State.QUEUED
diff --git a/airflow/contrib/operators/aws_sqs_publish_operator.py b/airflow/contrib/operators/aws_sqs_publish_operator.py
index 3067d9ff8f95f..4e9476fd2aed2 100644
--- a/airflow/contrib/operators/aws_sqs_publish_operator.py
+++ b/airflow/contrib/operators/aws_sqs_publish_operator.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
-#
-# Licensed to the ApachMee Software Foundation (ASF) under one
+
+# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership. The ASF licenses this file
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
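read_pod_logs and read_pod above are wrapped in tenacity retries, though only the trailing `reraise=True` survives in the hunks. A sketch of the pattern, with an assumed stop/wait policy rather than the committed values:

```python
import tenacity
from requests.exceptions import BaseHTTPError


@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),  # assumed policy -- the hunks above
    wait=tenacity.wait_exponential(),     # only show the trailing reraise=True
    reraise=True,
)
def read_pod(client, pod):
    """Read pod state, retrying transient API errors as the launcher does."""
    try:
        return client.read_namespaced_pod(pod.name, pod.namespace)
    except BaseHTTPError as e:
        # The real code raises AirflowException; a plain Exception keeps
        # this sketch self-contained.
        raise Exception('There was an error reading the kubernetes API: {}'.format(e))
```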
+"""Publish message to SQS queue""" from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.contrib.hooks.aws_sqs_hook import SQSHook @@ -38,7 +39,6 @@ class SQSPublishOperator(BaseOperator): :param aws_conn_id: AWS connection id (default: aws_default) :type aws_conn_id: str """ - template_fields = ('sqs_queue', 'message_content', 'delay_seconds') ui_color = '#6ad3fa' diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 95acf0e10a7eb..49190fb82060f 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -14,7 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Executes task in a Kubernetes POD""" from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults @@ -23,7 +23,7 @@ from airflow.utils.state import State -class KubernetesPodOperator(BaseOperator): +class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes """ Execute a task in a Kubernetes Pod @@ -171,7 +171,7 @@ def _set_resources(self, resources): return inputResource @apply_defaults - def __init__(self, + def __init__(self, # pylint: disable=too-many-arguments,too-many-locals namespace, image, name, diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py index c7349b3872e9d..d3f36ef13bf38 100644 --- a/airflow/contrib/operators/mlengine_operator.py +++ b/airflow/contrib/operators/mlengine_operator.py @@ -1,18 +1,21 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ import re from googleapiclient.errors import HttpError diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index 0fb7bc156b49e..6497646a85c0b 100644 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +"""Qubole operator""" from typing import Iterable from airflow.models.baseoperator import BaseOperator, BaseOperatorLink @@ -25,7 +26,7 @@ class QDSLink(BaseOperatorLink): - + """Link to QDS""" name = 'Go to QDS' def get_link(self, operator, dttm): @@ -191,16 +192,19 @@ def on_kill(self, ti=None): self.get_hook().kill(ti) def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True): + """get_results from Qubole""" return self.get_hook().get_results(ti, fp, inline, delim, fetch) def get_log(self, ti): + """get_log from Qubole""" return self.get_hook().get_log(ti) def get_jobs_id(self, ti): + """get jobs_id from Qubole""" return self.get_hook().get_jobs_id(ti) def get_hook(self): - # Reinitiating the hook, as some template fields might have changed + """Reinitialising the hook, as some template fields might have changed""" return QuboleHook(*self.args, **self.kwargs) def __getattribute__(self, name): diff --git a/airflow/contrib/utils/mlengine_operator_utils.py b/airflow/contrib/utils/mlengine_operator_utils.py index c3ca8530dee6e..ea1e6b4a214fa 100644 --- a/airflow/contrib/utils/mlengine_operator_utils.py +++ b/airflow/contrib/utils/mlengine_operator_utils.py @@ -1,18 +1,21 @@ +# -*- coding: utf-8 -*- + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
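QDSLink above implements the BaseOperatorLink contract: a `name` plus `get_link(operator, dttm)`. A minimal sketch of wiring a custom link to an operator, with an invented URL scheme:

```python
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink


class MyConsoleLink(BaseOperatorLink):
    """Extra-link sketch modelled on QDSLink above; the URL is made up."""
    name = 'Go to console'

    def get_link(self, operator, dttm):
        # Build a URL from the operator and the execution date of the run.
        return 'https://console.example.com/runs?task={}&date={}'.format(
            operator.task_id, dttm.isoformat())


class MyOperator(BaseOperator):
    # Registered links show up as buttons on the task instance panel.
    operator_extra_links = (MyConsoleLink(),)

    def execute(self, context):
        pass
```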
import base64 import json diff --git a/airflow/contrib/utils/mlengine_prediction_summary.py b/airflow/contrib/utils/mlengine_prediction_summary.py index def793c1be001..1286ce7290c20 100644 --- a/airflow/contrib/utils/mlengine_prediction_summary.py +++ b/airflow/contrib/utils/mlengine_prediction_summary.py @@ -1,19 +1,22 @@ -# flake8: noqa: F841 -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the 'License'); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# flake8: noqa: F841 """A template called by DataFlowPythonOperator to summarize BatchPrediction. diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py index 0e2251a0d22ba..e487ddcda47f7 100644 --- a/airflow/example_dags/docker_copy_data.py +++ b/airflow/example_dags/docker_copy_data.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 315837b4f3722..cc256f5075f2b 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -19,6 +19,7 @@ # # Note: Any AirflowException raised is expected to cause the TaskInstance # to be marked in an ERROR state +"""Exceptions used by Airflow""" class AirflowException(Exception): @@ -40,11 +41,11 @@ class AirflowNotFoundException(AirflowException): class AirflowConfigException(AirflowException): - pass + """Raise when there is configuration problem""" class AirflowSensorTimeout(AirflowException): - pass + """Raise when there is a timeout on sensor polling""" class AirflowRescheduleException(AirflowException): @@ -52,73 +53,63 @@ class AirflowRescheduleException(AirflowException): Raise when the task should be re-scheduled at a later time. 
:param reschedule_date: The date when the task should be rescheduled - :type reschedule: datetime.datetime + :type reschedule_date: datetime.datetime """ def __init__(self, reschedule_date): self.reschedule_date = reschedule_date class AirflowTaskTimeout(AirflowException): - pass + """Raise when the task execution times-out""" class AirflowWebServerTimeout(AirflowException): - pass + """Raise when the web server times out""" class AirflowSkipException(AirflowException): - pass + """Raise when the task should be skipped""" class AirflowDagCycleException(AirflowException): - pass + """Raise when there is a cycle in Dag definition""" class DagNotFound(AirflowNotFoundException): """Raise when a DAG is not available in the system""" - pass class DagRunNotFound(AirflowNotFoundException): """Raise when a DAG Run is not available in the system""" - pass class DagRunAlreadyExists(AirflowBadRequest): """Raise when creating a DAG run for DAG which already has DAG run entry""" - pass class DagFileExists(AirflowBadRequest): """Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder""" - pass class TaskNotFound(AirflowNotFoundException): """Raise when a Task is not available in the system""" - pass class TaskInstanceNotFound(AirflowNotFoundException): """Raise when a Task Instance is not available in the system""" - pass class PoolNotFound(AirflowNotFoundException): """Raise when a Pool is not available in the system""" - pass class NoAvailablePoolSlot(AirflowException): """Raise when there is not enough slots in pool""" - pass class DagConcurrencyLimitReached(AirflowException): """Raise when DAG concurrency limit is reached""" - pass class TaskConcurrencyLimitReached(AirflowException): """Raise when task concurrency limit is reached""" - pass diff --git a/airflow/lineage/backend/__init__.py b/airflow/lineage/backend/__init__.py index 7913021a20ecb..d0c2ec5601880 100644 --- a/airflow/lineage/backend/__init__.py +++ b/airflow/lineage/backend/__init__.py @@ -16,9 +16,11 @@ # specific language governing permissions and limitations # under the License. 
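The exceptions.py hunk drops every trailing `pass` because a docstring is itself a complete class body. A quick illustration:

```python
class AirflowException(Exception):
    """Base exception, as in the module above."""


class WithPass(AirflowException):
    """Raise when something is wrong."""
    pass  # redundant: the docstring already makes the body non-empty


class WithoutPass(AirflowException):
    """Raise when something is wrong."""  # sufficient on its own


# Both classes behave identically:
assert issubclass(WithPass, AirflowException)
assert issubclass(WithoutPass, AirflowException)
```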
# +"""Sends lineage metadata to a backend""" class LineageBackend(object): + """Sends lineage metadata to a backend""" def send_lineage(self, operator=None, inlets=None, outlets=None, context=None): """ diff --git a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py index b77e49d6574f1..c1e62a2cc2a5f 100644 --- a/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py +++ b/airflow/migrations/versions/33ae817a1ff4_add_kubernetes_resource_checkpointing.py @@ -44,11 +44,11 @@ def upgrade(): conn = op.get_bind() # alembic creates an invalid SQL for mssql and mysql dialects - if conn.dialect.name in ("mysql"): + if conn.dialect.name in {"mysql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id<>0", name="kube_resource_version_one_row_id") ) - elif conn.dialect.name not in ("mssql"): + elif conn.dialect.name not in {"mssql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id", name="kube_resource_version_one_row_id") ) diff --git a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py index 7e9e7fecaf34b..86ad47131e4a1 100644 --- a/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py +++ b/airflow/migrations/versions/86770d1215c0_add_kubernetes_scheduler_uniqueness.py @@ -45,11 +45,11 @@ def upgrade(): conn = op.get_bind() # alembic creates an invalid SQL for mssql and mysql dialects - if conn.dialect.name in ("mysql"): + if conn.dialect.name in {"mysql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id<>0", name="kube_worker_one_row_id") ) - elif conn.dialect.name not in ("mssql"): + elif conn.dialect.name not in {"mssql"}: columns_and_constraints.append( sa.CheckConstraint("one_row_id", name="kube_worker_one_row_id") ) diff --git a/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py index 3249a2e0589cb..560b763963dd6 100644 --- a/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py +++ b/airflow/migrations/versions/dd25f486b8ea_add_idx_log_dag.py @@ -15,9 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - -from alembic import op - """add idx_log_dag Revision ID: dd25f486b8ea @@ -25,6 +22,7 @@ Create Date: 2018-08-07 06:41:41.028249 """ +from alembic import op # revision identifiers, used by Alembic. revision = 'dd25f486b8ea' diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 718d5718cb075..d85cc806153ea 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Airflow models""" from airflow.models.base import Base, ID_LEN # noqa: F401 from airflow.models.baseoperator import BaseOperator # noqa: F401 from airflow.models.connection import Connection # noqa: F401 diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index e41a8f308fa80..63c095d49679f 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -16,9 +16,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
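The migration fixes above replace `in ("mysql")` with `in {"mysql"}`. The original form is a substring test, not a membership test, which is the bug being corrected:

```python
dialect = "my"

# ("mysql") is NOT a one-element tuple -- the parentheses are just grouping,
# so `in` performs a substring search on the string "mysql":
print(dialect in ("mysql"))   # True  (substring match!)

# A real container restores membership semantics:
print(dialect in ("mysql",))  # False (one-element tuple)
print(dialect in {"mysql"})   # False (the set form the migrations now use)
```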
-
+"""
+Base operator for all operators.
+"""
 from abc import ABCMeta, abstractmethod
-from cached_property import cached_property
 import copy
 import functools
 import logging
@@ -27,6 +28,8 @@
 from datetime import timedelta, datetime
 from typing import Iterable, Optional, Dict, Callable, Set

+from cached_property import cached_property
+
 import jinja2
 import six

@@ -221,14 +224,14 @@ class derived from this one results in the creation of a task object,
     # Defines which files extensions to look for in the templated fields
     template_ext = []  # type: Iterable[str]
     # Defines the color in the UI
-    ui_color = '#fff'
-    ui_fgcolor = '#000'
+    ui_color = '#fff'  # type: str
+    ui_fgcolor = '#000'  # type: str

     # base list which includes all the attrs that don't need deep copy.
     _base_operator_shallow_copy_attrs = ('user_defined_macros',
                                          'user_defined_filters',
                                          'params',
-                                         '_log',)
+                                         '_log',)  # type: Iterable[str]

     # each operator should override this class attr for shallow copy attrs.
     shallow_copy_attrs = ()  # type: Iterable[str]
@@ -425,8 +428,8 @@ def __lt__(self, other):

     def __hash__(self):
         hash_components = [type(self)]
-        for c in self._comps:
-            val = getattr(self, c, None)
+        for component in self._comps:
+            val = getattr(self, component, None)
             try:
                 hash(val)
                 hash_components.append(val)
@@ -520,6 +523,7 @@ def has_dag(self):

     @property
     def dag_id(self):
+        """Returns dag id if it has one or an adhoc + owner"""
         if self.has_dag():
             return self.dag.dag_id
         else:
@@ -552,6 +556,15 @@ def schedule_interval(self):

     @property
     def priority_weight_total(self):
+        """
+        Total priority weight for the task. It might include all upstream or downstream tasks,
+        depending on the weight rule.
+
+        - WeightRule.ABSOLUTE - only own weight
+        - WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
+        - WeightRule.UPSTREAM - adds priority weight of all upstream tasks
+
+        """
         if self.weight_rule == WeightRule.ABSOLUTE:
             return self.priority_weight
         elif self.weight_rule == WeightRule.DOWNSTREAM:
@@ -568,10 +581,12 @@ def priority_weight_total(self):

     @cached_property
     def operator_extra_link_dict(self):
+        """Returns dictionary of all extra links for the operator"""
         return {link.name: link for link in self.operator_extra_links}

     @cached_property
     def global_operator_extra_link_dict(self):
+        """Returns dictionary of all global extra links"""
         from airflow.plugins_manager import global_operator_extra_links
         return {link.name: link for link in global_operator_extra_links}

@@ -619,7 +634,9 @@ def __deepcopy__(self, memo):
         result = cls.__new__(cls)
         memo[id(self)] = result

-        shallow_copy = cls.shallow_copy_attrs + cls._base_operator_shallow_copy_attrs
+        # noinspection PyProtectedMember
+        shallow_copy = cls.shallow_copy_attrs + \
+            cls._base_operator_shallow_copy_attrs

         for k, v in list(self.__dict__.items()):
             if k not in shallow_copy:
@@ -649,7 +666,7 @@ def render_template_from_field(self, attr, content, context, jinja_env):
         if isinstance(content, six.string_types):
             result = jinja_env.from_string(content).render(**context)
         elif isinstance(content, tuple):
-            if type(content) is not tuple:
+            if type(content) is not tuple:  # pylint: disable=unidiomatic-typecheck
                 # Special case for named tuples
                 result = content.__class__(*(rt(attr, e, context) for e in content))
             else:
@@ -724,6 +741,7 @@ def upstream_list(self):

     @property
     def upstream_task_ids(self):
+        """@property: list of ids of tasks directly upstream"""
         return self._upstream_task_ids

     @property
@@ -733,6 +751,7 @@ def downstream_list(self):

     @property
     def downstream_task_ids(self):
+
"""@property: list of ids of tasks directly downstream""" return self._downstream_task_ids @provide_session @@ -827,14 +846,15 @@ def run( start_date = start_date or self.start_date end_date = end_date or self.end_date or timezone.utcnow() - for dt in self.dag.date_range(start_date, end_date=end_date): - TaskInstance(self, dt).run( + for execution_date in self.dag.date_range(start_date, end_date=end_date): + TaskInstance(self, execution_date).run( mark_success=mark_success, ignore_depends_on_past=( - dt == start_date and ignore_first_depends_on_past), + execution_date == start_date and ignore_first_depends_on_past), ignore_ti_state=ignore_ti_state) def dry_run(self): + """Performs dry run for the operator - just render template fields.""" self.log.info('Dry run') for attr in self.template_fields: content = getattr(self, attr) @@ -868,31 +888,35 @@ def __repr__(self): @property def task_type(self): + """@property: type of the task""" return self.__class__.__name__ def add_only_new(self, item_set, item): + """Adds only new items to item set""" if item in item_set: self.log.warning( - 'Dependency {self}, {item} already registered' - ''.format(self=self, item=item)) + 'Dependency %s, %s already registered', self, item) else: item_set.add(item) def _set_relatives(self, task_or_task_list, upstream=False): + """Sets relatives for the task.""" try: task_list = list(task_or_task_list) except TypeError: task_list = [task_or_task_list] - for t in task_list: - if not isinstance(t, BaseOperator): + for task in task_list: + if not isinstance(task, BaseOperator): raise AirflowException( "Relationships can only be set between " - "Operators; received {}".format(t.__class__.__name__)) + "Operators; received {}".format(task.__class__.__name__)) # relationships can only be set if the tasks share a single DAG. Tasks # without a DAG are assigned to that DAG. - dags = {t._dag.dag_id: t._dag for t in [self] + task_list if t.has_dag()} + dags = { + task._dag.dag_id: task._dag # pylint: disable=protected-access + for task in [self] + task_list if task.has_dag()} if len(dags) > 1: raise AirflowException( @@ -965,7 +989,7 @@ def xcom_pull( @cached_property def extra_links(self): - # type: () -> Iterable[str] + """@property: extra links for the task. """ return list(set(self.operator_extra_link_dict.keys()) .union(self.global_operator_extra_link_dict.keys())) @@ -984,6 +1008,8 @@ def get_extra_links(self, dttm, link_name): return self.operator_extra_link_dict[link_name].get_link(self, dttm) elif link_name in self.global_operator_extra_link_dict: return self.global_operator_extra_link_dict[link_name].get_link(self, dttm) + else: + return None class BaseOperatorLink: diff --git a/airflow/models/taskfail.py b/airflow/models/taskfail.py index ad0861a0485a4..3fb08b3cee000 100644 --- a/airflow/models/taskfail.py +++ b/airflow/models/taskfail.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""Taskfail tracks the failed run durations of each task instance""" from sqlalchemy import Column, Index, Integer, String from airflow.models.base import Base, ID_LEN diff --git a/airflow/models/taskreschedule.py b/airflow/models/taskreschedule.py index 4bdb6431f1ae4..ebca6ee4e4ee6 100644 --- a/airflow/models/taskreschedule.py +++ b/airflow/models/taskreschedule.py @@ -16,7 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- +"""TaskReschedule tracks rescheduled task instances.""" from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc from airflow.models.base import Base, ID_LEN diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py index e3cc3186802a3..e1acb5799405b 100644 --- a/airflow/operators/docker_operator.py +++ b/airflow/operators/docker_operator.py @@ -16,16 +16,19 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +""" +Implements Docker operator +""" import json +import ast +from docker import APIClient, tls + from airflow.hooks.docker_hook import DockerHook from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.utils.file import TemporaryDirectory -from docker import APIClient, tls -import ast class DockerOperator(BaseOperator): @@ -252,6 +255,12 @@ def execute(self, context): if self.xcom_all else str(line) def get_command(self): + """ + Retrieve command(s). if command string starts with [, it returns the command list) + + :return: the command (or commands) + :rtype: str | List[str] + """ if isinstance(self.command, str) and self.command.strip().find('[') == 0: commands = ast.literal_eval(self.command) else: @@ -266,11 +275,14 @@ def on_kill(self): def __get_tls_config(self): tls_config = None if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key: + # Ignore type error on SSL version here - it is deprecated and type annotation is wrong + # it should be string + # noinspection PyTypeChecker tls_config = tls.TLSConfig( ca_cert=self.tls_ca_cert, client_cert=(self.tls_client_cert, self.tls_client_key), verify=True, - ssl_version=self.tls_ssl_version, + ssl_version=self.tls_ssl_version, # type: ignore assert_hostname=self.tls_hostname ) self.docker_url = self.docker_url.replace('tcp://', 'https://') diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index 3fc6aa345d79f..f072e193bbb65 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -1,4 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,6 +31,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
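DockerOperator.get_command above parses a string command as a Python list literal when it starts with '['. A standalone sketch of that behavior:

```python
import ast


def get_command(command):
    """Mirror of the logic above: a string beginning with '[' is parsed as a
    Python list literal; anything else passes through unchanged."""
    if isinstance(command, str) and command.strip().find('[') == 0:
        return ast.literal_eval(command)
    return command


print(get_command("['bash', '-c', 'echo hello']"))  # ['bash', '-c', 'echo hello']
print(get_command('echo hello'))                    # 'echo hello'
```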
+"""Kerberos security provider""" import socket import subprocess @@ -28,6 +46,13 @@ def renew_from_kt(principal, keytab): + """ + Renew kerberos token from keytab + + :param principal: principal + :param keytab: keytab file + :return: None + """ # The config is specified in seconds. But we ask for that same amount in # minutes to give ourselves a large renewal buffer. @@ -45,7 +70,7 @@ def renew_from_kt(principal, keytab): "-c", configuration.conf.get('kerberos', 'ccache'), # specify credentials cache cmd_principal ] - log.info("Reinitting kerberos from keytab: %s", " ".join(cmdv)) + log.info("Re-initialising kerberos from keytab: %s", " ".join(cmdv)) subp = subprocess.Popen(cmdv, stdout=subprocess.PIPE, @@ -72,6 +97,12 @@ def renew_from_kt(principal, keytab): def perform_krb181_workaround(principal): + """ + Workaround for Kerberos 1.8.1. + + :param principal: principal name + :return: None + """ cmdv = [configuration.conf.get('kerberos', 'kinit_path'), "-c", configuration.conf.get('kerberos', 'ccache'), "-R"] # Renew ticket_cache @@ -111,6 +142,13 @@ def detect_conf_var(): def run(principal, keytab): + """ + Run the kerbros renewer. + + :param principal: principal name + :param keytab: keytab file + :return: None + """ if not keytab: log.debug("Keytab renewer not starting, no keytab configured") sys.exit(0) diff --git a/airflow/security/utils.py b/airflow/security/utils.py index e10510e24eb71..4dc8a58d8c9c6 100644 --- a/airflow/security/utils.py +++ b/airflow/security/utils.py @@ -1,4 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,7 +32,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# - +"""Various security-related utils.""" import re import socket @@ -34,6 +51,7 @@ def get_components(principal): def replace_hostname_pattern(components, host=None): + """Replaces hostname with the right pattern including lowercase of the name.""" fqdn = host if not fqdn or fqdn == '0.0.0.0': fqdn = get_hostname() @@ -41,7 +59,7 @@ def replace_hostname_pattern(components, host=None): def get_fqdn(hostname_or_ip=None): - # Get hostname + """Retrieves FQDN - hostname for the IP or hostname.""" try: if hostname_or_ip: fqdn = socket.gethostbyaddr(hostname_or_ip)[0] @@ -56,6 +74,7 @@ def get_fqdn(hostname_or_ip=None): def principal_from_username(username, realm): + """Retrieves principal from the user name and realm.""" if ('@' not in username) and realm: username = "{}@{}".format(username, realm) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 111f26c279cf0..041b3b291abe8 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -16,9 +16,11 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +"""File logging handler for tasks.""" import logging import os +from typing import Optional + import requests from airflow import configuration as conf @@ -33,15 +35,12 @@ class FileTaskHandler(logging.Handler): task instance logs. It creates and delegates log handling to `logging.FileHandler` after receiving task instance context. It reads logs from task instance's host machine. + :param base_log_folder: Base log folder to place logs. + :param filename_template: template filename string """ - def __init__(self, base_log_folder, filename_template): - """ - :param base_log_folder: Base log folder to place logs. - :param filename_template: template filename string - """ super(FileTaskHandler, self).__init__() - self.handler = None + self.handler = None # type: Optional[logging.FileHandler] self.local_base = base_log_folder self.filename_template, self.filename_jinja_template = \ parse_template_string(filename_template) @@ -49,23 +48,25 @@ def __init__(self, base_log_folder, filename_template): def set_context(self, ti): """ Provide task_instance context to airflow task handler. 
+ :param ti: task instance object """ local_loc = self._init_file(ti) self.handler = logging.FileHandler(local_loc) - self.handler.setFormatter(self.formatter) + if self.formatter: + self.handler.setFormatter(self.formatter) self.handler.setLevel(self.level) def emit(self, record): - if self.handler is not None: + if self.handler: self.handler.emit(record) def flush(self): - if self.handler is not None: + if self.handler: self.handler.flush() def close(self): - if self.handler is not None: + if self.handler: self.handler.close() def _render_filename(self, ti, try_number): @@ -99,9 +100,9 @@ def _read(self, ti, try_number, metadata=None): if os.path.exists(location): try: - with open(location) as f: + with open(location) as file: log += "*** Reading local file: {}\n".format(location) - log += "".join(f.readlines()) + log += "".join(file.readlines()) except Exception as e: log = "*** Failed to load local log file: {}\n".format(location) log += "*** {}\n".format(str(e)) @@ -159,13 +160,13 @@ def read(self, task_instance, try_number=None, metadata=None): try_numbers = [try_number] logs = [''] * len(try_numbers) - metadatas = [{}] * len(try_numbers) - for i, try_number in enumerate(try_numbers): - log, metadata = self._read(task_instance, try_number, metadata) + metadata_array = [{}] * len(try_numbers) + for i, try_number_element in enumerate(try_numbers): + log, metadata = self._read(task_instance, try_number_element, metadata) logs[i] += log - metadatas[i] = metadata + metadata_array[i] = metadata - return logs, metadatas + return logs, metadata_array def _init_file(self, ti): """ diff --git a/dev/airflow-jira b/dev/airflow-jira index 4a9833e7cb07b..7d5360bd154bf 100755 --- a/dev/airflow-jira +++ b/dev/airflow-jira @@ -1,21 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. # Utility for creating well-formed pull request merges and pushing them to # Apache. 
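Note on the FileTaskHandler change above: the delegate file handler is now typed as Optional and every call site guards on it, so the handler stays a safe no-op until set_context() provides a log location. A minimal, self-contained sketch of that lazy-delegate pattern (the LazyFileHandler class and the demo logger below are illustrative only, not part of Airflow):

    import logging

    class LazyFileHandler(logging.Handler):
        """Sketch of the Optional-delegate pattern used by FileTaskHandler."""

        def __init__(self):
            super(LazyFileHandler, self).__init__()
            self.handler = None  # delegate is created lazily, once context is known

        def set_context(self, local_loc):
            # Mirrors set_context(ti) above: only now do we know where to write.
            self.handler = logging.FileHandler(local_loc)
            if self.formatter:  # same guard the patch adds before setFormatter()
                self.handler.setFormatter(self.formatter)
            self.handler.setLevel(self.level)

        def emit(self, record):
            if self.handler:  # no-op until set_context() has run
                self.handler.emit(record)

    log = logging.getLogger("demo")
    log.addHandler(LazyFileHandler())
    log.warning("emitted before context - silently dropped, no crash")

Guarding on self.handler (and on self.formatter) keeps the handler attachable before any task-instance context exists, which is what the truthiness checks in emit(), flush() and close() above achieve.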
diff --git a/dev/airflow-license b/dev/airflow-license index b7c7ad6d80af7..d52d13a4a24cb 100755 --- a/dev/airflow-license +++ b/dev/airflow-license @@ -1,21 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. from __future__ import print_function import os diff --git a/dev/airflow-pr b/dev/airflow-pr index 9b499ec0c2a53..2e4cd5d98e0b3 100755 --- a/dev/airflow-pr +++ b/dev/airflow-pr @@ -1,21 +1,21 @@ #!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. # Utility for creating well-formed pull request merges and pushing them to # Apache. diff --git a/docs/conf.py b/docs/conf.py index a58d5ba27633c..05d22a70503d5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # @@ -20,6 +19,7 @@ # specific language governing permissions and limitations # under the License. + # Airflow documentation build configuration file, created by # sphinx-quickstart on Thu Oct 9 20:50:01 2014. # diff --git a/docs/exts/__init__.py b/docs/exts/__init__.py index 645848f26c3b3..c718e012dfa4d 100644 --- a/docs/exts/__init__.py +++ b/docs/exts/__init__.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # diff --git a/docs/exts/docroles.py b/docs/exts/docroles.py index 00f88f1eba2b2..14150f9a80c3c 100644 --- a/docs/exts/docroles.py +++ b/docs/exts/docroles.py @@ -1,27 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. # +"""Document roles""" +from functools import partial from docutils import nodes, utils from sphinx.ext.autodoc.importer import import_module, mock -from functools import partial class RoleException(Exception): - pass + """Exception for roles extension """ def get_template_field(env, fullname): @@ -55,7 +59,15 @@ def get_template_field(env, fullname): return list(template_fields) -def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, content=[]): +# noinspection PyUnusedLocal +def template_field_role(app, + typ, # pylint: disable=unused-argument + rawtext, + text, + lineno, + inliner, + options=None, # pylint: disable=unused-argument + content=None): # pylint: disable=unused-argument """ A role that allows you to include a list of template fields in the middle of the text. This is especially useful when writing guides describing how to use the operator. 
@@ -63,13 +75,18 @@ def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, co Sample usage:: - :template-fields:`airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeSentimentOperator` + :template-fields: + `airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeSentimentOperator` For further information look at: * [http://docutils.sourceforge.net/docs/howto/rst-roles.html](Creating reStructuredText Interpreted Text Roles) """ + if options is None: + options = {} + if content is None: + content = [] text = utils.unescape(text) try: @@ -89,7 +106,8 @@ def template_field_role(app, typ, rawtext, text, lineno, inliner, options={}, co def setup(app): - from docutils.parsers.rst import roles + """Sets the extension up""" + from docutils.parsers.rst import roles # pylint: disable=wrong-import-order roles.register_local_role("template-fields", partial(template_field_role, app)) return {"version": "builtin", "parallel_read_safe": True, "parallel_write_safe": True} diff --git a/docs/exts/exampleinclude.py b/docs/exts/exampleinclude.py index 79eadd641da82..e9316ec5f5015 100644 --- a/docs/exts/exampleinclude.py +++ b/docs/exts/exampleinclude.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # flake8: noqa # Disable Flake8 because of all the sphinx imports # @@ -20,6 +19,8 @@ # specific language governing permissions and limitations # under the License. + +"""Nice formatted include for examples""" from os import path from docutils import nodes @@ -37,8 +38,10 @@ logger = logging.getLogger(__name__) -class example_header(nodes.reference, nodes.FixedTextElement): - pass +class ExampleHeader(nodes.reference, nodes.FixedTextElement): # pylint: disable=too-many-ancestors + """ + Header for examples. + """ class ExampleInclude(SphinxDirective): @@ -80,10 +83,10 @@ def run(self): document = self.state.document if not document.settings.file_insertion_enabled: return [document.reporter.warning("File insertion disabled", line=self.lineno)] - # convert options['diff'] to absolute path + # convert options['diff'] to absolute a_path if "diff" in self.options: - _, path = self.env.relfn2path(self.options["diff"]) - self.options["diff"] = path + _, a_path = self.env.relfn2path(self.options["diff"]) + self.options["diff"] = a_path try: location = self.state_machine.get_source_and_line(self.lineno) @@ -114,27 +117,38 @@ def run(self): extra_args["linenostart"] = reader.lineno_start container_node = nodes.container("", literal_block=True, classes=["example-block-wrapper"]) - container_node += example_header(filename=filename) + container_node += ExampleHeader(filename=filename) container_node += retnode retnode = container_node return [retnode] - except Exception as exc: + except Exception as exc: # pylint: disable=broad-except return [document.reporter.warning(text_type(exc), line=self.lineno)] +# noinspection PyProtectedMember def register_source(app, env, modname): + """ + Registers source code. 
+
+    :param app: application
+    :param env: environment of the plugin
+    :param modname: name of the module to load
+    :return: True if the code is registered successfully, False otherwise
+    """
     entry = env._viewcode_modules.get(modname, None)  # type: ignore
     if entry is False:
         print("[%s] Entry is false for " % modname)
-        return
+        return False

     code_tags = app.emit_firstresult("viewcode-find-source", modname)
     if code_tags is None:
+        # noinspection PyBroadException
         try:
             analyzer = ModuleAnalyzer.for_module(modname)
-        except Exception as ex:
-            logger.info("Module \"%s\" could not be loaded. Full source will not be available.", modname)
+        except Exception as ex:  # pylint: disable=broad-except
+            logger.info("Module \"%s\" could not be loaded. Full source will not be available. \"%s\"",
+                        modname, ex)
             env._viewcode_modules[modname] = False  # type: ignore
             return False

@@ -150,15 +164,21 @@ def register_source(app, env, modname):

     code, tags = code_tags
     if entry is None or entry[0] != code:
-        # print("Registeted", entry)
-
         entry = code, tags, {}, ""
         env._viewcode_modules[modname] = entry  # type: ignore

     return True


-def create_node(app, env, relative_path, show_button):
+def create_node(env, relative_path, show_button):
+    """
+    Creates a documentation node for example include.
+
+    :param env: environment of the documentation
+    :param relative_path: path of the code
+    :param show_button: whether to show the "view code" button
+    :return: paragraph with the node
+    """
     pagename = "_modules/" + relative_path[:-3]

     header_classes = ["example-header"]
@@ -183,7 +203,17 @@ def create_node(app, env, relative_path, show_button):
     return paragraph


+# noinspection PyProtectedMember
 def doctree_read(app, doctree):
+    """
+    Reads the documentation tree for the application and registers sources in the generated documentation.
+
+    :param app: application
+    :param doctree: documentation tree
+
+    :return: None
+
+    """
     env = app.builder.env
     if not hasattr(env, "_viewcode_modules"):
         env._viewcode_modules = {}  # type: ignore
@@ -191,19 +221,25 @@ def doctree_read(app, doctree):
     if app.builder.name == "singlehtml":
         return

-    for objnode in doctree.traverse(example_header):
+    for objnode in doctree.traverse(ExampleHeader):
         filepath = objnode.get("filename")
         relative_path = path.relpath(
             filepath, path.commonprefix([app.config.exampleinclude_sourceroot, filepath])
         )
         modname = relative_path.replace("/", ".")[:-3]
         show_button = register_source(app, env, modname)
-        onlynode = create_node(app, env, relative_path, show_button)
+        onlynode = create_node(env, relative_path, show_button)

         objnode.replace_self(onlynode)


 def setup(app):
+    """
+    Sets the plugin up and returns configuration of the plugin.
+
+    :param app: application.
+    :return: json description of the configuration that is needed by the plugin.
+    """
     directives.register_directive("exampleinclude", ExampleInclude)
     app.connect("doctree-read", doctree_read)
     app.add_config_value("exampleinclude_sourceroot", None, "env")
diff --git a/docs/exts/removemarktransform.py b/docs/exts/removemarktransform.py
index 01ec440d0e112..13d792257aa90 100644
--- a/docs/exts/removemarktransform.py
+++ b/docs/exts/removemarktransform.py
@@ -1,5 +1,4 @@
 # -*- coding: utf-8 -*-
-#
 # flake8: noqa
 # Disable Flake8 because of all the sphinx imports
 #
@@ -20,10 +19,12 @@
 # specific language governing permissions and limitations
 # under the License.
+"""Remove Transform Mark for Sphinx""" import re from docutils import nodes -from pygments.lexers import guess_lexer, Python3Lexer, PythonLexer +# noinspection PyUnresolvedReferences +from pygments.lexers import guess_lexer, Python3Lexer, PythonLexer # pylint: disable=no-name-in-module from sphinx.transforms import SphinxTransform from sphinx.transforms.post_transforms.code import TrimDoctestFlagsTransform @@ -51,7 +52,7 @@ def apply(self, **kwargs): @staticmethod def is_pycode(node): - # type: (nodes.literal_block) -> bool + """Checks if the node is literal block of python""" if node.rawsource != node.astext(): return False # skip parsed-literal node @@ -59,16 +60,18 @@ def is_pycode(node): if language in ("py", "py3", "python", "python3", "default"): return True elif language == "guess": + # noinspection PyBroadException try: lexer = guess_lexer(node.rawsource) - return isinstance(lexer, PythonLexer) or isinstance(lexer, Python3Lexer) - except Exception: + return isinstance(lexer, (PythonLexer, Python3Lexer)) + except Exception: # pylint: disable=broad-except pass return False def setup(app): + """Sets the transform up""" app.add_post_transform(TrimDocMarkerFlagsTransform) return {"version": "builtin", "parallel_read_safe": False, "parallel_write_safe": False} diff --git a/scripts/ci/ci_refresh_pylint_todo.sh b/scripts/ci/ci_refresh_pylint_todo.sh new file mode 100755 index 0000000000000..e7e62ae6c6d6e --- /dev/null +++ b/scripts/ci/ci_refresh_pylint_todo.sh @@ -0,0 +1,37 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -euo pipefail + +MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +export AIRFLOW_CI_SILENT=${AIRFLOW_CI_SILENT:="false"} +export ASSUME_QUIT_TO_ALL_QUESTIONS=${ASSUME_QUIT_TO_ALL_QUESTIONS:="true"} + +# shellcheck source=scripts/ci/_utils.sh +. "${MY_DIR}/_utils.sh" + +basic_sanity_checks + +script_start + +rebuild_ci_slim_image_if_needed + +refresh_pylint_todo + +script_end diff --git a/scripts/ci/in_container/_in_container_utils.sh b/scripts/ci/in_container/_in_container_utils.sh index 8a6ee7223f441..7eac280d2957b 100644 --- a/scripts/ci/in_container/_in_container_utils.sh +++ b/scripts/ci/in_container/_in_container_utils.sh @@ -121,4 +121,58 @@ function in_container_basic_sanity_check() { in_container_cleanup_pycache } +function in_container_refresh_pylint_todo() { + print_in_container_info + print_in_container_info "Refreshing list of all non-pylint compliant files. This can take some time." 
+    print_in_container_info
+
+    print_in_container_info
+    print_in_container_info "Finding list of all non-pylint compliant files everywhere except the 'tests' folder"
+    print_in_container_info
+
+    # Using -path with -prune is much better in the local environment on OSX because we have host
+    # files mounted, and node_modules is a huge directory which takes many seconds to even scan.
+    # -prune works better than -not -path because it skips traversing the whole directory; -not -path
+    # traverses the directory and only excludes it after all of it is scanned
+    find . \
+        -path "./airflow/www/node_modules" -prune -o \
+        -path "./airflow/www_rbac/node_modules" -prune -o \
+        -path "./airflow/_vendor" -prune -o \
+        -path "./airflow/migrations/versions" -prune -o \
+        -path "./.eggs" -prune -o \
+        -path "./docs/_build" -prune -o \
+        -path "./build" -prune -o \
+        -path "./tests" -prune -o \
+        -name "*.py" \
+        -not -name 'webserver_config.py' | \
+        grep ".*.py$" | \
+        xargs pylint | tee "${MY_DIR}/../pylint_todo_main.txt"
+
+    grep -v "\*\*" < "${MY_DIR}/../pylint_todo_main.txt" | \
+        grep -v "^$" | grep -v "\-\-\-" | grep -v "^Your code has been" | \
+        awk 'FS=":" {print "./"$1}' | sort | uniq > "${MY_DIR}/../pylint_todo_new.txt"
+
+    print_in_container_info
+    print_in_container_info "So far found $(wc -l <"${MY_DIR}/../pylint_todo_new.txt") files"
+    print_in_container_info
+
+    print_in_container_info
+    print_in_container_info "Finding list of all non-pylint compliant files in the 'tests' folder"
+    print_in_container_info
+
+    find "./tests" -name "*.py" -print0 | \
+        xargs -0 pylint --disable="${DISABLE_CHECKS_FOR_TESTS}" | tee "${MY_DIR}/../pylint_todo_tests.txt"
+
+    grep -v "\*\*" < "${MY_DIR}/../pylint_todo_tests.txt" | \
+        grep -v "^$" | grep -v "\-\-\-" | grep -v "^Your code has been" | \
+        awk 'FS=":" {print "./"$1}' | sort | uniq >> "${MY_DIR}/../pylint_todo_new.txt"
+
+    rm -fv "${MY_DIR}/../pylint_todo_main.txt" "${MY_DIR}/../pylint_todo_tests.txt"
+    mv -v "${MY_DIR}/../pylint_todo_new.txt" "${MY_DIR}/../pylint_todo.txt"
+
+    print_in_container_info
+    print_in_container_info "Found $(wc -l <"${MY_DIR}/../pylint_todo.txt") files"
+    print_in_container_info
+}
+
 export DISABLE_CHECKS_FOR_TESTS="missing-docstring,no-self-use,too-many-public-methods,protected-access"
diff --git a/scripts/ci/in_container/refresh_pylint_todo.sh b/scripts/ci/in_container/refresh_pylint_todo.sh
new file mode 100755
index 0000000000000..f1e382455201a
--- /dev/null
+++ b/scripts/ci/in_container/refresh_pylint_todo.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Script to run Pylint on main code.
Can be started from any working directory +set -uo pipefail + +MY_DIR=$(cd "$(dirname "$0")" || exit 1; pwd) + +# shellcheck source=scripts/ci/in_container/_in_container_utils.sh +. "${MY_DIR}/_in_container_utils.sh" + +in_container_basic_sanity_check + +in_container_script_start + +in_container_refresh_pylint_todo + +in_container_script_end diff --git a/setup.py b/setup.py index d125bf10ed08e..7126a8df2331e 100644 --- a/setup.py +++ b/setup.py @@ -16,10 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Setup for the Airflow library.""" - - -from setuptools import setup, find_packages, Command +"""Setup.py for the Airflow project.""" import imp import io @@ -29,6 +26,8 @@ import subprocess import unittest +from setuptools import setup, find_packages, Command + logger = logging.getLogger(__name__) # noinspection PyUnresolvedReferences diff --git a/tests/contrib/hooks/test_grpc_hook.py b/tests/contrib/hooks/test_grpc_hook.py index 24f2837992c41..222a31d55778d 100644 --- a/tests/contrib/hooks/test_grpc_hook.py +++ b/tests/contrib/hooks/test_grpc_hook.py @@ -1,16 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ import unittest try: from StringIO import StringIO @@ -51,6 +56,7 @@ def get_airflow_connection_with_port(): ) +# noinspection PyMethodMayBeStatic,PyUnusedLocal class StubClass(object): def __init__(self, channel): pass @@ -67,6 +73,7 @@ def setUp(self): configuration.load_test_config() self.channel_mock = mock.patch('grpc.Channel').start() + # noinspection PyUnusedLocal def custom_conn_func(self, connection): mocked_channel = self.channel_mock.return_value return mocked_channel @@ -84,7 +91,7 @@ def test_no_auth_connection(self, mock_get_connection, mock_insecure_channel): expected_url = "test:8080" mock_insecure_channel.assert_called_once_with(expected_url) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('grpc.insecure_channel') @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @@ -99,7 +106,7 @@ def test_connection_with_port(self, mock_get_connection, mock_insecure_channel): expected_url = "test.com:1234" mock_insecure_channel.assert_called_once_with(expected_url) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.contrib.hooks.grpc_hook.open') @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @@ -131,7 +138,7 @@ def test_connection_with_ssl(self, expected_url, mock_credential_object ) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.contrib.hooks.grpc_hook.open') @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @@ -163,7 +170,7 @@ def test_connection_with_tls(self, expected_url, mock_credential_object ) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @mock.patch('google.auth.jwt.OnDemandCredentials.from_signing_credentials') @@ -194,7 +201,7 @@ def test_connection_with_jwt(self, None, expected_url ) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @mock.patch('google.auth.transport.requests.Request') @@ -226,7 +233,7 @@ def test_connection_with_google_oauth(self, "request", expected_url ) - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') def test_custom_connection(self, mock_get_connection): @@ -237,7 +244,7 @@ def test_custom_connection(self, mock_get_connection): channel = hook.get_conn() - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') def test_custom_connection_with_no_connection_func(self, mock_get_connection): @@ -273,7 +280,7 @@ def test_connection_with_interceptors(self, channel = hook.get_conn() - self.assertEquals(channel, mocked_channel) + self.assertEqual(channel, mocked_channel) mock_intercept_channel.assert_called_once_with(mocked_channel, "test1") @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @@ -289,7 +296,7 @@ def test_simple_run(self, mock_get_conn, mock_get_connection): response = hook.run(StubClass, "single_call", data={'data': 'hello'}) - self.assertEquals(next(response), "hello") + self.assertEqual(next(response), "hello") @mock.patch('airflow.hooks.base_hook.BaseHook.get_connection') @mock.patch('airflow.contrib.hooks.grpc_hook.GrpcHook.get_conn') @@ -304,4 +311,4 @@ def test_stream_run(self, mock_get_conn, mock_get_connection): 
response = hook.run(StubClass, "stream_call", data={'data': ['hello!', "hi"]}) - self.assertEquals(next(response), ["streaming", "call"]) + self.assertEqual(next(response), ["streaming", "call"]) diff --git a/tests/contrib/minikube/test_kubernetes_executor.py b/tests/contrib/minikube/test_kubernetes_executor.py index adf19a756fe80..eecdbf83c4cfc 100644 --- a/tests/contrib/minikube/test_kubernetes_executor.py +++ b/tests/contrib/minikube/test_kubernetes_executor.py @@ -17,15 +17,17 @@ import os +import re +import time import unittest + from subprocess import check_call, check_output import requests.exceptions import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -import time + import six -import re try: check_call(["/usr/local/bin/kubectl", "get", "pods"]) diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py index 154dc5332b34c..eba4b48b9a297 100644 --- a/tests/contrib/minikube/test_kubernetes_pod_operator.py +++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py @@ -42,7 +42,8 @@ ) -class KubernetesPodOperatorTest(unittest.TestCase): +# pylint: disable=unused-argument +class TestKubernetesPodOperator(unittest.TestCase): @staticmethod def test_config_path_move(): diff --git a/tests/contrib/operators/test_aws_athena_operator.py b/tests/contrib/operators/test_aws_athena_operator.py index 01c4c9cf73237..866965650644b 100644 --- a/tests/contrib/operators/test_aws_athena_operator.py +++ b/tests/contrib/operators/test_aws_athena_operator.py @@ -1,21 +1,21 @@ # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + # import unittest @@ -48,6 +48,8 @@ } +# noinspection PyUnusedLocal +# pylint: disable=unused-argument class TestAWSAthenaOperator(unittest.TestCase): def setUp(self): @@ -145,6 +147,7 @@ def test_xcom_push_and_pull(self, mock_conn, mock_run_query, mock_check_query_st self.assertEqual(ti.xcom_pull(task_ids='test_aws_athena_operator'), ATHENA_QUERY_ID) +# pylint: enable=unused-argument if __name__ == '__main__': diff --git a/tests/contrib/operators/test_sagemaker_base_operator.py b/tests/contrib/operators/test_sagemaker_base_operator.py index 5ab7a614ae3b1..5d22c420613a2 100644 --- a/tests/contrib/operators/test_sagemaker_base_operator.py +++ b/tests/contrib/operators/test_sagemaker_base_operator.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. 
The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. diff --git a/tests/contrib/operators/test_sagemaker_training_operator.py b/tests/contrib/operators/test_sagemaker_training_operator.py index 8d81c6147b486..c5cc0c67f148f 100644 --- a/tests/contrib/operators/test_sagemaker_training_operator.py +++ b/tests/contrib/operators/test_sagemaker_training_operator.py @@ -1,18 +1,18 @@ # -*- coding: utf-8 -*- -# + # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# 'License'); you may not use this file except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an -# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. @@ -80,6 +80,8 @@ } +# noinspection PyUnusedLocal +# pylint: disable=unused-argument class TestSageMakerTrainingOperator(unittest.TestCase): def setUp(self): @@ -122,6 +124,7 @@ def test_execute_with_failure(self, mock_training, mock_client): 'ResponseMetadata': {'HTTPStatusCode': 404}} self.assertRaises(AirflowException, self.sagemaker.execute, None) +# pylint: enable=unused-argument if __name__ == '__main__': diff --git a/tests/contrib/sensors/test_gcs_upload_session_sensor.py b/tests/contrib/sensors/test_gcs_upload_session_sensor.py index ec9ee77273254..75fa963db8ad6 100644 --- a/tests/contrib/sensors/test_gcs_upload_session_sensor.py +++ b/tests/contrib/sensors/test_gcs_upload_session_sensor.py @@ -1,11 +1,10 @@ # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE object +# or more contributor license agreements. See the NOTICE file # distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this object +# regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the -# "License"); you may not use this object except in compliance +# "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 @@ -16,7 +15,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-# import unittest from tests.compat import mock diff --git a/tests/contrib/utils/gcp_authenticator.py b/tests/contrib/utils/gcp_authenticator.py index d17949050f7f9..2ea3b502f0cf1 100644 --- a/tests/contrib/utils/gcp_authenticator.py +++ b/tests/contrib/utils/gcp_authenticator.py @@ -19,6 +19,7 @@ import json import os import subprocess +from typing import Optional from airflow import settings, AirflowException from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor @@ -51,20 +52,16 @@ class GcpAuthenticator(LoggingCommandExecutor): """ - Manages authentication to Google Cloud Platform. It helps to manage - connection - it can authenticate with the gcp key name specified + Initialises the authenticator. + + :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values) + :param project_extra: optional extra project parameter passed to google cloud + connection """ - original_account = None + original_account = None # type: Optional[str] def __init__(self, gcp_key, project_extra=None): - """ - Initialises the authenticator. - - :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values) - :param project_extra: optional extra project parameter passed to google cloud - connection - """ super(GcpAuthenticator, self).__init__() self.gcp_key = gcp_key self.project_extra = project_extra diff --git a/tests/dags/test_clear_subdag.py b/tests/dags/test_clear_subdag.py index 467ed10a22e45..7385b686f676b 100644 --- a/tests/dags/test_clear_subdag.py +++ b/tests/dags/test_clear_subdag.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,7 +14,9 @@ # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations +# under the License. +# import datetime diff --git a/tests/dags/test_example_bash_operator.py b/tests/dags/test_example_bash_operator.py index 96a53aa921a52..92b45e7064ea3 100644 --- a/tests/dags/test_example_bash_operator.py +++ b/tests/dags/test_example_bash_operator.py @@ -16,13 +16,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from datetime import timedelta import airflow from builtins import range from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG -from datetime import timedelta args = { diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py index a3a5b449b8b34..8943a10978342 100644 --- a/tests/kubernetes/test_pod_launcher.py +++ b/tests/kubernetes/test_pod_launcher.py @@ -14,15 +14,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+import unittest +import mock from requests.exceptions import BaseHTTPError from airflow import AirflowException from airflow.contrib.kubernetes.pod_launcher import PodLauncher -import unittest -import mock - class TestPodLauncher(unittest.TestCase): diff --git a/tests/operators/test_docker_operator.py b/tests/operators/test_docker_operator.py index a3b9792f9663a..b60f069d0d87e 100644 --- a/tests/operators/test_docker_operator.py +++ b/tests/operators/test_docker_operator.py @@ -20,6 +20,7 @@ import unittest import logging +from airflow.exceptions import AirflowException try: from airflow.operators.docker_operator import DockerOperator from airflow.hooks.docker_hook import DockerHook @@ -27,7 +28,6 @@ except ImportError: pass -from airflow.exceptions import AirflowException from tests.compat import mock @@ -124,7 +124,7 @@ def test_execute_unicode_logs(self, client_class_mock): client_class_mock.return_value = client_mock - originalRaiseExceptions = logging.raiseExceptions + originalRaiseExceptions = logging.raiseExceptions # pylint: disable=invalid-name logging.raiseExceptions = True operator = DockerOperator(image='ubuntu', owner='unittest', task_id='unittest') diff --git a/tests/utils/log/elasticmock/__init__.py b/tests/utils/log/elasticmock/__init__.py index 6d4ce51fa26ba..287963368c19c 100644 --- a/tests/utils/log/elasticmock/__init__.py +++ b/tests/utils/log/elasticmock/__init__.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # # The MIT License (MIT) # @@ -21,7 +38,7 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
- +"""Elastic mock module used for testing""" from functools import wraps from typing import Dict @@ -33,7 +50,8 @@ ELASTIC_INSTANCES = {} # type: Dict[str, FakeElasticsearch] -def _get_elasticmock(hosts=None, *args, **kwargs): +# noinspection PyUnusedLocal +def _get_elasticmock(hosts=None, *args, **kwargs): # pylint: disable=unused-argument host = _normalize_hosts(hosts)[0] elastic_key = '{0}:{1}'.format( host.get('host', 'localhost'), host.get('port', 9200) @@ -47,11 +65,12 @@ def _get_elasticmock(hosts=None, *args, **kwargs): return connection -def elasticmock(f): - @wraps(f) +def elasticmock(function): + """Elasticmock decorator""" + @wraps(function) def decorated(*args, **kwargs): ELASTIC_INSTANCES.clear() with patch('elasticsearch.Elasticsearch', _get_elasticmock): - result = f(*args, **kwargs) + result = function(*args, **kwargs) return result return decorated diff --git a/tests/utils/log/elasticmock/fake_elasticsearch.py b/tests/utils/log/elasticmock/fake_elasticsearch.py index f068ede0e5636..ffadb7953b196 100644 --- a/tests/utils/log/elasticmock/fake_elasticsearch.py +++ b/tests/utils/log/elasticmock/fake_elasticsearch.py @@ -1,4 +1,21 @@ # -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ # # The MIT License (MIT) # @@ -32,6 +49,7 @@ from .utilities import get_random_id +# noinspection PyShadowingBuiltins class FakeElasticsearch(Elasticsearch): __documents_dict = None @@ -102,15 +120,7 @@ def exists(self, index, doc_type, id, params=None): def get(self, index, id, doc_type='_all', params=None): result = None if index in self.__documents_dict: - for document in self.__documents_dict[index]: - if document.get('_id') == id: - if doc_type == '_all': - result = document - break - else: - if document.get('_type') == doc_type: - result = document - break + result = self.find_document(doc_type, id, index, result) if result: result['found'] = True @@ -125,6 +135,17 @@ def get(self, index, id, doc_type='_all', params=None): return result + def find_document(self, doc_type, id, index, result): + for document in self.__documents_dict[index]: + if document.get('_id') == id: + if doc_type == '_all': + result = document + break + elif document.get('_type') == doc_type: + result = document + break + return result + @query_params('_source', '_source_exclude', '_source_include', 'parent', 'preference', 'realtime', 'refresh', 'routing', 'version', 'version_type') @@ -173,7 +194,7 @@ def count(self, index=None, doc_type=None, body=None, params=None): def search(self, index=None, doc_type=None, body=None, params=None): searchable_indexes = self._normalize_index_to_list(index) - matches = self._find_match(index, doc_type, body, params) + matches = self._find_match(index, doc_type, body) result = { 'hits': { @@ -251,7 +272,7 @@ def suggest(self, body, index=None, params=None): ] return result_dict - def _find_match(self, index, doc_type, body, params=None): + def _find_match(self, index, doc_type, body): searchable_indexes = self._normalize_index_to_list(index) searchable_doc_types = self._normalize_doc_type_to_list(doc_type) @@ -259,23 +280,29 @@ def _find_match(self, index, doc_type, body, params=None): matches = [] for searchable_index in searchable_indexes: - for document in self.__documents_dict[searchable_index]: - if searchable_doc_types\ - and document.get('_type') not in searchable_doc_types: - continue - - if 'match_phrase' in must: - for query_id in must['match_phrase']: - query_val = must['match_phrase'][query_id] - if query_id in document['_source']: - if query_val in document['_source'][query_id]: - # use in as a proxy for match_phrase - matches.append(document) - else: - matches.append(document) + self.find_document_in_searchable_index(matches, must, searchable_doc_types, searchable_index) return matches + def find_document_in_searchable_index(self, matches, must, searchable_doc_types, searchable_index): + for document in self.__documents_dict[searchable_index]: + if searchable_doc_types and document.get('_type') not in searchable_doc_types: + continue + + if 'match_phrase' in must: + self.match_must_phrase(document, matches, must) + else: + matches.append(document) + + @staticmethod + def match_must_phrase(document, matches, must): + for query_id in must['match_phrase']: + query_val = must['match_phrase'][query_id] + if query_id in document['_source']: + if query_val in document['_source'][query_id]: + # use in as a proxy for match_phrase + matches.append(document) + def _normalize_index_to_list(self, index): # Ensure to have a list of index if index is None: diff --git a/tests/utils/log/elasticmock/utilities/__init__.py b/tests/utils/log/elasticmock/utilities/__init__.py index 19438ba1c5e46..d9abb4839fcc3 100644 --- a/tests/utils/log/elasticmock/utilities/__init__.py +++ 
b/tests/utils/log/elasticmock/utilities/__init__.py
@@ -1,4 +1,21 @@
 # -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 #
 # The MIT License (MIT)
 #
@@ -21,7 +38,7 @@
 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
-
+"""Utilities for Elastic mock"""
 import random
 import string

@@ -30,4 +47,5 @@

 def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
+    """Returns a random id for elasticsearch"""
     return ''.join(random.choice(CHARSET_FOR_ELASTICSEARCH_ID) for _ in range(size))
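For context, a minimal usage sketch of the elasticmock decorator refactored above (the test case and indexed document are hypothetical, and the index()/get() signatures are assumed to match FakeElasticsearch as shown in this patch and in the upstream elasticmock project):

    import unittest

    import elasticsearch

    from tests.utils.log.elasticmock import elasticmock


    class TestFakeElasticsearch(unittest.TestCase):

        @elasticmock  # patches elasticsearch.Elasticsearch with the fake for this test
        def test_index_then_get(self):
            es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
            # index() stores the document and returns the generated _id
            result = es.index(index='log', doc_type='log', body={'message': 'hello'})
            document = es.get(index='log', id=result['_id'])
            self.assertTrue(document['found'])

Because ELASTIC_INSTANCES is keyed by host:port and cleared at the start of every decorated test, each test sees a fresh fake cluster, while repeated Elasticsearch(...) calls within one test share the same instance.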