Skip to content

Commit

Permalink
Enable the E2E tests for v1alpha2. (#667)
Browse files Browse the repository at this point in the history
* Fix #634

* Speedup the E2E test by running the build and setup cluster steps in parallel
  * To do this we split the setup step into two steps 1. setting up the cluster
    and 2. setting up Kubeflow.

    Fix #659

* Shorten the name of the workflow for v1alpha2
   * Otherwise the label for the workflow pod becomes too long and argo
     can't run it.

* Pin the test worker image so that we don't get broken when someone updates
  the latest image
  * Make it a parameter in the prow_config.yaml

* Use a file lock to ensure only one instance of test_runner is modifying
  the ksonnet app at a time; this should help with various test flakes.
  • Loading branch information
jlewi authored and k8s-ci-robot committed Jun 16, 2018
1 parent b2ac020 commit 7707ec0
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 121 deletions.
19 changes: 10 additions & 9 deletions prow_config.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# This file configures the workflows to trigger in our Prow jobs.
# see kubeflow/testing/py/run_e2e_workflow.py
#
# The testWorkerImage should be the same for all workflows.
workflows:
- app_dir: kubeflow/tf-operator/test/workflows
component: workflows
name: tfjob-e2e
name: v1
params:
tfJobVersion: v1alpha1
# TODO(https://github.com/kubeflow/tf-operator/issues/634)
# Enable the v1alpha2 once the job successfully completes
# when master completes.
#- app_dir: kubeflow/tf-operator/test/workflows
# component: workflows
# name: tfjob-e2e-v1alpha2
# params:
# tfJobVersion: v1alpha2
testWorkerImage: gcr.io/kubeflow-ci/test-worker:v20180615-e6b2059-e3b0c4
- app_dir: kubeflow/tf-operator/test/workflows
component: workflows
name: v2
params:
tfJobVersion: v1alpha2
testWorkerImage: gcr.io/kubeflow-ci/test-worker:v20180615-e6b2059-e3b0c4
109 changes: 86 additions & 23 deletions py/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def ks_deploy(app_dir, component, params, env=None, account=None):
util.run(apply_command, cwd=app_dir)


def setup(args):
def setup_cluster(args):
"""Setup a GKE cluster for TensorFlow jobs.
Args:
Expand Down Expand Up @@ -144,6 +144,55 @@ def setup(args):
# Create an API client object to talk to the K8s master.
api_client = k8s_client.ApiClient()

t = test_util.TestCase()
try:
start = time.time()

account = util.run_and_output(
["gcloud", "config", "get-value", "account", "--quiet"]).strip()
logging.info("Using GCP account %s", account)
util.run([
"kubectl", "create", "clusterrolebinding", "default-admin",
"--clusterrole=cluster-admin", "--user=" + account
])

_setup_namespace(api_client, args.namespace)

# Setup GPUs.
util.setup_cluster(api_client)

# Reraise the exception so that the step fails because there's no point
# continuing the test.
except subprocess.CalledProcessError as e:
t.failure = "setup-cluster failed;\n" + (e.output or "")
raise
except util.TimeoutError as e:
t.failure = e.message
raise
finally:
t.time = time.time() - start
t.name = "setup-cluster"
t.class_name = "GKE"
gcs_client = storage.Client(project=args.project)
test_util.create_junit_xml_file([t], args.junit_path, gcs_client)


def setup_kubeflow(args):
"""Setup Kubeflow.
Args:
args: Command line arguments that control the setup process.
"""
project = args.project
cluster_name = args.cluster
zone = args.zone

util.configure_kubectl(project, zone, cluster_name)

util.load_kube_config()
# Create an API client object to talk to the K8s master.
api_client = k8s_client.ApiClient()

t = test_util.TestCase()
try:
start = time.time()
Expand All @@ -160,17 +209,9 @@ def setup(args):
account = util.run_and_output(
["gcloud", "config", "get-value", "account", "--quiet"]).strip()
logging.info("Using GCP account %s", account)
util.run([
"kubectl", "create", "clusterrolebinding", "default-admin",
"--clusterrole=cluster-admin", "--user=" + account
])

_setup_namespace(api_client, args.namespace)
ks_deploy(args.test_app_dir, component, params, account=account)

# Setup GPUs.
util.setup_cluster(api_client)

# Verify that the TfJob operator is actually deployed.
if args.tf_job_version == "v1alpha1":
tf_job_deployment_name = "tf-job-operator"
Expand All @@ -182,8 +223,18 @@ def setup(args):
logging.info("Verifying TfJob deployment %s started.",
tf_job_deployment_name)

# TODO(jlewi): We should verify the image of the operator is the correct.
util.wait_for_deployment(api_client, args.namespace, tf_job_deployment_name)
# TODO(jlewi): We should verify the image of the operator is the correct
# one.
try:
util.wait_for_deployment(api_client, args.namespace,
tf_job_deployment_name)
finally:
# Run kubectl describe to get useful information about the deployment.
# This will help troubleshoot any errors.
util.run(["kubectl", "-n", args.namespace, "describe", "deploy",
tf_job_deployment_name])
util.run(["kubectl", "-n", args.namespace, "describe", "pods", "-l",
"name=tf-job-operator"])

# Reraise the exception so that the step fails because there's no point
# continuing the test.
Expand All @@ -200,7 +251,6 @@ def setup(args):
gcs_client = storage.Client(project=args.project)
test_util.create_junit_xml_file([t], args.junit_path, gcs_client)


def teardown(args):
"""Teardown the resources."""
gke = discovery.build("container", "v1")
Expand Down Expand Up @@ -239,6 +289,8 @@ def main(): # pylint: disable=too-many-locals

util.maybe_activate_service_account()

now = datetime.datetime.now()

# create the top-level parser
parser = argparse.ArgumentParser(description="Setup clusters for testing.")
subparsers = parser.add_subparsers()
Expand All @@ -247,7 +299,7 @@ def main(): # pylint: disable=too-many-locals
# setup
#
parser_setup = subparsers.add_parser(
"setup", help="Setup a cluster for testing.")
"setup_cluster", help="Setup a cluster for testing.")

parser_setup.add_argument(
"--accelerator",
Expand All @@ -256,30 +308,41 @@ def main(): # pylint: disable=too-many-locals
help="Accelerator to add to the cluster. Should be of the form type=count.")

parser_setup.add_argument(
"--namespace",
default="kubeflow-" + now.strftime("%m%d-%H%M-") + uuid.uuid4().hex[0:4],
help="The directory containing the ksonnet app used for testing.",
)
parser_setup.set_defaults(func=setup_cluster)
add_common_args(parser_setup)

parser_kubeflow = subparsers.add_parser(
"setup_kubeflow", help="Deploy Kubeflow for testing.")

parser_kubeflow.add_argument(
"--tf_job_version",
dest="tf_job_version",
help="Which version of the TFJobOperator to deploy.")

parser_setup.set_defaults(func=setup)
add_common_args(parser_setup)
parser_kubeflow.set_defaults(func=setup_kubeflow)

parser_setup.add_argument(
"--test_app_dir",
help="The directory containing the ksonnet app used for testing.",
)

now = datetime.datetime.now()
parser_setup.add_argument(
parser_kubeflow.add_argument(
"--namespace",
default="kubeflow-" + now.strftime("%m%d-%H%M-") + uuid.uuid4().hex[0:4],
help="The directory containing the ksonnet app used for testing.",
)

parser_setup.add_argument(
parser_kubeflow.add_argument(
"--image",
help="The image to use",
)

add_common_args(parser_kubeflow)

parser_kubeflow.add_argument(
"--test_app_dir",
help="The directory containing the ksonnet app used for testing.",
)

#############################################################################
# teardown
#
Expand Down
121 changes: 83 additions & 38 deletions py/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import argparse
import datetime
import filelock
import httplib
import logging
import json
Expand Down Expand Up @@ -164,6 +165,20 @@ def get_labels(name, runtime_id, replica_type=None, replica_index=None):
labels["task_index"] = replica_index
return labels

def get_labels_v1alpha2(namespace, name, replica_type=None,
                        replica_index=None):
  """Return the label set identifying pods of a v1alpha2 TFJob.

  Args:
    namespace: Namespace the TFJob runs in.
    name: Name of the TFJob.
    replica_type: (Optional) restrict to a replica type (e.g. master/worker/ps).
    replica_index: (Optional) restrict to a single replica index.

  Returns:
    A dict of label key/values suitable for building a pod selector.
  """
  selector = {
    "group_name": "kubeflow.org",
    "tf_job_key": "{0}-{1}".format(namespace, name),
  }
  # Only narrow the selector when the caller asked for it.
  if replica_type:
    selector["tf-replica-type"] = replica_type
  if replica_index:
    selector["tf-replica-index"] = replica_index
  return selector

def to_selector(labels):
parts = []
Expand Down Expand Up @@ -302,7 +317,59 @@ def terminateReplica(masterHost, namespace, target, exitCode=0):

logging.info("URL %s returned; %s", url, r.content)

@retrying.retry
def _setup_ks_app(args):
  """Set up the ksonnet app used by the test.

  Creates (or reuses) a ksonnet environment and applies the component
  parameters supplied via --params. A file lock serializes access to the
  ksonnet app so that multiple test runners modifying the same app in
  parallel don't corrupt it.

  Args:
    args: Parsed command line arguments; uses args.app_dir,
      args.environment, args.params and args.component.

  Returns:
    A tuple (namespace, name, env) of the K8s namespace, the TFJob name
    and the ksonnet environment name.

  Raises:
    ValueError: If --params does not include both name and namespace.
  """
  salt = uuid.uuid4().hex[0:4]

  lock_file = os.path.join(args.app_dir, "app.lock")
  logging.info("Acquiring lock on file: %s", lock_file)
  # NOTE(review): filelock raises a Timeout if the lock can't be acquired
  # within 60 seconds, which fails the step — presumably intentional.
  lock = filelock.FileLock(lock_file, timeout=60)
  with lock:
    # Create a new environment for this run unless one was specified.
    if args.environment:
      env = args.environment
    else:
      env = "test-env-{0}".format(salt)

    name = None
    namespace = None
    for pair in args.params.split(","):
      k, v = pair.split("=", 1)
      if k == "name":
        name = v

      if k == "namespace":
        namespace = v

    if not name:
      raise ValueError("name must be provided as a parameter.")

    if not namespace:
      raise ValueError("namespace must be provided as a parameter.")

    try:
      util.run(["ks", "env", "add", env, "--namespace=" + namespace],
               cwd=args.app_dir)
    except subprocess.CalledProcessError as e:
      # The environment may already exist from a previous run; only
      # re-raise for any other failure.
      if not re.search(".*environment.*already exists.*", e.output):
        raise

    for pair in args.params.split(","):
      k, v = pair.split("=", 1)
      util.run(
        ["ks", "param", "set", "--env=" + env, args.component, k, v],
        cwd=args.app_dir)

    # Bug fix: the original had an unreachable `return "", "", ""` after
    # the with block — the lock body always returns or raises. Removed.
    return namespace, name, env

# One of the reasons we set so many retries and a random amount of wait
# between retries is because we have multiple tests running in parallel
# that are all modifying the same ksonnet app via ks. I think this can
# lead to failures.
@retrying.retry(stop_max_attempt_number=10, wait_random_min=1000,
wait_random_max=10000)
def run_test(args): # pylint: disable=too-many-branches,too-many-statements
"""Run a test."""
gcs_client = storage.Client(project=args.project)
Expand All @@ -323,46 +390,15 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements

api_client = k8s_client.ApiClient()
masterHost = api_client.configuration.host
salt = uuid.uuid4().hex[0:4]

# Create a new environment for this run
env = "test-env-{0}".format(salt)

name = None
namespace = None
for pair in args.params.split(","):
k, v = pair.split("=", 1)
if k == "name":
name = v

if k == "namespace":
namespace = v

if not name:
raise ValueError("name must be provided as a parameter.")

if not namespace:
raise ValueError("namespace must be provided as a parameter.")

try:
util.run(["ks", "env", "add", env, "--namespace=" + namespace],
cwd=args.app_dir)
except subprocess.CalledProcessError as e:
if not re.search(".*environment.*already exists.*", e.output):
raise

for pair in args.params.split(","):
k, v = pair.split("=", 1)
util.run(
["ks", "param", "set", "--env=" + env, args.component, k, v],
cwd=args.app_dir)

t = test_util.TestCase()
t.class_name = "tfjob_test"
namespace, name, env = _setup_ks_app(args)
t.name = os.path.basename(name)

start = time.time()


try: # pylint: disable=too-many-nested-blocks
# We repeat the test multiple times.
# This ensures that if we delete the job we can create a new job with the
Expand All @@ -383,8 +419,9 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements
api_client, namespace, name, ["Running", "Done", "Failed"],
status_callback=tf_job_client.log_status)
else:
raise NotImplementedError("Need to implement logic to wait for "
"v1alpha2 job to start or finish")
results = tf_job_client.wait_for_condition(
api_client, namespace, name, ["Running", "Succeeded", "Failed"],
status_callback=tf_job_client.log_status)

logging.info("Current TFJob:\n %s", json.dumps(results, indent=2))

Expand All @@ -410,7 +447,8 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements
pod_selector = to_selector(pod_labels)
else:
target = "{name}-{replica}-0".format(name=name, replica=replica)
raise NotImplementedError("Need to set pod selector for v1alpha2.")
pod_labels = get_labels(namespace, name)
pod_selector = to_selector(pod_labels)

# Wait for the pods to be ready before we shutdown
# TODO(jlewi): We are get pods using a label selector so there is
Expand All @@ -427,7 +465,8 @@ def run_test(args): # pylint: disable=too-many-branches,too-many-statements

logging.info("Waiting for job to finish.")
results = tf_job_client.wait_for_job(
api_client, namespace, name, args.tfjob_version, status_callback=tf_job_client.log_status)
api_client, namespace, name, args.tfjob_version,
status_callback=tf_job_client.log_status)

if args.tfjob_version == "v1alpha1":
if results.get("status", {}).get("state", {}).lower() != "succeeded":
Expand Down Expand Up @@ -577,6 +616,12 @@ def add_common_args(parser):
type=str,
help="The TFJob version to use.")

parser.add_argument(
"--environment",
default=None,
type=str,
help="(Optional) the name for the ksonnet environment; if not specified "
"a random one is created.")

def build_parser():
# create the top-level parser
Expand Down
Loading

0 comments on commit 7707ec0

Please sign in to comment.