Fix issues with tf_job_gpu test (#241)
The setup cluster step should wait for the TfJob operator deployment to
be ready.

Ensure that all exceptions result in a failure message being reported to Gubernator.

Upgrade to Kubernetes py client 4.0.0 and fix the issues it causes; Fixes #242

Fix bugs with gpu_test; Fixes #240
jlewi authored Dec 22, 2017
1 parent e108d55 commit 37af20d
Showing 8 changed files with 208 additions and 16 deletions.
11 changes: 11 additions & 0 deletions py/airflow.py
@@ -13,9 +13,14 @@
from py import prow # pylint: disable=ungrouped-imports
from py import util # pylint: disable=ungrouped-imports
import requests
from retrying import retry

E2E_DAG = "tf_k8s_tests"

def is_retryable_error(exception):
"""Return True if we should retry; False otherwise"""
return isinstance(exception, requests.exceptions.ReadTimeout)

# Currently we don't use airflow/api/client because there's not actually much
# functionality in the client. So rather than take a dependency on airflow
# we just copied the code we need. We can revisit that if the Airflow client
@@ -40,6 +45,12 @@ def __init__(self, base_url, credentials=None, verify=True):
self._credentials = credentials
self._verify = verify


# Use exponential backoff with a max of 10 seconds between retries.
# Give up after a minute.
@retry(retry_on_exception=is_retryable_error,
wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_delay=60*1000)
def _request(self, url, method="GET", json_body=None, timeout=10):
"""Request a given URL.
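A minimal sketch of how this retry policy behaves, for reference. The fake_request helper and its failure count are hypothetical stand-ins, not part of this commit:

import requests
from retrying import retry

def is_retryable_error(exception):
  """Retry only on read timeouts; any other exception fails immediately."""
  return isinstance(exception, requests.exceptions.ReadTimeout)

calls = {"count": 0}

@retry(retry_on_exception=is_retryable_error,
       wait_exponential_multiplier=1000,  # exponential backoff in milliseconds
       wait_exponential_max=10000,        # capped at 10 seconds between attempts
       stop_max_delay=60 * 1000)          # give up after a minute in total
def fake_request():
  # Hypothetical stand-in for the _request method: fail twice with a
  # retryable timeout, then succeed on the third attempt.
  calls["count"] += 1
  if calls["count"] < 3:
    raise requests.exceptions.ReadTimeout("simulated timeout")
  return "ok"

print(fake_request())  # retries twice with backoff, then prints "ok"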
3 changes: 3 additions & 0 deletions py/deploy.py
@@ -86,8 +86,11 @@ def setup(args):
start = time.time()
util.run(["helm", "install", chart, "-n", "tf-job", "--wait", "--replace",
"--set", "rbac.install=true,cloud=gke"])
util.wait_for_deployment(api_client, "default", "tf-job-operator")
except subprocess.CalledProcessError as e:
t.failure = "helm install failed;\n" + e.output
except util.TimeoutError as e:
t.failure = e.message
finally:
t.time = time.time() - start
t.name = "helm-tfjob-install"
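The same reporting pattern appears in both deploy.py and test_runner.py: every failure path must set t.failure so the result surfaces in the JUnit output consumed by Gubernator. A condensed sketch of that pattern, using a hypothetical stand-in TestCase record rather than the project's actual class:

import subprocess
import time

class TestCase(object):
  """Hypothetical stand-in for the JUnit test-case record used by the runner."""

  def __init__(self):
    self.name = ""
    self.time = 0
    self.failure = None

def run_step(t, step_fn):
  """Run one setup or test step; convert any exception into a failure record."""
  start = time.time()
  try:
    step_fn()
  except subprocess.CalledProcessError as e:
    # e.output is only populated if the subprocess output was captured.
    t.failure = "step failed;\n" + (e.output or "")
  except Exception as e:  # pylint: disable=broad-except
    # Catch everything else so the failure is still reported to Gubernator.
    t.failure = str(e)
  finally:
    t.time = time.time() - start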
3 changes: 3 additions & 0 deletions py/test_runner.py
@@ -67,6 +67,9 @@ def run_test(args):
except util.TimeoutError:
t.failure = "Timeout waiting for {0} in namespace {1} to finish.".format(
name, namespace)
except Exception as e: # pylint: disable-msg=broad-except
# We want to catch all exceptions because we want to mark the test as failed.
t.failure = e.message
finally:
t.time = time.time() - start
if args.junit_path:
50 changes: 37 additions & 13 deletions py/util.py
@@ -18,7 +18,7 @@
from googleapiclient import errors
from kubernetes import client as k8s_client
from kubernetes.config import kube_config
from kubernetes.client import configuration
from kubernetes.client import configuration as kubernetes_configuration
from kubernetes.client import rest

# Default name for the repo organization and name.
@@ -248,21 +248,35 @@ def configure_kubectl(project, zone, cluster_name):
run(["gcloud", "--project=" + project, "container",
"clusters", "--zone=" + zone, "get-credentials", cluster_name])

def wait_for_tiller_to_be_ready(api_client):
def wait_for_deployment(api_client, namespace, name):
"""Wait for deployment to be ready.
Args:
api_client: K8s api client to use.
namespace: The name space for the deployment.
name: The name of the deployment.
Raises:
TimeoutError: If timeout waiting for deployment to be ready.
"""
# Poll until the deployment has at least one ready replica.
end_time = datetime.datetime.now() + datetime.timedelta(minutes=2)

ext_client = k8s_client.ExtensionsV1beta1Api(api_client)

while datetime.datetime.now() < end_time:
deploy = ext_client.read_namespaced_deployment("tiller-deploy", "kube-system")
deploy = ext_client.read_namespaced_deployment(name, namespace)
if deploy.status.ready_replicas >= 1:
logging.info("tiller is ready")
logging.info("Deployment %s in namespace %s is ready", name, namespace)
return
logging.info("Waiting for tiller")
logging.info("Waiting for deployment %s in namespace %s", name, namespace)
time.sleep(10)

raise ValueError("Timeout waiting for tiller")
logging.error("Timeout waiting for deployment %s in namespace %s to be "
"ready", name, namespace)
raise TimeoutError(
"Timeout waiting for deployment {0} in namespace {1}".format(
name, namespace))

def install_gpu_drivers(api_client):
"""Install GPU drivers on the cluster.
@@ -372,7 +386,7 @@ def setup_cluster(api_client):

if use_gpus:
install_gpu_drivers(api_client)
wait_for_tiller_to_be_ready(api_client)
wait_for_deployment(api_client, "kube-system", "tiller-deploy")
if use_gpus:
wait_for_gpu_driver_install(api_client)

@@ -399,8 +413,12 @@ def _refresh_credentials():
# TODO(jlewi): This is a work around for
# https://github.com/kubernetes-incubator/client-python/issues/339.
# Consider getting rid of this and adopting the solution to that issue.
#
# This function is based on
# https://github.com/kubernetes-client/python-base/blob/master/config/kube_config.py#L331
# We modify it so that the function used to get credentials can be passed in.
def load_kube_config(config_file=None, context=None,
client_configuration=configuration,
client_configuration=None,
persist_config=True,
get_google_credentials=_refresh_credentials,
**kwargs):
@@ -426,9 +444,15 @@ def _save_kube_config(config_map):
yaml.safe_dump(config_map, f, default_flow_style=False)
config_persister = _save_kube_config

kube_config._get_kube_config_loader_for_yaml_file( # pylint: disable=protected-access
loader = kube_config._get_kube_config_loader_for_yaml_file( # pylint: disable=protected-access
config_file, active_context=context,
client_configuration=client_configuration,
config_persister=config_persister,
get_google_credentials=get_google_credentials,
**kwargs).load_and_set()
config_persister=config_persister,
get_google_credentials=get_google_credentials,
**kwargs)

if client_configuration is None:
config = type.__call__(kubernetes_configuration.Configuration)
loader.load_and_set(config)
kubernetes_configuration.Configuration.set_default(config)
else:
loader.load_and_set(client_configuration)
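A hedged usage sketch tying the util.py changes together: with client 4.0.0, the loader must populate a Configuration object, and load_kube_config installs it as the process-wide default via Configuration.set_default so a default-constructed ApiClient picks it up. The project, zone, and cluster names below are placeholders, not values from this commit:

from kubernetes import client as k8s_client

from py import util

# Placeholder GKE coordinates; substitute real values.
util.configure_kubectl("my-project", "us-east1-d", "my-cluster")

# With client_configuration=None, the loader fills a fresh Configuration
# and makes it the default for new clients.
util.load_kube_config()

# A default-constructed ApiClient now uses that configuration.
api_client = k8s_client.ApiClient()
util.wait_for_deployment(api_client, "kube-system", "tiller-deploy")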
151 changes: 151 additions & 0 deletions py/util_test.py
@@ -0,0 +1,151 @@
from __future__ import print_function

from kubernetes import client as k8s_client
import mock
import unittest

from py import util

DEPLOYMENT_READY = """{
"apiVersion": "v1",
"items": [
{
"apiVersion": "extensions/v1beta1",
"kind": "Deployment",
"metadata": {
"annotations": {
"deployment.kubernetes.io/revision": "1"
},
"creationTimestamp": "2017-12-22T13:58:29Z",
"generation": 1,
"labels": {
"name": "tf-job-operator"
},
"name": "tf-job-operator",
"namespace": "default",
"resourceVersion": "666",
"selfLink": "/apis/extensions/v1beta1/namespaces/default/deployments/tf-job-operator",
"uid": "3060361a-e720-11e7-ada7-42010a8e00a9"
},
"spec": {
"replicas": 1,
"selector": {
"matchLabels": {
"name": "tf-job-operator"
}
},
"strategy": {
"rollingUpdate": {
"maxSurge": 1,
"maxUnavailable": 1
},
"type": "RollingUpdate"
},
"template": {
"metadata": {
"creationTimestamp": null,
"labels": {
"name": "tf-job-operator"
}
},
"spec": {
"containers": [
{
"command": [
"/opt/mlkube/tf_operator",
"--controller_config_file=/etc/config/controller_config_file.yaml",
"-alsologtostderr",
"-v=1"
],
"env": [
{
"name": "MY_POD_NAMESPACE",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.namespace"
}
}
},
{
"name": "MY_POD_NAME",
"valueFrom": {
"fieldRef": {
"apiVersion": "v1",
"fieldPath": "metadata.name"
}
}
}
],
"image": "gcr.io/mlkube-testing/tf_operator:v20171222-e108d55",
"imagePullPolicy": "IfNotPresent",
"name": "tf-job-operator",
"resources": {},
"terminationMessagePath": "/dev/termination-log",
"terminationMessagePolicy": "File",
"volumeMounts": [
{
"mountPath": "/etc/config",
"name": "config-volume"
}
]
}
],
"dnsPolicy": "ClusterFirst",
"restartPolicy": "Always",
"schedulerName": "default-scheduler",
"securityContext": {},
"serviceAccount": "tf-job-operator",
"serviceAccountName": "tf-job-operator",
"terminationGracePeriodSeconds": 30,
"volumes": [
{
"configMap": {
"defaultMode": 420,
"name": "tf-job-operator-config"
},
"name": "config-volume"
}
]
}
}
},
"status": {
"availableReplicas": 1,
"conditions": [
{
"lastTransitionTime": "2017-12-22T13:58:29Z",
"lastUpdateTime": "2017-12-22T13:58:29Z",
"message": "Deployment has minimum availability.",
"reason": "MinimumReplicasAvailable",
"status": "True",
"type": "Available"
}
],
"observedGeneration": 1,
"readyReplicas": 1,
"replicas": 1,
"updatedReplicas": 1
}
}
],
"kind": "List",
"metadata": {
"resourceVersion": "",
"selfLink": ""
}
}"""

class UtilTest(unittest.TestCase):
def test_wait_for_deployment(self):
api_client = mock.MagicMock(spec=k8s_client.ApiClient)

response = k8s_client.ExtensionsV1beta1Deployment()
response.status = k8s_client.ExtensionsV1beta1DeploymentStatus()
response.status.ready_replicas = 1
api_client.call_api.return_value = response
util.wait_for_deployment(api_client, "some-namespace", "some-deployment")


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion test-infra/airflow/Dockerfile
@@ -82,7 +82,7 @@ RUN set -ex \
&& git checkout f87d8aca93cf2c6df21bea7b13b6703d91f09865 \
&& python setup.py install \
&& pip install --upgrade six pyyaml google-api-python-client \
google-cloud-storage google-auth-httplib2 pylint kubernetes mock \
google-cloud-storage google-auth-httplib2 pylint kubernetes==4.0.0 mock retrying \
# Airflow 1.9.0rc02 requires funcsigs 1.0.0; newer versions cause airflow
# scheduler to complain; it looks like mock might pull in a newer version.
# so we force a particular version.
2 changes: 1 addition & 1 deletion test-infra/airflow/Dockerfile.prow
@@ -16,5 +16,5 @@
#
# TODO(jlewi) With Docker v17.0 we should just be able to use Docker
# build args to specify the base image.
FROM gcr.io/mlkube-testing/airflow:v20171221-19ff62e-e3b0c4
FROM gcr.io/mlkube-testing/airflow:v20171222-f2644ab-dirty-6078d0
ENTRYPOINT ["python", "-m", "py.airflow"]
2 changes: 1 addition & 1 deletion test-infra/airflow/deployment.yaml
@@ -25,7 +25,7 @@ spec:
spec:
containers:
- name: airflow
image: gcr.io/mlkube-testing/airflow:v20171221-19ff62e-e3b0c4
image: gcr.io/mlkube-testing/airflow:v20171222-f2644ab-dirty-6078d0
# What arguments do we need to set to use the LocalExecutor
command:
# Tells entrypoint.sh to start the webserver
