Skip to content

Commit

Permalink
[AIRFLOW-5448] Handle istio-proxy for Kubernetes Pods (apache#62)
Browse files Browse the repository at this point in the history
Istio service mesh is not compatible by default with Kubernetes Jobs.
The normal behavior is that a Job will be started, get an istio-proxy
sidecar attached to it via the istio mutating webhook, run until
completion, then the 'main' container in the pod stops, but istio-proxy
hangs around indefinitely. This change handles cleanly exiting the
Istio sidecar 'istio-proxy' when a Kubernetes Executor task completes.

(cherry picked from commit 84fa48f)
  • Loading branch information
sjmiller609 authored and kaxil committed Apr 14, 2020
1 parent 832460a commit c288706
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 5 deletions.
3 changes: 3 additions & 0 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from urllib3.exceptions import HTTPError, ReadTimeoutError

from airflow.configuration import conf
from airflow.contrib.kubernetes.istio import Istio
from airflow.contrib.kubernetes.pod_launcher import PodLauncher
from airflow.contrib.kubernetes.kube_client import get_kube_client
from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
Expand Down Expand Up @@ -331,6 +332,7 @@ def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube
self.watcher_queue = watcher_queue
self.resource_version = resource_version
self.kube_config = kube_config
self.istio = Istio(get_kube_client())

def run(self):
"""Performs watching"""
Expand Down Expand Up @@ -383,6 +385,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
event=event,
)
last_resource_version = task.metadata.resource_version
self.istio.handle_istio_proxy(task)

return last_resource_version

Expand Down
152 changes: 152 additions & 0 deletions airflow/contrib/kubernetes/istio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from kubernetes.stream import stream
try:
from packaging.version import parse as semantic_version
except ImportError:
# Python 2
from distutils.version import LooseVersion as semantic_version # type: ignore


class SidecarNames:
""" Define strings that indicate container names
"""
ISTIO_PROXY = 'istio-proxy'


class Istio(LoggingMixin):
""" Handle all Istio-related logic
"""

def __init__(self, kube_client):
super(Istio, self).__init__()
self._client = kube_client

def handle_istio_proxy(self, pod):
"""If an istio-proxy sidecar is detected, and all other containers
are terminated, then attempt to cleanly shutdown the sidecar.
If we detect a version of Istio before it's compatible with Kubernetes
Jobs, then raise an informative error message.
Args:
pod (V1Pod): The pod which we are checking for the sidecar
Returns:
(bool): True if we detect and exit istio-proxy,
False if we do not detect istio-proxy
Raises:
AirflowException: if we find an istio-proxy, and we can't shut it down.
"""
if self._should_shutdown_istio_proxy(pod):
self.log.info("Detected that a task finished and needs " +
"an istio-proxy sidecar to be cleaned up. " +
"pod name: {}".format(pod.metadata.name))
self._shutdown_istio_proxy(pod)
return True
return False

def _should_shutdown_istio_proxy(self, pod):
"""Look for an istio-proxy, and decide if it should be shutdown.
Args:
pod (V1Pod): The pod which we are checking for the sidecar
Returns:
(bool): True if we detect istio-proxy, and all other containers
are finished running, otherwise false
"""
if pod.status.phase != "Running":
return False
found_istio = False
for container_status in pod.status.container_statuses:
if container_status.name == SidecarNames.ISTIO_PROXY and \
container_status.state.running:
found_istio = True
continue
if not container_status.state.terminated:
# Any state besides 'terminated' should be
# considered still busy
return False
# If we didn't find istio at all, then we should
# not shut it down. Also we should only shut it down
# if it has state "running".
return found_istio

def _shutdown_istio_proxy(self, pod):
"""Shutdown the istio-proxy on the provided pod
Args:
pod (V1Pod): The pod which the container is in
Returns:
None
Raises:
AirflowException: if we find an istio-proxy, and we can't shut it down.
"""
for container in pod.spec.containers:

# Skip unless it's a sidecar named as SidecarNames.ISTIO_PROXY.
if container.name != SidecarNames.ISTIO_PROXY:
continue

# Check if supported version of istio-proxy.
# If we can't tell the version, proceed anyways.
if ":" in container.image:
_, tag = container.image.split(":")
if semantic_version(tag) < semantic_version("1.3.0-rc.0"):
raise AirflowException(
'Please use istio version 1.3.0+ for KubernetesExecutor compatibility.' +
' Detected version {}'.format(tag))

# Determine the istio-proxy statusPort,
# which is where /quitquitquit is implemented.
# Default to 15020.
status_port = "15020"
for i in range(len(container.args)):
arg = container.args[i]
if arg.strip() == "--statusPort":
status_port = container.args[i + 1].strip()
break
if arg.strip()[:13] == "--statusPort=":
status_port = arg.strip()[13:]
break

self.log.info("Shutting down istio-proxy in pod {}".format(pod.metadata.name))
self._post_quitquitquit(pod, container, status_port)

def _post_quitquitquit(self, pod, container, status_port):
""" Send the curl to shutdown the isto-proxy container
"""
# Use exec to curl localhost inside of the sidecar.
_ = stream(
self._client.connect_get_namespaced_pod_exec,
pod.metadata.name,
pod.metadata.namespace,
tty=False,
stderr=True,
stdin=False,
stdout=True,
container=container.name,
command=[
'/bin/sh',
'-c',
'curl -XPOST http://127.0.0.1:{}/quitquitquit'.format(status_port)])
27 changes: 22 additions & 5 deletions airflow/contrib/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airflow.utils.state import State
from airflow import AirflowException

from airflow.contrib.kubernetes.istio import Istio
from airflow.contrib.kubernetes.pod import Pod
from airflow.contrib.kubernetes.kubernetes_request_factory import \
pod_request_factory as pod_factory
Expand All @@ -48,6 +49,20 @@ class PodStatus(object):
SUCCEEDED = 'succeeded'


class SleepConfig:
''' Configure sleeps used for polling
'''
# Only polls during the start of a pod
POD_STARTING_POLL = 1
# Used to detect all cleanup jobs are completed
# and the entire Pod is cleaned up
POD_RUNNING_POLL = 1
# Polls for the duration of the task execution
# to detect when the task is done. The difference
# between this and POD_RUNNING_POLL is sidecars.
BASE_CONTAINER_RUNNING_POLL = 2


class PodLauncher(LoggingMixin):
"""Launches PODS"""
def __init__(self,
Expand All @@ -70,6 +85,7 @@ def __init__(self,
self.extract_xcom = extract_xcom
self.kube_req_factory = pod_factory.ExtractXcomPodRequestFactory(
) if extract_xcom else pod_factory.SimplePodRequestFactory()
self.istio = Istio(self._client)

def run_pod_async(self, pod, **kwargs):
"""Runs POD asynchronously"""
Expand Down Expand Up @@ -111,7 +127,7 @@ def run_pod(self, pod, startup_timeout=120, get_logs=True):
delta = dt.now() - curr_time
if delta.total_seconds() >= startup_timeout:
raise AirflowException("Pod took too long to start")
time.sleep(1)
time.sleep(SleepConfig.POD_STARTING_POLL)
self.log.debug('Pod not yet started')

return self._monitor_pod(pod, get_logs)
Expand All @@ -124,16 +140,17 @@ def _monitor_pod(self, pod, get_logs):
for line in logs:
self.log.info(line)
result = None
while self.base_container_is_running(pod):
self.log.info('Container %s has state %s', pod.name, State.RUNNING)
time.sleep(SleepConfig.BASE_CONTAINER_RUNNING_POLL)
if self.extract_xcom:
while self.base_container_is_running(pod):
self.log.info('Container %s has state %s', pod.name, State.RUNNING)
time.sleep(2)
result = self._extract_xcom(pod)
self.log.info(result)
result = json.loads(result)
self.istio.handle_istio_proxy(self.read_pod(pod))
while self.pod_is_running(pod):
self.log.info('Pod %s has state %s', pod.name, State.RUNNING)
time.sleep(2)
time.sleep(SleepConfig.POD_RUNNING_POLL)
return self._task_status(self.read_pod(pod)), result

def _task_status(self, event):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ def write_version(filename=os.path.join(*[my_dir, "airflow", "git_version"])):
kubernetes = [
'cryptography>=2.0.0',
'kubernetes>=3.0.0',
'packaging>=19.1',
]
ldap = [
'ldap3>=2.5.1',
Expand Down
130 changes: 130 additions & 0 deletions tests/kubernetes/test_istio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow import AirflowException
from airflow.contrib.kubernetes.istio import Istio

import unittest
from tests.compat import MagicMock, patch


def mock_stream(func, *args, **kwargs):
print('calling func')
return func(*args, **kwargs)


class TestIstio(unittest.TestCase):

def setUp(self):
mock_kube_client = MagicMock()
self.istio = Istio(mock_kube_client)

def _mock_pod(self, image="istio/proxyv2:1.3.0", args=[]): # noqa
sidecar = MagicMock()
sidecar.name = "istio-proxy"
sidecar.namespace = "fake-namespace"
sidecar.image = image
sidecar.args = args
pod = MagicMock()
pod.spec.containers = [sidecar]
pod.status.phase = "Running"
pod.metadata.name = "fake-pod-name"
pod.metadata.namespace = "fake-namespace"
container_status1 = MagicMock()
container_status1.name = "istio-proxy"
container_status1.state.running = True
container_status1.state.terminated = False
container_status2 = MagicMock()
container_status2.name = "base"
container_status2.state.running = False
container_status2.state.terminated = True
pod.status.container_statuses = [container_status1,
container_status2]
return pod

def test_handle_istio_proxy_low_version(self):
pod = self._mock_pod(image="istio/proxyv2:1.2.9")
self.assertRaises(AirflowException,
self.istio.handle_istio_proxy,
pod)

def _handle_istio_proxy_with_sidecar_args(self, args):
pod = self._mock_pod(args=args)
self.istio.handle_istio_proxy(pod)

@patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy(self):
args = ["proxy", "sidecar", "--statusPort", "12345"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.\
assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh',
'-c',
'curl -XPOST http://127.0.0.1:12345/quitquitquit'])

@patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy_other_cli_format(self):
args = ["proxy", "sidecar", "--statusPort=12345"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.\
assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh',
'-c',
'curl -XPOST http://127.0.0.1:12345/quitquitquit'])

@patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_proxy_no_cli_argument(self):
args = ["proxy", "sidecar"]
self._handle_istio_proxy_with_sidecar_args(args)
self.istio._client.connect_get_namespaced_pod_exec.\
assert_called_once_with(
'fake-pod-name',
'fake-namespace',
tty=False,
container='istio-proxy',
stderr=True,
stdin=False,
stdout=True,
command=['/bin/sh',
'-c',
'curl -XPOST http://127.0.0.1:15020/quitquitquit'])

@patch("airflow.contrib.kubernetes.istio.stream", new=mock_stream)
def test_handle_istio_with_no_sidecar(self):
pod = MagicMock()
pod.spec.containers = []
self.istio.handle_istio_proxy(MagicMock())
self.istio._client.connect_get_namespaced_pod_exec.\
assert_not_called()


if __name__ == "__main__":
unittest.main()

0 comments on commit c288706

Please sign in to comment.