Skip to content

Commit

Permalink
[AIRFLOW-5256] Related pylint changes for common licences in python f…
Browse files Browse the repository at this point in the history
…iles
  • Loading branch information
potiuk committed Sep 17, 2019
1 parent dca766b commit eee6687
Show file tree
Hide file tree
Showing 74 changed files with 1,031 additions and 582 deletions.
14 changes: 12 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for shell files
name: Add licence for all shell files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [shell]
files: ^breeze$|^breeze-complete$|\.sh$
Expand All @@ -102,6 +102,16 @@ repos:
- --license-filepath
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for all python files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [python]
args:
- --comment-style
- "|#|"
- --license-filepath
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for all XML files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
Expand All @@ -113,7 +123,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for yaml files
name: Add licence for all yaml files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [yaml]
args:
Expand Down
6 changes: 5 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ as follows:
3) Fix all the issues reported by pylint
4) Re-run [scripts/ci/ci_pylint.sh](scripts/ci/ci_pylint.sh)
5) If you see "success" - submit PR following [Pull Request guidelines](#pull-request-guidelines)
6) You can refresh periodically [scripts/ci/pylint_todo.txt](scripts/ci/pylint_todo.txt) file.
You can do it by running
[scripts/ci/ci_refresh_pylint_todo.sh](scripts/ci/ci_refresh_pylint_todo.sh).
This can take quite some time (especially on MacOS)!

There are following guidelines when fixing pylint errors:
You can follow these guidelines when fixing pylint errors:

* Ideally fix the errors rather than disable pylint checks - often you can easily refactor the code
(IntelliJ/PyCharm might be helpful when extracting methods in complex code or moving methods around)
Expand Down
17 changes: 17 additions & 0 deletions airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#
# Copyright (c) 2013, Michael Komitee
# All rights reserved.
Expand Down
2 changes: 1 addition & 1 deletion airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Default celery configuration."""
import ssl

from airflow.configuration import conf
Expand Down
38 changes: 25 additions & 13 deletions airflow/contrib/hooks/grpc_hook.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
# -*- coding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""GRPC Hook"""

import grpc
from google import auth as google_auth
Expand Down Expand Up @@ -58,7 +64,7 @@ def get_conn(self):

if auth_type == "NO_AUTH":
channel = grpc.insecure_channel(base_url)
elif auth_type == "SSL" or auth_type == "TLS":
elif auth_type in {"SSL", "TLS"}:
credential_file_name = self._get_field("credential_pem_file")
creds = grpc.ssl_channel_credentials(open(credential_file_name).read())
channel = grpc.secure_channel(base_url, creds)
Expand Down Expand Up @@ -91,7 +97,9 @@ def get_conn(self):

return channel

def run(self, stub_class, call_func, streaming=False, data={}):
def run(self, stub_class, call_func, streaming=False, data=None):
if data is None:
data = {}
with self.get_conn() as channel:
stub = stub_class(channel)
try:
Expand All @@ -102,10 +110,14 @@ def run(self, stub_class, call_func, streaming=False, data={}):
else:
yield from response
except grpc.RpcError as ex:
# noinspection PyUnresolvedReferences
self.log.exception(
"Error occurred when calling the grpc service: {0}, method: {1} \
status code: {2}, error details: {3}"
.format(stub.__class__.__name__, call_func, ex.code(), ex.details()))
.format(stub.__class__.__name__,
call_func,
ex.code(), # pylint: disable=no-member
ex.details())) # pylint: disable=no-member
raise ex

def _get_field(self, field_name, default=None):
Expand Down
58 changes: 35 additions & 23 deletions airflow/contrib/hooks/qubole_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@
# specific language governing permissions and limitations
# under the License.
#

"""Qubole hook"""
import os
import pathlib
import time
import datetime
import re

from qds_sdk.qubole import Qubole
from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \
PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
DbImportCommand, SqlCommand

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.configuration import conf
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.models import TaskInstance

from qds_sdk.qubole import Qubole
from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \
PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
DbImportCommand, SqlCommand

COMMAND_CLASSES = {
"hivecmd": HiveCommand,
"prestocmd": PrestoCommand,
Expand All @@ -57,20 +57,24 @@


def flatten_list(list_of_lists):
"""Flatten the list"""
return [element for array in list_of_lists for element in array]


def filter_options(options):
"""Remove options from the list"""
options_to_remove = ["help", "print-logs-live", "print-logs"]
return [option for option in options if option not in options_to_remove]


def get_options_list(command_class):
"""Get options list"""
options_list = [option.get_opt_string().strip("--") for option in command_class.optparser.option_list]
return filter_options(options_list)


def build_command_args():
"""Build Command argument from command and options"""
command_args, hyphen_args = {}, set()
for cmd in COMMAND_CLASSES:

Expand All @@ -95,7 +99,8 @@ def build_command_args():


class QuboleHook(BaseHook):
def __init__(self, *args, **kwargs):
"""Hook for Qubole communication"""
def __init__(self, *args, **kwargs): # pylint: disable=unused-argument
conn = self.get_connection(kwargs['qubole_conn_id'])
Qubole.configure(api_token=conn.password, api_url=conn.host)
self.task_id = kwargs['task_id']
Expand All @@ -107,6 +112,7 @@ def __init__(self, *args, **kwargs):

@staticmethod
def handle_failure_retry(context):
"""Handle retries in case of failures"""
ti = context['ti']
cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)

Expand All @@ -123,6 +129,7 @@ def handle_failure_retry(context):
cmd.cancel()

def execute(self, context):
"""Execute call"""
args = self.cls.parse(self.create_cmd_args(context))
self.cmd = self.cls.create(**args)
self.task_instance = context['task_instance']
Expand Down Expand Up @@ -197,7 +204,7 @@ def get_log(self, ti):
"""
if self.cmd is None:
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
Command.get_log_id(self.cls, cmd_id)
Command.get_log_id(cmd_id)

def get_jobs_id(self, ti):
"""
Expand All @@ -207,8 +214,9 @@ def get_jobs_id(self, ti):
"""
if self.cmd is None:
cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=self.task_id)
Command.get_jobs_id(self.cls, cmd_id)
Command.get_jobs_id(cmd_id)

# noinspection PyMethodMayBeStatic
def get_extra_links(self, operator, dttm):
"""
Get link to qubole command result page.
Expand All @@ -229,28 +237,25 @@ def get_extra_links(self, operator, dttm):
return url

def create_cmd_args(self, context):
"""Creates command arguments"""
args = []
cmd_type = self.kwargs['command_type']
inplace_args = None
tags = {self.dag_id, self.task_id, context['run_id']}
positional_args_list = flatten_list(POSITIONAL_ARGS.values())

for k, v in self.kwargs.items():
if k in COMMAND_ARGS[cmd_type]:
if k in HYPHEN_ARGS:
args.append("--{0}={1}".format(k.replace('_', '-'), v))
elif k in positional_args_list:
inplace_args = v
elif k == 'tags':
if isinstance(v, str):
tags.add(v)
elif isinstance(v, (list, tuple)):
for val in v:
tags.add(val)
for key, value in self.kwargs.items():
if key in COMMAND_ARGS[cmd_type]:
if key in HYPHEN_ARGS:
args.append("--{0}={1}".format(key.replace('_', '-'), value))
elif key in positional_args_list:
inplace_args = value
elif key == 'tags':
self._add_tags(tags, value)
else:
args.append("--{0}={1}".format(k, v))
args.append("--{0}={1}".format(key, value))

if k == 'notify' and v is True:
if key == 'notify' and value is True:
args.append("--notify")

args.append("--tags={0}".format(','.join(filter(None, tags))))
Expand All @@ -259,3 +264,10 @@ def create_cmd_args(self, context):
args += inplace_args.split(' ')

return args

@staticmethod
def _add_tags(tags, value):
if isinstance(value, str):
tags.add(value)
elif isinstance(value, (list, tuple)):
tags.extend(value)
1 change: 0 additions & 1 deletion airflow/contrib/hooks/ssh_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from airflow.hooks.base_hook import BaseHook


# noinspection PyAbstractClass
class SSHHook(BaseHook):
"""
Hook for ssh remote execution using Paramiko.
Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/operators/aws_sqs_publish_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Licensed to the ApachMee Software Foundation (ASF) under one

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
Expand All @@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.

"""Publish message to SQS queue"""
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.aws_sqs_hook import SQSHook
Expand All @@ -38,7 +39,6 @@ class SQSPublishOperator(BaseOperator):
:param aws_conn_id: AWS connection id (default: aws_default)
:type aws_conn_id: str
"""

template_fields = ('sqs_queue', 'message_content', 'delay_seconds')
ui_color = '#6ad3fa'

Expand Down
5 changes: 1 addition & 4 deletions airflow/contrib/operators/docker_swarm_operator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
'''
Run ephemeral Docker Swarm services
'''
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -19,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Run ephemeral Docker Swarm services"""

from docker import types

Expand Down
6 changes: 3 additions & 3 deletions airflow/contrib/operators/kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Executes task in a Kubernetes POD"""
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand All @@ -23,7 +23,7 @@
from airflow.utils.state import State


class KubernetesPodOperator(BaseOperator):
class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-attributes
"""
Execute a task in a Kubernetes Pod
Expand Down Expand Up @@ -172,7 +172,7 @@ def execute(self, context):
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))

@apply_defaults
def __init__(self,
def __init__(self, # pylint: disable=too-many-arguments,too-many-locals
namespace,
image,
name,
Expand Down
Loading

0 comments on commit eee6687

Please sign in to comment.