Skip to content

Commit

Permalink
[AIRFLOW-5256] Related pylint changes for common licences in python f…
Browse files Browse the repository at this point in the history
…iles (#5786)

  (cherry picked from commit 4780105)
  • Loading branch information
potiuk committed Sep 18, 2019
1 parent 5034304 commit f8be014
Show file tree
Hide file tree
Showing 58 changed files with 848 additions and 370 deletions.
14 changes: 12 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for shell files
name: Add licence for all shell files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [shell]
files: ^breeze$|^breeze-complete$|\.sh$
Expand All @@ -102,6 +102,16 @@ repos:
- --license-filepath
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for all python files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [python]
args:
- --comment-style
- "|#|"
- --license-filepath
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for all XML files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
Expand All @@ -113,7 +123,7 @@ repos:
- license-templates/LICENSE.txt
- --fuzzy-match-generates-todo
- id: insert-license
name: Add licence for yaml files
name: Add licence for all yaml files
exclude: ^\.github/.*$"|^airflow/_vendor/.*$
types: [yaml]
args:
Expand Down
17 changes: 17 additions & 0 deletions airflow/api/auth/backend/kerberos_auth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#
# Copyright (c) 2013, Michael Komitee
# All rights reserved.
Expand Down
2 changes: 1 addition & 1 deletion airflow/config_templates/default_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Default celery configuration."""
import ssl

from airflow import configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# -*- coding: utf-8 -*-
#

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# 'License'); you may not use this file except in compliance
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Expand Down
45 changes: 31 additions & 14 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Kubernetes executor"""
import base64
import hashlib
from queue import Empty

import re
import json
import multiprocessing
from dateutil import parser
from uuid import uuid4

from dateutil import parser

import kubernetes
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
Expand All @@ -42,6 +44,9 @@
from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

MAX_POD_ID_LEN = 253
MAX_LABEL_LEN = 63


class KubernetesExecutorConfig:
def __init__(self, image=None, image_pull_policy=None, request_memory=None,
Expand Down Expand Up @@ -125,6 +130,7 @@ def as_dict(self):


class KubeConfig:
"""Configuration for Kubernetes"""
core_section = 'core'
kubernetes_section = 'kubernetes'

Expand Down Expand Up @@ -280,8 +286,8 @@ def __init__(self):
# and only return a blank string if contexts are not set.
def _get_security_context_val(self, scontext):
val = configuration.get(self.kubernetes_section, scontext)
if len(val) == 0:
return val
if not val:
return 0
else:
return int(val)

Expand All @@ -308,7 +314,8 @@ def _validate(self):
'through ssh key, but not both')


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""
def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
multiprocessing.Process.__init__(self)
self.namespace = namespace
Expand All @@ -318,6 +325,7 @@ def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube
self.kube_config = kube_config

def run(self):
"""Performs watching"""
kube_client = get_kube_client()
while True:
try:
Expand Down Expand Up @@ -363,6 +371,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
return last_resource_version

def process_error(self, event):
"""Process error response"""
self.log.error(
'Encountered Error response from k8s list namespaced pod stream => %s',
event
Expand All @@ -371,16 +380,17 @@ def process_error(self, event):
if raw_object['code'] == 410:
self.log.info(
'Kubernetes resource version is too old, must reset to 0 => %s',
raw_object['message']
(raw_object['message'],)
)
# Return resource version 0
return '0'
raise AirflowException(
'Kubernetes failure for %s with code %s and message: %s',
raw_object['reason'], raw_object['code'], raw_object['message']
'Kubernetes failure for %s with code %s and message: %s' %
(raw_object['reason'], raw_object['code'], raw_object['message'])
)

def process_status(self, pod_id, status, labels, resource_version):
"""Process status response"""
if status == 'Pending':
self.log.info('Event: %s Pending', pod_id)
elif status == 'Failed':
Expand All @@ -399,6 +409,7 @@ def process_status(self, pod_id, status, labels, resource_version):


class AirflowKubernetesScheduler(LoggingMixin):
"""Airflow Scheduler for Kubernetes"""
def __init__(self, kube_config, task_queue, result_queue, kube_client, worker_uuid):
self.log.debug("Creating Kubernetes executor")
self.kube_config = kube_config
Expand Down Expand Up @@ -432,7 +443,6 @@ def _health_check_kube_watcher(self):

def run_next(self, next_job):
"""
The run_next command will check the task_queue for any un-run jobs.
It will then create a unique job-id, launch that job in the cluster,
and store relevant info in the current_jobs map so we can track the job's
Expand All @@ -457,6 +467,7 @@ def run_next(self, next_job):
self.log.debug("Kubernetes Job created!")

def delete_pod(self, pod_id):
"""Deletes POD"""
try:
self.kube_client.delete_namespaced_pod(
pod_id, self.namespace, body=client.V1DeleteOptions(),
Expand Down Expand Up @@ -487,6 +498,7 @@ def sync(self):
break

def process_watcher_task(self, task):
"""Process the task by watcher."""
pod_id, state, labels, resource_version = task
self.log.info(
'Attempting to finish pod; pod_id: %s; state: %s; labels: %s',
Expand Down Expand Up @@ -524,8 +536,6 @@ def _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
:param safe_uuid: a uuid
:return: ``str`` valid Pod name of appropriate length
"""
MAX_POD_ID_LEN = 253

safe_key = safe_dag_id + safe_task_id

safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
Expand All @@ -543,8 +553,6 @@ def _make_safe_label_value(string):
way from the original value sent to this function, then we need to truncate to
53 chars, and append a unique hash to it.
"""
MAX_LABEL_LEN = 63

safe_label = re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string)

if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
Expand Down Expand Up @@ -635,11 +643,13 @@ def _labels_to_key(self, labels):
return None

def terminate(self):
"""Termninates the watcher."""
self.watcher_queue.join()
self._manager.shutdown()


class KubernetesExecutor(BaseExecutor, LoggingMixin):
"""Executor for Kubernetes"""
def __init__(self):
self.kube_config = KubeConfig()
self.task_queue = None
Expand Down Expand Up @@ -676,6 +686,8 @@ def clear_not_launched_queued_tasks(self, session=None):
)

for task in queued_tasks:
# noinspection PyProtectedMember
# pylint: disable=protected-access
dict_string = (
"dag_id={},task_id={},execution_date={},airflow-worker={}".format(
AirflowKubernetesScheduler._make_safe_label_value(task.dag_id),
Expand All @@ -686,13 +698,14 @@ def clear_not_launched_queued_tasks(self, session=None):
self.worker_uuid
)
)
# pylint: enable=protected-access
kwargs = dict(label_selector=dict_string)
if self.kube_config.kube_client_request_args:
for key, value in self.kube_config.kube_client_request_args.items():
kwargs[key] = value
pod_list = self.kube_client.list_namespaced_pod(
self.kube_config.kube_namespace, **kwargs)
if len(pod_list.items) == 0:
if not pod_list.items:
self.log.info(
'TaskInstance: %s found in queued state but was not launched, '
'rescheduling', task
Expand Down Expand Up @@ -740,6 +753,7 @@ def _create_or_update_secret(secret_name, secret_path):
_create_or_update_secret(service_account['name'], service_account['path'])

def start(self):
"""Starts the executor"""
self.log.info('Start Kubernetes executor')
self.worker_uuid = KubeWorkerIdentifier.get_or_create_current_kube_worker_uuid()
self.log.debug('Start with worker_uuid: %s', self.worker_uuid)
Expand All @@ -759,6 +773,7 @@ def start(self):
self.clear_not_launched_queued_tasks()

def execute_async(self, key, command, queue=None, executor_config=None):
"""Executes task asynchronously"""
self.log.info(
'Add task %s with command %s with executor_config %s',
key, command, executor_config
Expand All @@ -767,6 +782,7 @@ def execute_async(self, key, command, queue=None, executor_config=None):
self.task_queue.put((key, command, kube_executor_config))

def sync(self):
"""Synchronize task state."""
if self.running:
self.log.debug('self.running: %s', self.running)
if self.queued_tasks:
Expand Down Expand Up @@ -825,6 +841,7 @@ def _change_state(self, key, state, pod_id):
self.event_buffer[key] = state

def end(self):
"""Called when the executor shuts down"""
self.log.info('Shutting down Kubernetes executor')
self.task_queue.join()
self.result_queue.join()
Expand Down
25 changes: 14 additions & 11 deletions airflow/contrib/hooks/azure_container_instance_hook.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import os

Expand Down
28 changes: 15 additions & 13 deletions airflow/contrib/hooks/gcp_mlengine_hook.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import random
import time
from googleapiclient.errors import HttpError
Expand Down
Loading

0 comments on commit f8be014

Please sign in to comment.