diff --git a/testing/tf_job_simple_test.py b/testing/tf_job_simple_test.py
index 2acc39f1097..b21b17d8a42 100644
--- a/testing/tf_job_simple_test.py
+++ b/testing/tf_job_simple_test.py
@@ -23,7 +23,8 @@
 import argparse
 import logging
 import os
-
+import re
+import subprocess
 from kubeflow.testing import test_helper, util
 from retrying import retry
 
@@ -67,12 +68,34 @@ def wait_for_tf_job():
     raise Exception("Could not find services with label tf_job_name=mycnnjob")
   logging.info("Found services with label tf_job_name=mycnnjob")
 
+@retry(stop_max_attempt_number=3)
 def test_tf_job_simple(test_case): # pylint: disable=redefined-outer-name
   args = parse_args()
-  util.run(["ks", "init", "tf-job-simple-app"])
+  try:
+    util.run(["ks", "init", "tf-job-simple-app", "--skip-default-registries"])
+  except subprocess.CalledProcessError as e:
+    # Keep going if the app already exists. This is a sign that a previous
+    # attempt failed and we are retrying.
+    # NOTE(review): os.chdir below persists across retries -- confirm cwd.
+    if not re.search(".*already exists.*", e.output or ""):
+      raise
+
   os.chdir("tf-job-simple-app")
-  util.run(["ks", "registry", "add", "kubeflow", args.src_dir + "/kubeflow"])
-  util.run(["ks", "pkg", "install", "kubeflow/examples"])
+  try:
+    util.run(["ks", "registry", "add", "kubeflow", args.src_dir + "/kubeflow"])
+  except subprocess.CalledProcessError as e:
+    # Keep going if the registry has already been added.
+    # This is a sign that a previous attempt failed and we are retrying.
+    if not re.search(".*already exists.*", e.output or ""):
+      raise
+
+  try:
+    util.run(["ks", "pkg", "install", "kubeflow/examples"])
+  except subprocess.CalledProcessError as e:
+    # Keep going if the package has already been added.
+    # This is a sign that a previous attempt failed and we are retrying.
+    if not re.search(".*already exists.*", e.output or ""):
+      raise
 
   if args.tf_job_version == "v1alpha2":
     prototype_name = "tf-job-simple"
diff --git a/testing/vm_util.py b/testing/vm_util.py
index 10ad3120bcb..5c42b74b384 100644
--- a/testing/vm_util.py
+++ b/testing/vm_util.py
@@ -3,6 +3,7 @@
 import datetime
 import logging
 import os
+import socket
 import ssl
 import subprocess
 import time
@@ -45,6 +46,9 @@ def wait_for_operation(client,
       else:
         op = client.globalOperations().get(project=project,
                                            operation=op_id).execute()
+    except socket.error as e:
+      logging.error("Ignoring error %s", e)
+      continue
     except ssl.SSLError as e:
       logging.error("Ignoring error %s", e)
       continue
@@ -77,28 +81,28 @@ def wait_for_vm(project, zone, vm, timeout=datetime.timedelta(minutes=5),
       return
     except subprocess.CalledProcessError:
       pass
-    
+
     if datetime.datetime.now() > endtime:
       raise util.TimeoutError(
         ("Timed out waiting for VM to {0} be sshable. Check firewall rules "
          "aren't blocking ssh.").format(vm))
     time.sleep(polling_interval.total_seconds())
-    
+
 def execute(project, zone, vm, commands):
   """Execute the supplied commands on the VM."""
   util.run(["gcloud", "compute", "--project=" + project, "ssh",
             "--zone=" + zone, vm, "--", " && ".join(commands)])
-  
+
 def execute_script(project, zone, vm, script):
   """Execute the specified script on the VM."""
   target_path = os.path.join("/tmp", os.path.basename(script) +
                              "." + uuid.uuid4().hex[0:4])
-  
+
   target = "{0}:{1}".format(vm, target_path)
   logging.info("Copying %s to %s", script, target)
   util.run(["gcloud", "compute", "--project=" + project, "scp", script, target,
             "--zone=" + zone])
-  
+
   util.run(["gcloud", "compute", "--project=" + project, "ssh",
             "--zone=" + zone, vm, "--",
             "chmod a+rx " + target_path + " && " + target_path])