diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index f180664c0bf45..62a9b8b524d34 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -33,6 +33,7 @@ from urllib3.exceptions import HTTPError, ReadTimeoutError from airflow.configuration import conf +from airflow.contrib.kubernetes.istio import Istio from airflow.contrib.kubernetes.pod_launcher import PodLauncher from airflow.contrib.kubernetes.kube_client import get_kube_client from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration @@ -331,6 +332,7 @@ def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube self.watcher_queue = watcher_queue self.resource_version = resource_version self.kube_config = kube_config + self.istio = Istio(get_kube_client()) def run(self): """Performs watching""" @@ -383,6 +385,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): event=event, ) last_resource_version = task.metadata.resource_version + self.istio.handle_istio_proxy(task) return last_resource_version diff --git a/airflow/contrib/kubernetes/istio.py b/airflow/contrib/kubernetes/istio.py new file mode 100644 index 0000000000000..7c24306a42523 --- /dev/null +++ b/airflow/contrib/kubernetes/istio.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import AirflowException +from airflow.utils.log.logging_mixin import LoggingMixin +from kubernetes.stream import stream +try: + from packaging.version import parse as semantic_version +except ImportError: + # Python 2 + from distutils.version import LooseVersion as semantic_version # type: ignore + + +class SidecarNames: + """ Define strings that indicate container names + """ + ISTIO_PROXY = 'istio-proxy' + + +class Istio(LoggingMixin): + """ Handle all Istio-related logic + """ + + def __init__(self, kube_client): + super(Istio, self).__init__() + self._client = kube_client + + def handle_istio_proxy(self, pod): + """If an istio-proxy sidecar is detected, and all other containers + are terminated, then attempt to cleanly shutdown the sidecar. + If we detect a version of Istio before it's compatible with Kubernetes + Jobs, then raise an informative error message. + + Args: + pod (V1Pod): The pod which we are checking for the sidecar + + Returns: + (bool): True if we detect and exit istio-proxy, + False if we do not detect istio-proxy + + Raises: + AirflowException: if we find an istio-proxy, and we can't shut it down. + """ + if self._should_shutdown_istio_proxy(pod): + self.log.info("Detected that a task finished and needs " + + "an istio-proxy sidecar to be cleaned up. " + + "pod name: {}".format(pod.metadata.name)) + self._shutdown_istio_proxy(pod) + return True + return False + + def _should_shutdown_istio_proxy(self, pod): + """Look for an istio-proxy, and decide if it should be shutdown. + + Args: + pod (V1Pod): The pod which we are checking for the sidecar + + Returns: + (bool): True if we detect istio-proxy, and all other containers + are finished running, otherwise false + """ + if pod.status.phase != "Running": + return False + found_istio = False + for container_status in pod.status.container_statuses: + if container_status.name == SidecarNames.ISTIO_PROXY and \ + container_status.state.running: + found_istio = True + continue + if not container_status.state.terminated: + # Any state besides 'terminated' should be + # considered still busy + return False + # If we didn't find istio at all, then we should + # not shut it down. Also we should only shut it down + # if it has state "running". + return found_istio + + def _shutdown_istio_proxy(self, pod): + """Shutdown the istio-proxy on the provided pod + + Args: + pod (V1Pod): The pod which the container is in + + Returns: + None + + Raises: + AirflowException: if we find an istio-proxy, and we can't shut it down. + """ + for container in pod.spec.containers: + + # Skip unless it's a sidecar named as SidecarNames.ISTIO_PROXY. + if container.name != SidecarNames.ISTIO_PROXY: + continue + + # Check if supported version of istio-proxy. + # If we can't tell the version, proceed anyways. + if ":" in container.image: + _, tag = container.image.split(":") + if semantic_version(tag) < semantic_version("1.3.0-rc.0"): + raise AirflowException( + 'Please use istio version 1.3.0+ for KubernetesExecutor compatibility.' + + ' Detected version {}'.format(tag)) + + # Determine the istio-proxy statusPort, + # which is where /quitquitquit is implemented. + # Default to 15020. + status_port = "15020" + for i in range(len(container.args)): + arg = container.args[i] + if arg.strip() == "--statusPort": + status_port = container.args[i + 1].strip() + break + if arg.strip()[:13] == "--statusPort=": + status_port = arg.strip()[13:] + break + + self.log.info("Shutting down istio-proxy in pod {}".format(pod.metadata.name)) + self._post_quitquitquit(pod, container, status_port) + + def _post_quitquitquit(self, pod, container, status_port): + """ Send the curl to shutdown the isto-proxy container + """ + # Use exec to curl localhost inside of the sidecar. + _ = stream( + self._client.connect_get_namespaced_pod_exec, + pod.metadata.name, + pod.metadata.namespace, + tty=False, + stderr=True, + stdin=False, + stdout=True, + container=container.name, + command=[ + '/bin/sh', + '-c', + 'curl -XPOST http://127.0.0.1:{}/quitquitquit'.format(status_port)]) diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index 51ee1348586b3..014bac981d194 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -33,6 +33,7 @@ from airflow.utils.state import State from airflow import AirflowException +from airflow.contrib.kubernetes.istio import Istio from airflow.contrib.kubernetes.pod import Pod from airflow.contrib.kubernetes.kubernetes_request_factory import \ pod_request_factory as pod_factory @@ -48,6 +49,20 @@ class PodStatus(object): SUCCEEDED = 'succeeded' +class SleepConfig: + ''' Configure sleeps used for polling + ''' + # Only polls during the start of a pod + POD_STARTING_POLL = 1 + # Used to detect all cleanup jobs are completed + # and the entire Pod is cleaned up + POD_RUNNING_POLL = 1 + # Polls for the duration of the task execution + # to detect when the task is done. The difference + # between this and POD_RUNNING_POLL is sidecars. + BASE_CONTAINER_RUNNING_POLL = 2 + + class PodLauncher(LoggingMixin): """Launches PODS""" def __init__(self, @@ -70,6 +85,7 @@ def __init__(self, self.extract_xcom = extract_xcom self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory( ) if extract_xcom else pod_factory.SimplePodRequestFactory() + self.istio = Istio(self._client) def run_pod_async(self, pod, **kwargs): """Runs POD asynchronously""" @@ -111,7 +127,7 @@ def run_pod(self, pod, startup_timeout=120, get_logs=True): delta = dt.now() - curr_time if delta.total_seconds() >= startup_timeout: raise AirflowException("Pod took too long to start") - time.sleep(1) + time.sleep(SleepConfig.POD_STARTING_POLL) self.log.debug('Pod not yet started') return self._monitor_pod(pod, get_logs) @@ -124,16 +140,17 @@ def _monitor_pod(self, pod, get_logs): for line in logs: self.log.info(line) result = None + while self.base_container_is_running(pod): + self.log.info('Container %s has state %s', pod.name, State.RUNNING) + time.sleep(SleepConfig.BASE_CONTAINER_RUNNING_POLL) if self.extract_xcom: - while self.base_container_is_running(pod): - self.log.info('Container %s has state %s', pod.name, State.RUNNING) - time.sleep(2) result = self._extract_xcom(pod) self.log.info(result) result = json.loads(result) + self.istio.handle_istio_proxy(self.read_pod(pod)) while self.pod_is_running(pod): self.log.info('Pod %s has state %s', pod.name, State.RUNNING) - time.sleep(2) + time.sleep(SleepConfig.POD_RUNNING_POLL) return self._task_status(self.read_pod(pod)), result def _task_status(self, event): diff --git a/setup.py b/setup.py index 047cf6b85bd1b..fe8529f4878b0 100644 --- a/setup.py +++ b/setup.py @@ -300,6 +300,7 @@ def write_version(filename=os.path.join(*[my_dir, "airflow", "git_version"])): kubernetes = [ 'cryptography>=2.0.0', 'kubernetes>=3.0.0', + 'packaging>=19.1', ] ldap = [ 'ldap3>=2.5.1', diff --git a/tests/kubernetes/test_istio.py b/tests/kubernetes/test_istio.py new file mode 100644 index 0000000000000..8a7b34ee6b3e1 --- /dev/null +++ b/tests/kubernetes/test_istio.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow import AirflowException +from airflow.contrib.kubernetes.istio import Istio + +import unittest +from tests.compat import MagicMock, patch + + +def mock_stream(func, *args, **kwargs): + print('calling func') + return func(*args, **kwargs) + + +class TestIstio(unittest.TestCase): + + def setUp(self): + mock_kube_client = MagicMock() + self.istio = Istio(mock_kube_client) + + def _mock_pod(self, image="istio/proxyv2:1.3.0", args=[]): # noqa + sidecar = MagicMock() + sidecar.name = "istio-proxy" + sidecar.namespace = "fake-namespace" + sidecar.image = image + sidecar.args = args + pod = MagicMock() + pod.spec.containers = [sidecar] + pod.status.phase = "Running" + pod.metadata.name = "fake-pod-name" + pod.metadata.namespace = "fake-namespace" + container_status1 = MagicMock() + container_status1.name = "istio-proxy" + container_status1.state.running = True + container_status1.state.terminated = False + container_status2 = MagicMock() + container_status2.name = "base" + container_status2.state.running = False + container_status2.state.terminated = True + pod.status.container_statuses = [container_status1, + container_status2] + return pod + + def test_handle_istio_proxy_low_version(self): + pod = self._mock_pod(image="istio/proxyv2:1.2.9") + self.assertRaises(AirflowException, + self.istio.handle_istio_proxy, + pod) + + def _handle_istio_proxy_with_sidecar_args(self, args): + pod = self._mock_pod(args=args) + self.istio.handle_istio_proxy(pod) + + @patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream) + def test_handle_istio_proxy(self): + args = ["proxy", "sidecar", "--statusPort", "12345"] + self._handle_istio_proxy_with_sidecar_args(args) + self.istio._client.connect_get_namespaced_pod_exec.\ + assert_called_once_with( + 'fake-pod-name', + 'fake-namespace', + tty=False, + container='istio-proxy', + stderr=True, + stdin=False, + stdout=True, + command=['/bin/sh', + '-c', + 'curl -XPOST http://127.0.0.1:12345/quitquitquit']) + + @patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream) + def test_handle_istio_proxy_other_cli_format(self): + args = ["proxy", "sidecar", "--statusPort=12345"] + self._handle_istio_proxy_with_sidecar_args(args) + self.istio._client.connect_get_namespaced_pod_exec.\ + assert_called_once_with( + 'fake-pod-name', + 'fake-namespace', + tty=False, + container='istio-proxy', + stderr=True, + stdin=False, + stdout=True, + command=['/bin/sh', + '-c', + 'curl -XPOST http://127.0.0.1:12345/quitquitquit']) + + @patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream) + def test_handle_istio_proxy_no_cli_argument(self): + args = ["proxy", "sidecar"] + self._handle_istio_proxy_with_sidecar_args(args) + self.istio._client.connect_get_namespaced_pod_exec.\ + assert_called_once_with( + 'fake-pod-name', + 'fake-namespace', + tty=False, + container='istio-proxy', + stderr=True, + stdin=False, + stdout=True, + command=['/bin/sh', + '-c', + 'curl -XPOST http://127.0.0.1:15020/quitquitquit']) + + @patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream) + def test_handle_istio_with_no_sidecar(self): + pod = MagicMock() + pod.spec.containers = [] + self.istio.handle_istio_proxy(MagicMock()) + self.istio._client.connect_get_namespaced_pod_exec.\ + assert_not_called() + + +if __name__ == "__main__": + unittest.main()