diff --git a/py/build_and_push_image.py b/py/build_and_push_image.py index 7cc2f26175..813f331472 100644 --- a/py/build_and_push_image.py +++ b/py/build_and_push_image.py @@ -14,12 +14,12 @@ def GetGitHash(root_dir=None): # The image tag is based on the githash. - git_hash = subprocess.check_output( - ["git", "rev-parse", "--short", "HEAD"], cwd=root_dir).decode("utf-8") + git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"], + cwd=root_dir).decode("utf-8") git_hash = git_hash.strip() - modified_files = subprocess.check_output( - ["git", "ls-files", "--modified"], cwd=root_dir) + modified_files = subprocess.check_output(["git", "ls-files", "--modified"], + cwd=root_dir) untracked_files = subprocess.check_output( ["git", "ls-files", "--others", "--exclude-standard"], cwd=root_dir) if modified_files or untracked_files: diff --git a/py/cleanpod_policy_tests.py b/py/cleanpod_policy_tests.py index 59e4b6eb51..63bb4234a0 100644 --- a/py/cleanpod_policy_tests.py +++ b/py/cleanpod_policy_tests.py @@ -11,7 +11,9 @@ CLEANPOD_RUNNING_COMPONENT_NAME = "clean_pod_running" CLEANPOD_NONE_COMPONENT_NAME = "clean_pod_none" + class CleanPodPolicyTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -19,13 +21,15 @@ def __init__(self, args): self.namespace = namespace self.tfjob_version = args.tfjob_version self.params = args.params - super(CleanPodPolicyTests, self).__init__(class_name="CleanPodPolicyTests", name=name) + super(CleanPodPolicyTests, self).__init__( + class_name="CleanPodPolicyTests", name=name) def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy): api_client = k8s_client.ApiClient() # Setup the ksonnet app - ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params) + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -34,14 +38,20 @@ def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy): # Wait for the job to either be in Running state or a terminal state logging.info("Wait for conditions Running, Succeeded, or Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) # Wait for the job to complete. logging.info("Waiting for job to finish.") results = tf_job_client.wait_for_job( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -55,31 +65,37 @@ def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy): if clean_pod_policy == "All": pod_labels = tf_job_client.get_labels(self.name) pod_selector = tf_job_client.to_selector(pod_labels) - k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, pod_selector) + k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, + pod_selector) # Only running pods (PS) are deleted, completed pods are not. 
elif clean_pod_policy == "Running": - tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace, - self.name, "Chief", ["Completed"]) - tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace, - self.name, "Worker", ["Completed"]) + tf_job_client.wait_for_replica_type_in_phases( + api_client, self.namespace, self.name, "Chief", ["Succeeded"]) + tf_job_client.wait_for_replica_type_in_phases( + api_client, self.namespace, self.name, "Worker", ["Succeeded"]) pod_labels = tf_job_client.get_labels(self.name, "PS") pod_selector = tf_job_client.to_selector(pod_labels) - k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, pod_selector) + k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, + pod_selector) # No pods are deleted. elif clean_pod_policy == "None": - tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace, - self.name, "Chief", ["Completed"]) - tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace, - self.name, "Worker", ["Completed"]) - tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace, - self.name, "PS", ["Running"]) + tf_job_client.wait_for_replica_type_in_phases( + api_client, self.namespace, self.name, "Chief", ["Succeeded"]) + tf_job_client.wait_for_replica_type_in_phases( + api_client, self.namespace, self.name, "Worker", ["Succeeded"]) + tf_job_client.wait_for_replica_type_in_phases( + api_client, self.namespace, self.name, "PS", ["Running"]) # Delete the TFJob. - tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) # Verify that all pods are deleted when the job completes. @@ -97,5 +113,6 @@ def test_cleanpod_none(self): return self.run_tfjob_with_cleanpod_policy( CLEANPOD_NONE_COMPONENT_NAME + "_" + self.tfjob_version, "None") + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/deploy.py b/py/deploy.py index 907d692fdb..74a9ba4c96 100755 --- a/py/deploy.py +++ b/py/deploy.py @@ -86,8 +86,8 @@ def ks_deploy(app_dir, component, params, env=None, account=None): raise for k, v in params.iteritems(): - util.run( - ["ks", "param", "set", "--env=" + env, component, k, v], cwd=app_dir) + util.run(["ks", "param", "set", "--env=" + env, component, k, v], + cwd=app_dir) apply_command = ["ks", "apply", env, "-c", component] if account: @@ -201,7 +201,7 @@ def setup_kubeflow(args): "tfJobImage": args.image, "name": "kubeflow-core", "namespace": args.namespace, - "tfJobVersion": args.tf_job_version, + "tfJobVersion": args.tf_job_version, } component = "core" @@ -231,10 +231,14 @@ def setup_kubeflow(args): finally: # Run kubectl describe to get useful information about the deployment. # This will help troubleshoot any errors. 
- util.run(["kubectl", "-n", args.namespace, "describe", "deploy", - tf_job_deployment_name]) - util.run(["kubectl", "-n", args.namespace, "describe", "pods", "-l", - "name=tf-job-operator"]) + util.run([ + "kubectl", "-n", args.namespace, "describe", "deploy", + tf_job_deployment_name + ]) + util.run([ + "kubectl", "-n", args.namespace, "describe", "pods", "-l", + "name=tf-job-operator" + ]) # Reraise the exception so that the step fails because there's no point # continuing the test. @@ -251,6 +255,7 @@ def setup_kubeflow(args): gcs_client = storage.Client(project=args.project) test_util.create_junit_xml_file([t], args.junit_path, gcs_client) + def teardown(args): """Teardown the resources.""" gke = discovery.build("container", "v1") diff --git a/py/distributed_training_tests.py b/py/distributed_training_tests.py index 3e8127ebfb..343ba61f5c 100644 --- a/py/distributed_training_tests.py +++ b/py/distributed_training_tests.py @@ -8,7 +8,9 @@ TFJOB_COMPONENT_NAME = "distributed_training" + class DistributedTrainingJobTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -25,7 +27,8 @@ def run_distributed_training_job(self, component): api_client = k8s_client.ApiClient() # Setup the ksonnet app - ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params) + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -34,14 +37,20 @@ def run_distributed_training_job(self, component): # Wait for the job to either be in Running state or a terminal state logging.info("Wait for conditions Running, Succeeded, or Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) # Wait for the job to complete. logging.info("Waiting for job to finish.") results = tf_job_client.wait_for_job( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -58,17 +67,23 @@ def run_distributed_training_job(self, component): logging.warning(creation_failures) # Delete the TFJob. - tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) # Run a distributed training TFJob, wait for it to complete, and check for pod/service # creation errors. 
def test_distributed_training_independent_worker(self): - self.run_distributed_training_job(TFJOB_COMPONENT_NAME + "_" + self.tfjob_version) + self.run_distributed_training_job(TFJOB_COMPONENT_NAME + "_" + + self.tfjob_version) + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/estimator_runconfig_tests.py b/py/estimator_runconfig_tests.py index 44a0472fc0..fa6169c4b2 100644 --- a/py/estimator_runconfig_tests.py +++ b/py/estimator_runconfig_tests.py @@ -10,6 +10,7 @@ COMPONENT_NAME = "estimator_runconfig" + def get_runconfig(master_host, namespace, target): """Issue a request to get the runconfig of the specified replica running test_server. Args: @@ -17,10 +18,13 @@ def get_runconfig(master_host, namespace, target): namespace: The namespace target: The K8s service corresponding to the pod to call. """ - response = tf_operator_util.send_request(master_host, namespace, target, "runconfig", {}) + response = tf_operator_util.send_request(master_host, namespace, target, + "runconfig", {}) return yaml.load(response) -def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_workers): + +def verify_runconfig(master_host, namespace, job_name, replica, num_ps, + num_workers): """Verifies that the TF RunConfig on the specified replica is the same as expected. Args: master_host: The IP address of the master e.g. https://35.188.37.10 @@ -46,7 +50,8 @@ def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_work ps_list.append("{name}-ps-{index}:2222".format(name=job_name, index=i)) worker_list = [] for i in range(num_workers): - worker_list.append("{name}-worker-{index}:2222".format(name=job_name, index=i)) + worker_list.append("{name}-worker-{index}:2222".format( + name=job_name, index=i)) cluster_spec = { "chief": chief_list, "ps": ps_list, @@ -54,7 +59,8 @@ def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_work } for i in range(num_replicas): - full_target = "{name}-{replica}-{index}".format(name=job_name, replica=replica.lower(), index=i) + full_target = "{name}-{replica}-{index}".format( + name=job_name, replica=replica.lower(), index=i) actual_config = get_runconfig(master_host, namespace, full_target) expected_config = { "task_type": replica, @@ -62,7 +68,7 @@ def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_work "cluster_spec": cluster_spec, "is_chief": is_chief, "master": "grpc://{target}:2222".format(target=full_target), - "num_worker_replicas": num_workers + 1, # Chief is also a worker + "num_worker_replicas": num_workers + 1, # Chief is also a worker "num_ps_replicas": num_ps, } # Compare expected and actual configs @@ -74,6 +80,7 @@ def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_work class EstimatorRunconfigTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -81,7 +88,8 @@ def __init__(self, args): self.namespace = namespace self.tfjob_version = args.tfjob_version self.params = args.params - super(EstimatorRunconfigTests, self).__init__(class_name="EstimatorRunconfigTests", name=name) + super(EstimatorRunconfigTests, self).__init__( + class_name="EstimatorRunconfigTests", name=name) # Run a TFJob, verify that the TensorFlow runconfig specs are set correctly. 
def test_tfjob_and_verify_runconfig(self): @@ -90,7 +98,8 @@ def test_tfjob_and_verify_runconfig(self): component = COMPONENT_NAME + "_" + self.tfjob_version # Setup the ksonnet app - ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params) + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -99,24 +108,34 @@ def test_tfjob_and_verify_runconfig(self): # Wait for the job to either be in Running state or a terminal state logging.info("Wait for conditions Running, Succeeded, or Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) - num_ps = results.get("spec", {}).get("tfReplicaSpecs", {}).get( - "PS", {}).get("replicas", 0) + num_ps = results.get("spec", {}).get("tfReplicaSpecs", + {}).get("PS", {}).get("replicas", 0) num_workers = results.get("spec", {}).get("tfReplicaSpecs", {}).get( "Worker", {}).get("replicas", 0) - verify_runconfig(masterHost, self.namespace, self.name, "chief", num_ps, num_workers) - verify_runconfig(masterHost, self.namespace, self.name, "worker", num_ps, num_workers) - verify_runconfig(masterHost, self.namespace, self.name, "ps", num_ps, num_workers) + verify_runconfig(masterHost, self.namespace, self.name, "chief", num_ps, + num_workers) + verify_runconfig(masterHost, self.namespace, self.name, "worker", num_ps, + num_workers) + verify_runconfig(masterHost, self.namespace, self.name, "ps", num_ps, + num_workers) - tf_job_client.terminate_replicas(api_client, self.namespace, self.name, "chief", 1) + tf_job_client.terminate_replicas(api_client, self.namespace, self.name, + "chief", 1) # Wait for the job to complete. logging.info("Waiting for job to finish.") results = tf_job_client.wait_for_job( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -126,12 +145,17 @@ def test_tfjob_and_verify_runconfig(self): logging.error(self.failure) # Delete the TFJob. 
- tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/invalid_tfjob_tests.py b/py/invalid_tfjob_tests.py index bdd408dd65..3a7b9d624c 100644 --- a/py/invalid_tfjob_tests.py +++ b/py/invalid_tfjob_tests.py @@ -9,7 +9,9 @@ INVALID_TFJOB_COMPONENT_NAME = "invalid_tfjob" + class InvalidTfJobTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -17,7 +19,8 @@ def __init__(self, args): self.namespace = namespace self.tfjob_version = args.tfjob_version self.params = args.params - super(InvalidTfJobTests, self).__init__(class_name="InvalidTfJobTests", name=name) + super(InvalidTfJobTests, self).__init__( + class_name="InvalidTfJobTests", name=name) def test_invalid_tfjob_spec(self): api_client = k8s_client.ApiClient() @@ -25,7 +28,7 @@ def test_invalid_tfjob_spec(self): # Setup the ksonnet app ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, - self.params) + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -33,8 +36,11 @@ def test_invalid_tfjob_spec(self): logging.info("Wait for conditions Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -54,12 +60,17 @@ def test_invalid_tfjob_spec(self): logging.error(self.failure) # Delete the TFJob. - tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/k8s_util.py b/py/k8s_util.py index 9e02587ae7..a050dbc350 100644 --- a/py/k8s_util.py +++ b/py/k8s_util.py @@ -9,19 +9,47 @@ from kubernetes import client as k8s_client from kubernetes.client import rest + +def get_container_start_time(client, namespace, pod_selector, index, phase): + """ get start time of container in the pod with pod_name, + we assume there is only one container. + + Args: + client: K8s api client. + namespace: Namespace. + pod_selector: Selector for the pods. 
+ index: Index of the pods + phase: expected of the phase when getting the start time + Returns: + container_start_time: container start time in datetime datatype + """ + pods = list_pods(client, namespace, pod_selector) + logging.info("%s pods matched %s pods", len(pods.items), pod_selector) + pod = pods.items[index] + + if phase == "Running": + container_start_time = pod.status.container_statuses[ + 0].state.running.started_at + else: + container_start_time = pod.status.container_statuses[ + 0].state.terminated.started_at + + return container_start_time + + def log_pods(pods): """Log information about pods.""" for p in pods.items: logging.info("Pod name=%s Phase=%s", p.metadata.name, p.status.phase) -def wait_for_pods_to_be_in_phases(client, - namespace, - pod_selector, - phases, - timeout=datetime.timedelta(minutes=5), - polling_interval=datetime.timedelta( - seconds=30)): +def wait_for_pods_to_be_in_phases( + client, + namespace, + pod_selector, + phases, + timeout=datetime.timedelta(minutes=5), + polling_interval=datetime.timedelta(seconds=30)): """Wait for the pods matching the selector to be in the specified state Args: @@ -35,8 +63,10 @@ def wait_for_pods_to_be_in_phases(client, invoked after we poll the job. Callable takes a single argument which is the job. """ + time.sleep(polling_interval.seconds) end_time = datetime.datetime.now() + timeout while True: + pods = list_pods(client, namespace, pod_selector) logging.info("%s pods matched %s pods", len(pods.items), pod_selector) @@ -44,6 +74,8 @@ def wait_for_pods_to_be_in_phases(client, is_match = True for p in pods.items: if p.status.phase not in phases: + # for debug + logging.info("pod in phase %s", p.status.phase) is_match = False if is_match: @@ -54,20 +86,21 @@ def wait_for_pods_to_be_in_phases(client, if datetime.datetime.now() + polling_interval > end_time: logging.info("Latest pod phases") log_pods(pods) - logging.error("Timeout waiting for pods to be in phase: %s", - phases) - raise util.TimeoutError("Timeout waiting for pods to be in states %s" % - phases) + logging.error("Timeout waiting for pods to be in phase: %s", phases) + raise util.TimeoutError( + "Timeout waiting for pods to be in states %s" % phases) + time.sleep(polling_interval.seconds) return None -def wait_for_pods_to_be_deleted(client, - namespace, - pod_selector, - timeout=datetime.timedelta(minutes=5), - polling_interval=datetime.timedelta( - seconds=30)): + +def wait_for_pods_to_be_deleted( + client, + namespace, + pod_selector, + timeout=datetime.timedelta(minutes=5), + polling_interval=datetime.timedelta(seconds=30)): """Wait for the specified job to be deleted. 
Args: @@ -94,6 +127,7 @@ def wait_for_pods_to_be_deleted(client, time.sleep(polling_interval.seconds) + def list_pods(client, namespace, label_selector): core = k8s_client.CoreV1Api(client) try: @@ -115,10 +149,11 @@ def list_pods(client, namespace, label_selector): message = body.get("message") logging.exception(("Exception when calling DefaultApi->" - "apis_fqdn_v1_namespaces_namespace_resource_post: %s"), - message) + "apis_fqdn_v1_namespaces_namespace_resource_post: %s"), + message) raise e + def get_events(client, namespace, uid): """Get the events for the provided object.""" core = k8s_client.CoreV1Api(client) @@ -142,8 +177,8 @@ def get_events(client, namespace, uid): message = body.get("message") logging.exception(("Exception when calling DefaultApi->" - "apis_fqdn_v1_namespaces_namespace_resource_post: %s"), - message) + "apis_fqdn_v1_namespaces_namespace_resource_post: %s"), + message) raise e matching = [] @@ -155,6 +190,7 @@ def get_events(client, namespace, uid): return matching + def parse_events(events): """Parse events. diff --git a/py/ks_util.py b/py/ks_util.py index ba673d2d97..27f71b3d35 100644 --- a/py/ks_util.py +++ b/py/ks_util.py @@ -17,13 +17,12 @@ def setup_ks_app(app_dir, env, namespace, component, params): # Create a new environment for this run try: util.run(["ks", "env", "add", env, "--namespace=" + namespace], - cwd=app_dir) + cwd=app_dir) except subprocess.CalledProcessError as e: if not re.search(".*environment.*already exists.*", e.output): raise for pair in params.split(","): k, v = pair.split("=", 1) - util.run( - ["ks", "param", "set", "--env=" + env, component, k, v], - cwd=app_dir) + util.run(["ks", "param", "set", "--env=" + env, component, k, v], + cwd=app_dir) diff --git a/py/py_checks.py b/py/py_checks.py index a834267635..88c73d7805 100644 --- a/py/py_checks.py +++ b/py/py_checks.py @@ -52,8 +52,8 @@ def run_lint(args): for f in fnmatch.filter(files, pat): full_path = os.path.join(root, f) try: - util.run( - ["pylint", "--rcfile=" + rc_file, full_path], cwd=args.src_dir) + util.run(["pylint", "--rcfile=" + rc_file, full_path], + cwd=args.src_dir) except subprocess.CalledProcessError: failed_files.append(full_path[len(args.src_dir):]) diff --git a/py/release.py b/py/release.py index 1ee381aba7..63b36db0c3 100755 --- a/py/release.py +++ b/py/release.py @@ -151,11 +151,14 @@ def build_operator_image(root_dir, "github.com/kubeflow/tf-operator/dashboard/backend", ] for t in targets: - if t in ["github.com/kubeflow/tf-operator/cmd/tf-operator.v2", - "github.com/kubeflow/tf-operator/cmd/tf-operator.v1beta1"]: + if t in [ + "github.com/kubeflow/tf-operator/cmd/tf-operator.v2", + "github.com/kubeflow/tf-operator/cmd/tf-operator.v1beta1" + ]: util.run([ "go", "install", "-ldflags", - "-X github.com/kubeflow/tf-operator/pkg/version.GitSHA={}".format(commit), t + "-X github.com/kubeflow/tf-operator/pkg/version.GitSHA={}".format( + commit), t ]) util.run(["go", "install", t]) @@ -174,8 +177,7 @@ def build_operator_image(root_dir, # List of paths to copy relative to root. 
sources = [ - "build/images/tf_operator/Dockerfile", - "examples/tf_sample/tf_smoke.py", + "build/images/tf_operator/Dockerfile", "examples/tf_sample/tf_smoke.py", os.path.join(go_path, bin_path, "tf-operator.v2"), os.path.join(go_path, bin_path, "tf-operator.v1beta1"), os.path.join(go_path, bin_path, "backend"), "dashboard/frontend/build" diff --git a/py/release_test.py b/py/release_test.py index 70bfff6909..c6d5e266d0 100644 --- a/py/release_test.py +++ b/py/release_test.py @@ -35,12 +35,8 @@ def test_build_postsubmit( # pylint: disable=no-self-use @mock.patch("py.release.util.install_go_deps") @mock.patch("py.release.util.clone_repo") @mock.patch("py.release.build_and_push") - def test_build_pr(# pylint: disable=no-self-use - self, - mock_build_and_push, - mock_clone, - _mock_install, - _mock_os, + def test_build_pr( # pylint: disable=no-self-use + self, mock_build_and_push, mock_clone, _mock_install, _mock_os, _mock_makedirs): parser = release.build_parser() args = parser.parse_args( @@ -79,5 +75,6 @@ def test_update_values(self): apiVersion: v1beta1""" self.assertEqual(expected, output) + if __name__ == "__main__": unittest.main() diff --git a/py/replica_restart_policy_tests.py b/py/replica_restart_policy_tests.py new file mode 100644 index 0000000000..fd61223acc --- /dev/null +++ b/py/replica_restart_policy_tests.py @@ -0,0 +1,158 @@ +import json +import logging +from kubernetes import client as k8s_client +from kubeflow.testing import test_util, util +from py import ks_util +from py import test_runner +from py import tf_job_client + +REPLICA_RESTART_POLICY_ALWAYS_COMPONENT_NAME = "replica_restart_policy_always" +REPLICA_RESTART_POLICY_ONFAILURE_COMPONENT_NAME = "replica_restart_policy_onfailure" +REPLICA_RESTART_POLICY_NEVER_COMPONENT_NAME = "replica_restart_policy_never" +REPLICA_RESTART_POLICY_EXITCODE_COMPONENT_NAME = "replica_restart_policy_exitcode" + + +class ReplicaRestartPolicyTests(test_util.TestCase): + + def __init__(self, args): + namespace, name, env = test_runner.parse_runtime_params(args) + self.app_dir = args.app_dir + self.env = env + self.namespace = namespace + self.tfjob_version = args.tfjob_version + self.params = args.params + super(ReplicaRestartPolicyTests, self).__init__( + class_name="ReplicaRestartPolicyTests", name=name) + + def run_tfjob_with_replica_restart_policy(self, component, + replica_restart_policy, exit_code): + api_client = k8s_client.ApiClient() + + # Setup the ksonnet app + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) + + # Create the TF job + util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) + logging.info("Created job %s in namespaces %s", self.name, self.namespace) + + # Wait for the job to either be in Running state or a terminal state + logging.info("Wait for conditions Running, Succeeded, or Failed") + results = tf_job_client.wait_for_condition( + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) + logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) + + if replica_restart_policy == "Always" and exit_code == 0: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, True) + + elif replica_restart_policy == "Always" and exit_code == 1: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, True) + + elif replica_restart_policy == "OnFailure" 
and exit_code == 1: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, True) + + elif replica_restart_policy == "OnFailure" and exit_code == 0: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, False) + + elif replica_restart_policy == "Never" and exit_code == 1: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, False) + + elif replica_restart_policy == "Never" and exit_code == 0: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, False) + + elif replica_restart_policy == "ExitCode" and exit_code == 1: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, False) + + else: + res = tf_job_client.terminate_and_verify_start_time( + api_client, self.namespace, self.name, "ps", 0, exit_code, True) + + if res is False: + self.failure = "Job {0} in namespace {1} with restart policy {2} failed test \ + with exit_code {3}".format(self.name, self.namespace, + replica_restart_policy, exit_code) + logging.error(self.failure) + return + + # Delete the TFJob. + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) + tf_job_client.wait_for_delete( + api_client, + self.namespace, + self.name, + self.tfjob_version, + status_callback=tf_job_client.log_status) + + # Verify that the pod is restarted even after the container exits with success. + # We terminate PS with exit_code=0, and verify it is restarted. + def test_restart_always_exit_code_0(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_ALWAYS_COMPONENT_NAME + "_" + self.tfjob_version, + "Always", 0) + + # Verify that the pod is restarted after the container exits with 1. + # We terminate PS with exit_code=1, and verify it is restarted. + def test_restart_always_exit_code_1(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_ALWAYS_COMPONENT_NAME + "_" + self.tfjob_version, + "Always", 1) + + # Verify that the pod is restarted after failure. + # We terminate PS with exit_code=1, and verify it is restarted. + def test_restart_onfailure_exit_code_1(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_ONFAILURE_COMPONENT_NAME + "_" + + self.tfjob_version, "OnFailure", 1) + + # Verify that the pod is restarted after failure. + # We terminate PS with exit_code=0, and verify it is not restarted. + def test_restart_onfailure_exit_code_0(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_ONFAILURE_COMPONENT_NAME + "_" + + self.tfjob_version, "OnFailure", 0) + + # Verify that the pod is never restarted. + # We terminate PS with exit_code=1, and verify it is not restarted. + def test_restart_never_exit_code_1(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_NEVER_COMPONENT_NAME + "_" + self.tfjob_version, + "Never", 1) + + # Verify that the pod is never restarted. + # We terminate PS with exit_code=0, and verify it is not restarted. 
+ def test_restart_never_exit_code_0(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_NEVER_COMPONENT_NAME + "_" + self.tfjob_version, + "Never", 0) + + # Verify that the pod is not restarted after permanent error ( 1-127 ). + # We terminate PS with exit_code=1, and verify its phase becomes Failed. + def test_restart_exitcode_permanent_error(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_EXITCODE_COMPONENT_NAME + "_" + self.tfjob_version, + "ExitCode", 1) + + # Verify that the pod is not restarted after retryable error. + # We terminate PS with exit_code=130, and verify it is restarted. + def test_restart_exitcode_retryable_error(self): + return self.run_tfjob_with_replica_restart_policy( + REPLICA_RESTART_POLICY_EXITCODE_COMPONENT_NAME + "_" + self.tfjob_version, + "ExitCode", 130) + + +if __name__ == "__main__": + test_runner.main(module=__name__) diff --git a/py/shutdown_policy_tests.py b/py/shutdown_policy_tests.py index debbac9b47..d24e590d6c 100644 --- a/py/shutdown_policy_tests.py +++ b/py/shutdown_policy_tests.py @@ -9,7 +9,9 @@ MASTER_IS_CHIEF_COMPONENT_NAME = "master_is_chief" WORKER0_IS_CHIEF_COMPONENT_NAME = "worker0_is_chief" + class ShutdownPolicyTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -17,13 +19,15 @@ def __init__(self, args): self.namespace = namespace self.tfjob_version = args.tfjob_version self.params = args.params - super(ShutdownPolicyTests, self).__init__(class_name="ShutdownPolicyTests", name=name) + super(ShutdownPolicyTests, self).__init__( + class_name="ShutdownPolicyTests", name=name) def run_tfjob_with_shutdown_policy(self, component, shutdown_policy): api_client = k8s_client.ApiClient() # Setup the ksonnet app - ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params) + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -32,19 +36,27 @@ def run_tfjob_with_shutdown_policy(self, component, shutdown_policy): # Wait for the job to either be in Running state or a terminal state logging.info("Wait for conditions Running, Succeeded, or Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) if shutdown_policy == "worker": - tf_job_client.terminate_replicas(api_client, self.namespace, self.name, "worker", 1) + tf_job_client.terminate_replicas(api_client, self.namespace, self.name, + "worker", 1) else: - tf_job_client.terminate_replicas(api_client, self.namespace, self.name, "chief", 1) + tf_job_client.terminate_replicas(api_client, self.namespace, self.name, + "chief", 1) # Wait for the job to complete. 
logging.info("Waiting for job to finish.") results = tf_job_client.wait_for_job( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -55,11 +67,15 @@ def run_tfjob_with_shutdown_policy(self, component, shutdown_policy): return # Delete the TFJob. - tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) # Tests launching a TFJob with a Chief replica. Terminate the chief replica, and @@ -74,5 +90,6 @@ def test_shutdown_worker0(self): return self.run_tfjob_with_shutdown_policy( WORKER0_IS_CHIEF_COMPONENT_NAME + "_" + self.tfjob_version, "worker") + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/simple_tfjob_tests.py b/py/simple_tfjob_tests.py index d481a563ae..eae7243457 100644 --- a/py/simple_tfjob_tests.py +++ b/py/simple_tfjob_tests.py @@ -9,7 +9,9 @@ CPU_TFJOB_COMPONENT_NAME = "simple_tfjob" GPU_TFJOB_COMPONENT_NAME = "gpu_tfjob" + class SimpleTfJobTests(test_util.TestCase): + def __init__(self, args): namespace, name, env = test_runner.parse_runtime_params(args) self.app_dir = args.app_dir @@ -17,14 +19,16 @@ def __init__(self, args): self.namespace = namespace self.tfjob_version = args.tfjob_version self.params = args.params - super(SimpleTfJobTests, self).__init__(class_name="SimpleTfJobTests", name=name) + super(SimpleTfJobTests, self).__init__( + class_name="SimpleTfJobTests", name=name) # Run a generic TFJob, wait for it to complete, and check for pod/service creation errors. def run_simple_tfjob(self, component): api_client = k8s_client.ApiClient() # Setup the ksonnet app - ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params) + ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, + self.params) # Create the TF job util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir) @@ -33,14 +37,20 @@ def run_simple_tfjob(self, component): # Wait for the job to either be in Running state or a terminal state logging.info("Wait for conditions Running, Succeeded, or Failed") results = tf_job_client.wait_for_condition( - api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"], - version=self.tfjob_version, status_callback=tf_job_client.log_status) + api_client, + self.namespace, + self.name, ["Running", "Succeeded", "Failed"], + version=self.tfjob_version, + status_callback=tf_job_client.log_status) logging.info("Current TFJob:\n %s", json.dumps(results, indent=2)) # Wait for the job to complete. 
logging.info("Waiting for job to finish.") results = tf_job_client.wait_for_job( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) @@ -62,11 +72,15 @@ def run_simple_tfjob(self, component): logging.warning(creation_failures) # Delete the TFJob. - tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version) - logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name, - self.namespace) + tf_job_client.delete_tf_job( + api_client, self.namespace, self.name, version=self.tfjob_version) + logging.info("Waiting for job %s in namespaces %s to be deleted.", + self.name, self.namespace) tf_job_client.wait_for_delete( - api_client, self.namespace, self.name, self.tfjob_version, + api_client, + self.namespace, + self.name, + self.tfjob_version, status_callback=tf_job_client.log_status) # Run a generic TFJob, wait for it to complete, and check for pod/service creation errors. @@ -77,5 +91,6 @@ def test_simple_tfjob_cpu(self): def test_simple_tfjob_gpu(self): self.run_simple_tfjob(GPU_TFJOB_COMPONENT_NAME + "_" + self.tfjob_version) + if __name__ == "__main__": test_runner.main(module=__name__) diff --git a/py/test_runner.py b/py/test_runner.py index ffe4c56ee0..3ab9fd1b57 100644 --- a/py/test_runner.py +++ b/py/test_runner.py @@ -19,8 +19,8 @@ # between retries is because we have multiple tests running in parallel # that are all modifying the same ksonnet app via ks. I think this can # lead to failures. -@retrying.retry(stop_max_attempt_number=10, wait_random_min=1000, - wait_random_max=10000) +@retrying.retry( + stop_max_attempt_number=10, wait_random_min=1000, wait_random_max=10000) def run_test(test_case, test_func, args): # pylint: disable=too-many-branches,too-many-statements """Run a test.""" gcs_client = storage.Client(project=args.project) @@ -41,7 +41,7 @@ def run_test(test_case, test_func, args): # pylint: disable=too-many-branches,t start = time.time() - try: # pylint: disable=too-many-nested-blocks + try: # pylint: disable=too-many-nested-blocks # We repeat the test multiple times. # This ensures that if we delete the job we can create a new job with the # same name. 
@@ -77,10 +77,12 @@ def run_test(test_case, test_func, args): # pylint: disable=too-many-branches,t finally: test_case.time = time.time() - start if args.artifacts_path: - test_util.create_junit_xml_file([test_case], + test_util.create_junit_xml_file( + [test_case], args.artifacts_path + "/junit_" + test_func.__name__ + ".xml", gcs_client) + def parse_runtime_params(args): salt = uuid.uuid4().hex[0:4] @@ -107,6 +109,7 @@ def parse_runtime_params(args): return namespace, name, env + def add_common_args(parser): """Add a set of common parser arguments.""" parser.add_argument( @@ -156,7 +159,7 @@ def add_common_args(parser): default=None, type=str, help="(Optional) the name for the ksonnet environment; if not specified " - "a random one is created.") + "a random one is created.") parser.add_argument( "--num_trials", @@ -177,7 +180,8 @@ def main(module=None): # pylint: disable=too-many-locals level=logging.INFO, format=('%(levelname)s|%(asctime)s' '|%(pathname)s|%(lineno)d| %(message)s'), - datefmt='%Y-%m-%dT%H:%M:%S',) + datefmt='%Y-%m-%dT%H:%M:%S', + ) util.maybe_activate_service_account() diff --git a/py/tf_job_client.py b/py/tf_job_client.py index 8e12ff4a27..44c03981b2 100644 --- a/py/tf_job_client.py +++ b/py/tf_job_client.py @@ -14,7 +14,6 @@ from py import k8s_util from py import util - TF_JOB_GROUP = "kubeflow.org" TF_JOB_PLURAL = "tfjobs" TF_JOB_KIND = "TFJob" @@ -22,6 +21,7 @@ # How long to wait in seconds for requests to the ApiServer TIMEOUT = 120 + def create_tf_job(client, spec, version="v1beta1"): """Create a TFJob. @@ -69,10 +69,16 @@ def delete_tf_job(client, namespace, name, version="v1beta1"): } logging.info("Deleting job %s.%s", namespace, name) thread = crd_api.delete_namespaced_custom_object( - TF_JOB_GROUP, version, namespace, TF_JOB_PLURAL, name, body, + TF_JOB_GROUP, + version, + namespace, + TF_JOB_PLURAL, + name, + body, async_req=True) api_response = thread.get(TIMEOUT) - logging.info("Deleting job %s.%s returned: %s", namespace, name, api_response) + logging.info("Deleting job %s.%s returned: %s", namespace, name, + api_response) return api_response except rest.ApiException as e: message = "" @@ -99,22 +105,24 @@ def delete_tf_job(client, namespace, name, version="v1beta1"): def log_status(tf_job): """A callback to use with wait_for_job.""" all_conditions = tf_job.get("status", {}).get("conditions", []) - conditions = [] if all_conditions is None else [c.get("type", "") for c in all_conditions] + conditions = [] if all_conditions is None else [ + c.get("type", "") for c in all_conditions + ] logging.info("Job %s in namespace %s; uid=%s; conditions=%s", tf_job.get("metadata", {}).get("name"), tf_job.get("metadata", {}).get("namespace"), - tf_job.get("metadata", {}).get("uid"), - conditions) + tf_job.get("metadata", {}).get("uid"), conditions) + # pylint: disable=too-many-arguments def wait_for_condition(client, - namespace, - name, - expected_condition, - version="v1beta1", - timeout=datetime.timedelta(minutes=10), - polling_interval=datetime.timedelta(seconds=30), - status_callback=None): + namespace, + name, + expected_condition, + version="v1beta1", + timeout=datetime.timedelta(minutes=10), + polling_interval=datetime.timedelta(seconds=30), + status_callback=None): """Waits until any of the specified conditions occur. 
Args: @@ -163,8 +171,7 @@ def wait_for_condition(client, if datetime.datetime.now() + polling_interval > end_time: raise util.JobTimeoutError( "Timeout waiting for job {0} in namespace {1} to enter one of the " - "conditions {2}.".format( - name, namespace, conditions), results) + "conditions {2}.".format(name, namespace, conditions), results) time.sleep(polling_interval.seconds) @@ -172,6 +179,7 @@ def wait_for_condition(client, # this code is unreachable. return None + def wait_for_job(client, namespace, name, @@ -192,11 +200,13 @@ def wait_for_job(client, is the job. """ return wait_for_condition( - client, namespace, name, ["Succeeded", "Failed"], - version=version, - timeout=timeout, - polling_interval=polling_interval, - status_callback=status_callback) + client, + namespace, + name, ["Succeeded", "Failed"], + version=version, + timeout=timeout, + polling_interval=polling_interval, + status_callback=status_callback) def wait_for_delete(client, @@ -223,8 +233,7 @@ def wait_for_delete(client, while True: try: results = crd_api.get_namespaced_custom_object( - TF_JOB_GROUP, version, namespace, - TF_JOB_PLURAL, name) + TF_JOB_GROUP, version, namespace, TF_JOB_PLURAL, name) except rest.ApiException as e: if e.status == httplib.NOT_FOUND: return @@ -240,6 +249,7 @@ def wait_for_delete(client, time.sleep(polling_interval.seconds) + def get_labels(name, replica_type=None, replica_index=None): """Return labels. """ @@ -248,12 +258,13 @@ def get_labels(name, replica_type=None, replica_index=None): "tf_job_name": name, } if replica_type: - labels["tf-replica-type"] = replica_type + labels["tf-replica-type"] = str.lower(replica_type) if replica_index: labels["tf-replica-index"] = replica_index return labels + def to_selector(labels): parts = [] for k, v in labels.iteritems(): @@ -261,14 +272,18 @@ def to_selector(labels): return ",".join(parts) -def wait_for_replica_type_in_phases(api_client, namespace, tfjob_name, replica_type, phases): + +def wait_for_replica_type_in_phases(api_client, namespace, tfjob_name, + replica_type, phases): pod_labels = get_labels(tfjob_name, replica_type) pod_selector = to_selector(pod_labels) - k8s_util.wait_for_pods_to_be_in_phases(api_client, namespace, - pod_selector, - phases, - timeout=datetime.timedelta( - minutes=4)) + k8s_util.wait_for_pods_to_be_in_phases( + api_client, + namespace, + pod_selector, + phases, + timeout=datetime.timedelta(minutes=4)) + @retrying.retry(wait_fixed=10, stop_max_delay=60) def terminate_replica(master_host, namespace, target, exit_code=0): @@ -285,7 +300,13 @@ def terminate_replica(master_host, namespace, target, exit_code=0): } util.send_request(master_host, namespace, target, "exit", params) -def terminate_replicas(api_client, namespace, name, replica, num_targets): + +def terminate_replicas(api_client, + namespace, + name, + replica, + num_targets, + exit_code=0): """Terminates the specified replica(s). Args: @@ -294,6 +315,7 @@ def terminate_replicas(api_client, namespace, name, replica, num_targets): name: TFJob name replica: Replica type (chief, worker, ps) num_targets: Number of replicas to terminate. + exit_code: What exit code to terminate the pods with. """ target = "{name}-{replica}".format(name=name, replica=replica) pod_labels = get_labels(namespace, name) @@ -304,16 +326,16 @@ def terminate_replicas(api_client, namespace, name, replica, num_targets): # TODO(jlewi): We are get pods using a label selector so there is # a risk that the pod we actual care about isn't present. 
logging.info("Waiting for pods to be running before shutting down.") - k8s_util.wait_for_pods_to_be_in_phases(api_client, namespace, - pod_selector, - ["Running"], - timeout=datetime.timedelta( - minutes=4)) + k8s_util.wait_for_pods_to_be_in_phases( + api_client, + namespace, + pod_selector, ["Running"], + timeout=datetime.timedelta(minutes=4)) logging.info("Pods are ready") logging.info("Issuing the terminate request") for num in range(num_targets): full_target = target + "-{0}".format(num) - terminate_replica(masterHost, namespace, full_target) + terminate_replica(masterHost, namespace, full_target, exit_code) def job_succeeded(tfjob): @@ -345,7 +367,8 @@ def get_creation_failures_from_tfjob(api_client, namespace, tfjob): num_expected = 0 for replicakey in tfjob.get("spec", {}).get("tfReplicaSpecs", {}): - replica_spec = tfjob.get("spec", {}).get("tfReplicaSpecs", {}).get(replicakey, {}) + replica_spec = tfjob.get("spec", {}).get("tfReplicaSpecs", {}).get( + replicakey, {}) if replica_spec: num_expected += replica_spec.get("replicas", 1) @@ -357,7 +380,79 @@ def get_creation_failures_from_tfjob(api_client, namespace, tfjob): if len(created_services) != num_expected: message = ("Expected {0} services to be created but only " - "got {1} create events.").format(num_expected, len(created_services)) + "got {1} create events.").format(num_expected, + len(created_services)) creation_failures.append(message) return creation_failures + + +def get_start_time_by_index(api_client, namespace, name, replica_type, + replica_index, phase): + """Returns the start time of the specified pod. + + Args: + api_client: The K8s API client. + namespace: The K8s namespace. + name: TFJob name. + replica_type: Replica type (chief, worker, ps). + replica_index: Index of the replicas. + phase: expected of the phase when getting the start time + """ + pod_labels = get_labels(name, replica_type) + pod_selector = to_selector(pod_labels) + return k8s_util.get_container_start_time(api_client, namespace, pod_selector, + replica_index, phase) + + +def terminate_and_verify_start_time(api_client, namespace, name, replica_type, + replica_index, exit_code, expect_restart): + """ Return True for passing the test and False for failing the test. + # if expect_restart is true, check that the second restart time is after the first. + # if expect_restart is false, check that the restart time has not changed. + + Args: + api_client: The K8s API client. + namespace: The K8s namespace. + name: TFJob name. + replica_type: Replica type (chief, worker, ps). + replica_index: Index of the replicas. + exit_code: exit_code for the pod to exit with. 
+ expect_restart: expectation of whether the pod will restart after being terminated + """ + wait_for_replica_type_in_phases(api_client, namespace, name, "ps", + ["Running"]) + first_start_time = get_start_time_by_index(api_client, namespace, name, + replica_type, replica_index, "Running") + terminate_replicas(api_client, namespace, name, "ps", 1, exit_code) + + if expect_restart: + wait_for_replica_type_in_phases(api_client, namespace, name, "ps", + ["Running"]) + restart_time = get_start_time_by_index( + api_client, namespace, name, replica_type, replica_index, "Running") + logging.info("First start time: %s, restart time: %s", + str(first_start_time), str(restart_time)) + if restart_time <= first_start_time: + return False + + elif expect_restart is False and exit_code == 0: + wait_for_replica_type_in_phases(api_client, namespace, name, "ps", + ["Succeeded"]) + restart_time = get_start_time_by_index( + api_client, namespace, name, replica_type, replica_index, "Succeeded") + logging.info("First start time: %s, restart time: %s", + str(first_start_time), str(restart_time)) + if restart_time != first_start_time: + return False + else: + wait_for_replica_type_in_phases(api_client, namespace, name, "ps", + ["Failed"]) + restart_time = get_start_time_by_index( + api_client, namespace, name, replica_type, replica_index, "Failed") + logging.info("First start time: %s, restart time: %s", + str(first_start_time), str(restart_time)) + if restart_time != first_start_time: + return False + + return True diff --git a/py/util.py b/py/util.py index f53a8d3456..6e618dbe94 100755 --- a/py/util.py +++ b/py/util.py @@ -46,10 +46,7 @@ def run(command, cwd=None, env=None, dryrun=False): # In case the release is done from a non-linux machine # we enforce correct GOOS and GOARCH - extra_envs = { - "GOOS": "linux", - "GOARCH": "amd64" - } + extra_envs = {"GOOS": "linux", "GOARCH": "amd64"} if not env: env = os.environ.copy() @@ -116,17 +113,15 @@ def send_request(master_host, namespace, target, rpc, params): } url = ("{master}/api/v1/namespaces/{namespace}/services/{service}:2222" "/proxy/{rpc}").format( - master=master_host, namespace=namespace, service=target, rpc=rpc) - r = requests.get(url, - headers=headers, params=params, - verify=False) + master=master_host, namespace=namespace, service=target, rpc=rpc) + r = requests.get(url, headers=headers, params=params, verify=False) if r.status_code == requests.codes.NOT_FOUND: logging.info("Request to %s returned 404", url) return "" if r.status_code != requests.codes.OK: - msg = "Request to {0} exited with status code: {1}".format(url, - r.status_code) + msg = "Request to {0} exited with status code: {1}".format( + url, r.status_code) logging.error(msg) raise RuntimeError(msg) @@ -164,22 +159,20 @@ def clone_repo(dest, if branches: for b in branches: - run( - [ - "git", - "fetch", - "origin", - b, - ], cwd=dest) + run([ + "git", + "fetch", + "origin", + b, + ], cwd=dest) if not sha: b = branches[-1].split(":", 1)[-1] - run( - [ - "git", - "checkout", - b, - ], cwd=dest) + run([ + "git", + "checkout", + b, + ], cwd=dest) if sha: run(["git", "checkout", sha], cwd=dest) @@ -338,8 +331,9 @@ def wait_for_deployment(api_client, namespace, name): logging.info("Waiting for deployment %s in namespace %s", name, namespace) time.sleep(10) - logging.error("Timeout waiting for deployment %s in namespace %s to be " - "ready", name, namespace) + logging.error( + "Timeout waiting for deployment %s in namespace %s to be " + "ready", name, namespace) raise TimeoutError( "Timeout 
waiting for deployment {0} in namespace {1}".format( name, namespace)) @@ -372,8 +366,9 @@ def wait_for_statefulset(api_client, namespace, name): logging.info("Waiting for Statefulset %s in namespace %s", name, namespace) time.sleep(10) - logging.error("Timeout waiting for statefulset %s in namespace %s to be " - "ready", name, namespace) + logging.error( + "Timeout waiting for statefulset %s in namespace %s to be " + "ready", name, namespace) raise TimeoutError( "Timeout waiting for statefulset {0} in namespace {1}".format( name, namespace)) @@ -460,6 +455,7 @@ def setup_cluster(api_client): class TimeoutError(Exception): # pylint: disable=redefined-builtin """An error indicating an operation timed out.""" + class JobTimeoutError(TimeoutError): """An error indicating the job timed out. @@ -470,6 +466,7 @@ def __init__(self, message, job): super(JobTimeoutError, self).__init__(message) self.job = job + GCS_REGEX = re.compile("gs://([^/]*)(/.*)?") diff --git a/test/workflows/components/params.libsonnet b/test/workflows/components/params.libsonnet index 797687b422..27b85215ca 100644 --- a/test/workflows/components/params.libsonnet +++ b/test/workflows/components/params.libsonnet @@ -61,6 +61,26 @@ "invalid_tfjob_v1alpha2": { name: "invalid-tfjob", }, + replica_restart_policy_always_v1alpha2: { + name: "replica-restart-policy-always", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_onfailure_v1alpha2: { + name: "replica-restart-policy-onfailure", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_never_v1alpha2: { + name: "replica-restart-policy-never", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_exitcode_v1alpha2: { + name: "replica-restart-policy-exitcode", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, // v1beta1 components simple_tfjob_v1beta1: { name: "simple-001", @@ -108,5 +128,25 @@ "invalid_tfjob_v1beta1": { name: "invalid-tfjob", }, + replica_restart_policy_always_v1beta1: { + name: "replica-restart-policy-always", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_onfailure_v1beta1: { + name: "replica-restart-policy-onfailure", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_never_v1beta1: { + name: "replica-restart-policy-never", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, + replica_restart_policy_exitcode_v1beta1: { + name: "replica-restart-policy-exitcode", + namespace: "kubeflow-test-infra", + image: "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest" + }, }, } diff --git a/test/workflows/components/replica_restart_policy_always_v1alpha2.jsonnet b/test/workflows/components/replica_restart_policy_always_v1alpha2.jsonnet new file mode 100644 index 0000000000..029d549e92 --- /dev/null +++ b/test/workflows/components/replica_restart_policy_always_v1alpha2.jsonnet @@ -0,0 +1,53 @@ +local params = std.extVar("__ksonnet/params").components.replica_restart_policy_always_v1alpha2; + +local k = import "k.libsonnet"; + +local defaultTestImage = 
"gcr.io/kubeflow-images-staging/tf-operator-test-server:latest"; + +local parts(namespace, name, image) = { + local actualImage = if image != "" then + image + else defaultTestImage, + job:: { + apiVersion: "kubeflow.org/v1alpha2", + kind: "TFJob", + metadata: { + name: name, + namespace: namespace, + }, + spec: { + tfReplicaSpecs: { + PS: { + replicas: 1, + restartPolicy: "Always", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + Worker: { + replicas: 2, + restartPolicy: "Always", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + }, + }, + }, +}; + +std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job])) diff --git a/test/workflows/components/replica_restart_policy_always_v1beta1.jsonnet b/test/workflows/components/replica_restart_policy_always_v1beta1.jsonnet new file mode 100644 index 0000000000..d9ebd2f4b2 --- /dev/null +++ b/test/workflows/components/replica_restart_policy_always_v1beta1.jsonnet @@ -0,0 +1,53 @@ +local params = std.extVar("__ksonnet/params").components.replica_restart_policy_always_v1beta1; + +local k = import "k.libsonnet"; + +local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest"; + +local parts(namespace, name, image) = { + local actualImage = if image != "" then + image + else defaultTestImage, + job:: { + apiVersion: "kubeflow.org/v1beta1", + kind: "TFJob", + metadata: { + name: name, + namespace: namespace, + }, + spec: { + tfReplicaSpecs: { + PS: { + replicas: 1, + restartPolicy: "Always", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + Worker: { + replicas: 2, + restartPolicy: "Always", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + }, + }, + }, +}; + +std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job])) diff --git a/test/workflows/components/replica_restart_policy_exitcode_v1alpha2.jsonnet b/test/workflows/components/replica_restart_policy_exitcode_v1alpha2.jsonnet new file mode 100644 index 0000000000..313655a52a --- /dev/null +++ b/test/workflows/components/replica_restart_policy_exitcode_v1alpha2.jsonnet @@ -0,0 +1,53 @@ +local params = std.extVar("__ksonnet/params").components.replica_restart_policy_exitcode_v1alpha2; + +local k = import "k.libsonnet"; + +local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest"; + +local parts(namespace, name, image) = { + local actualImage = if image != "" then + image + else defaultTestImage, + job:: { + apiVersion: "kubeflow.org/v1alpha2", + kind: "TFJob", + metadata: { + name: name, + namespace: namespace, + }, + spec: { + tfReplicaSpecs: { + PS: { + replicas: 1, + restartPolicy: "ExitCode", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + Worker: { + replicas: 2, + restartPolicy: "ExitCode", + template: { + spec: { + containers: [ + { + name: "tensorflow", + image: actualImage, + }, + ], + }, + }, + }, + }, + }, + }, +}; + +std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job])) diff --git a/test/workflows/components/replica_restart_policy_exitcode_v1beta1.jsonnet b/test/workflows/components/replica_restart_policy_exitcode_v1beta1.jsonnet new file mode 100644 index 0000000000..1b5582178c --- /dev/null +++ 
+++ b/test/workflows/components/replica_restart_policy_exitcode_v1beta1.jsonnet
@@ -0,0 +1,53 @@
+local params = std.extVar("__ksonnet/params").components.replica_restart_policy_exitcode_v1beta1;
+
+local k = import "k.libsonnet";
+
+local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest";
+
+local parts(namespace, name, image) = {
+  local actualImage = if image != "" then
+    image
+  else defaultTestImage,
+  job:: {
+    apiVersion: "kubeflow.org/v1beta1",
+    kind: "TFJob",
+    metadata: {
+      name: name,
+      namespace: namespace,
+    },
+    spec: {
+      tfReplicaSpecs: {
+        PS: {
+          replicas: 1,
+          restartPolicy: "ExitCode",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+        Worker: {
+          replicas: 2,
+          restartPolicy: "ExitCode",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+      },
+    },
+  },
+};
+
+std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
diff --git a/test/workflows/components/replica_restart_policy_never_v1alpha2.jsonnet b/test/workflows/components/replica_restart_policy_never_v1alpha2.jsonnet
new file mode 100644
index 0000000000..2417aae37f
--- /dev/null
+++ b/test/workflows/components/replica_restart_policy_never_v1alpha2.jsonnet
@@ -0,0 +1,53 @@
+local params = std.extVar("__ksonnet/params").components.replica_restart_policy_never_v1alpha2;
+
+local k = import "k.libsonnet";
+
+local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest";
+
+local parts(namespace, name, image) = {
+  local actualImage = if image != "" then
+    image
+  else defaultTestImage,
+  job:: {
+    apiVersion: "kubeflow.org/v1alpha2",
+    kind: "TFJob",
+    metadata: {
+      name: name,
+      namespace: namespace,
+    },
+    spec: {
+      tfReplicaSpecs: {
+        PS: {
+          replicas: 1,
+          restartPolicy: "Never",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+        Worker: {
+          replicas: 2,
+          restartPolicy: "Never",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+      },
+    },
+  },
+};
+
+std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
diff --git a/test/workflows/components/replica_restart_policy_never_v1beta1.jsonnet b/test/workflows/components/replica_restart_policy_never_v1beta1.jsonnet
new file mode 100644
index 0000000000..a27e4d05f0
--- /dev/null
+++ b/test/workflows/components/replica_restart_policy_never_v1beta1.jsonnet
@@ -0,0 +1,53 @@
+local params = std.extVar("__ksonnet/params").components.replica_restart_policy_never_v1beta1;
+
+local k = import "k.libsonnet";
+
+local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest";
+
+local parts(namespace, name, image) = {
+  local actualImage = if image != "" then
+    image
+  else defaultTestImage,
+  job:: {
+    apiVersion: "kubeflow.org/v1beta1",
+    kind: "TFJob",
+    metadata: {
+      name: name,
+      namespace: namespace,
+    },
+    spec: {
+      tfReplicaSpecs: {
+        PS: {
+          replicas: 1,
+          restartPolicy: "Never",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+        Worker: {
+          replicas: 2,
+          restartPolicy: "Never",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+      },
+    },
+  },
+};
+
+std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
diff --git a/test/workflows/components/replica_restart_policy_onfailure_v1alpha2.jsonnet b/test/workflows/components/replica_restart_policy_onfailure_v1alpha2.jsonnet
new file mode 100644
index 0000000000..04b8ea2b66
--- /dev/null
+++ b/test/workflows/components/replica_restart_policy_onfailure_v1alpha2.jsonnet
@@ -0,0 +1,53 @@
+local params = std.extVar("__ksonnet/params").components.replica_restart_policy_onfailure_v1alpha2;
+
+local k = import "k.libsonnet";
+
+local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest";
+
+local parts(namespace, name, image) = {
+  local actualImage = if image != "" then
+    image
+  else defaultTestImage,
+  job:: {
+    apiVersion: "kubeflow.org/v1alpha2",
+    kind: "TFJob",
+    metadata: {
+      name: name,
+      namespace: namespace,
+    },
+    spec: {
+      tfReplicaSpecs: {
+        PS: {
+          replicas: 1,
+          restartPolicy: "OnFailure",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+        Worker: {
+          replicas: 2,
+          restartPolicy: "OnFailure",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+      },
+    },
+  },
+};
+
+std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
diff --git a/test/workflows/components/replica_restart_policy_onfailure_v1beta1.jsonnet b/test/workflows/components/replica_restart_policy_onfailure_v1beta1.jsonnet
new file mode 100644
index 0000000000..e0d4b0d729
--- /dev/null
+++ b/test/workflows/components/replica_restart_policy_onfailure_v1beta1.jsonnet
@@ -0,0 +1,53 @@
+local params = std.extVar("__ksonnet/params").components.replica_restart_policy_onfailure_v1beta1;
+
+local k = import "k.libsonnet";
+
+local defaultTestImage = "gcr.io/kubeflow-images-staging/tf-operator-test-server:latest";
+
+local parts(namespace, name, image) = {
+  local actualImage = if image != "" then
+    image
+  else defaultTestImage,
+  job:: {
+    apiVersion: "kubeflow.org/v1beta1",
+    kind: "TFJob",
+    metadata: {
+      name: name,
+      namespace: namespace,
+    },
+    spec: {
+      tfReplicaSpecs: {
+        PS: {
+          replicas: 1,
+          restartPolicy: "OnFailure",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+        Worker: {
+          replicas: 2,
+          restartPolicy: "OnFailure",
+          template: {
+            spec: {
+              containers: [
+                {
+                  name: "tensorflow",
+                  image: actualImage,
+                },
+              ],
+            },
+          },
+        },
+      },
+    },
+  },
+};
+
+std.prune(k.core.v1.list.new([parts(params.namespace, params.name, params.image).job]))
diff --git a/test/workflows/components/workflows.libsonnet b/test/workflows/components/workflows.libsonnet
index 34d77350f2..5d2a0fba86 100644
--- a/test/workflows/components/workflows.libsonnet
+++ b/test/workflows/components/workflows.libsonnet
@@ -284,6 +284,11 @@
             template: "invalid-tfjob-tests",
             dependencies: ["setup-kubeflow"],
           },
+          {
+            name: "replica-restart-policy-tests",
+            template: "replica-restart-policy-tests",
+            dependencies: ["setup-kubeflow"],
+          },
         ], //tasks
       },
     },
@@ -382,6 +387,8 @@
         "distributed-training-tests"),
       $.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTestTemplate(
         "invalid-tfjob-tests"),
+      $.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTestTemplate(
+        "replica-restart-policy-tests"),
       $.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTemplate("create-pr-symlink", [
         "python",
         "-m",