Add an e2etest for testing restart policy #873

Merged: 25 commits, Nov 27, 2018
Changes from all commits

Commits (25)
7fd61f0  Add an e2etest for testing restart policy (ChanYiLin, Nov 21, 2018)
a2aec37  fix component removing jsonnet (ChanYiLin, Nov 21, 2018)
ef4778d  fix list pods (ChanYiLin, Nov 21, 2018)
6eb0009  fix label selector (ChanYiLin, Nov 21, 2018)
b5a3c0b  add v1beta1 and wait for pod to be terminated (ChanYiLin, Nov 21, 2018)
bac4d66  change exit code to 130 (ChanYiLin, Nov 21, 2018)
9be66ab  test restart (ChanYiLin, Nov 21, 2018)
cdf212e  wait for time interval and start to get status (ChanYiLin, Nov 21, 2018)
c1d4e51  change pod start time to container start time (ChanYiLin, Nov 21, 2018)
76e701a  fix function call name (ChanYiLin, Nov 21, 2018)
6e44724  make sure ps in running state before get start time (ChanYiLin, Nov 21, 2018)
7af1283  fix clean pod policy e2e test bug by changing completed to succeeded (ChanYiLin, Nov 22, 2018)
0465605  fix version in the component (ChanYiLin, Nov 22, 2018)
280c75c  refactor the code and add more tests (ChanYiLin, Nov 26, 2018)
c6b3550  fix pylint (ChanYiLin, Nov 26, 2018)
c75f87e  refactor the code by minimizing return statements (ChanYiLin, Nov 26, 2018)
548fd8d  fix mistake (ChanYiLin, Nov 26, 2018)
5b17b72  remove redundant function (ChanYiLin, Nov 26, 2018)
2a6e403  get container start time (ChanYiLin, Nov 26, 2018)
fc5bc24  fix pylint (ChanYiLin, Nov 26, 2018)
fad31c3  fix get time in different phase (ChanYiLin, Nov 26, 2018)
a5b96d3  fix function parameter (ChanYiLin, Nov 26, 2018)
20e37b8  fix typos (ChanYiLin, Nov 26, 2018)
d6ca456  change exit_code 128 to 130 (ChanYiLin, Nov 26, 2018)
16907d1  wait for ps in running phase before get time (ChanYiLin, Nov 27, 2018)
py/build_and_push_image.py (8 changes: 4 additions & 4 deletions)

@@ -14,12 +14,12 @@
 def GetGitHash(root_dir=None):
   # The image tag is based on the githash.
-  git_hash = subprocess.check_output(
-      ["git", "rev-parse", "--short", "HEAD"], cwd=root_dir).decode("utf-8")
+  git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"],
+                                     cwd=root_dir).decode("utf-8")
   git_hash = git_hash.strip()
 
-  modified_files = subprocess.check_output(
-      ["git", "ls-files", "--modified"], cwd=root_dir)
+  modified_files = subprocess.check_output(["git", "ls-files", "--modified"],
+                                           cwd=root_dir)
   untracked_files = subprocess.check_output(
       ["git", "ls-files", "--others", "--exclude-standard"], cwd=root_dir)
   if modified_files or untracked_files:
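The hunk above only reformats GetGitHash, which derives the image tag from the short git hash. As a minimal usage sketch (the module import path is from this repo, but the registry and project below are placeholders, not part of this diff):

  # Hypothetical usage sketch; only GetGitHash itself appears in this PR.
  from build_and_push_image import GetGitHash

  tag = GetGitHash(root_dir="/path/to/tf-operator")  # e.g. "7fd61f0"
  image = "gcr.io/<your-project>/tf-operator:" + tag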
py/cleanpod_policy_tests.py (59 changes: 38 additions & 21 deletions)

@@ -11,21 +11,25 @@
 CLEANPOD_RUNNING_COMPONENT_NAME = "clean_pod_running"
 CLEANPOD_NONE_COMPONENT_NAME = "clean_pod_none"
 
+
 class CleanPodPolicyTests(test_util.TestCase):
+
   def __init__(self, args):
     namespace, name, env = test_runner.parse_runtime_params(args)
     self.app_dir = args.app_dir
     self.env = env
     self.namespace = namespace
     self.tfjob_version = args.tfjob_version
     self.params = args.params
-    super(CleanPodPolicyTests, self).__init__(class_name="CleanPodPolicyTests", name=name)
+    super(CleanPodPolicyTests, self).__init__(
+        class_name="CleanPodPolicyTests", name=name)
 
   def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy):
     api_client = k8s_client.ApiClient()
 
     # Setup the ksonnet app
-    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params)
+    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component,
+                         self.params)
 
     # Create the TF job
     util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir)

@@ -34,14 +38,20 @@ def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy):
     # Wait for the job to either be in Running state or a terminal state
     logging.info("Wait for conditions Running, Succeeded, or Failed")
     results = tf_job_client.wait_for_condition(
-        api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"],
-        version=self.tfjob_version, status_callback=tf_job_client.log_status)
+        api_client,
+        self.namespace,
+        self.name, ["Running", "Succeeded", "Failed"],
+        version=self.tfjob_version,
+        status_callback=tf_job_client.log_status)
     logging.info("Current TFJob:\n %s", json.dumps(results, indent=2))
 
     # Wait for the job to complete.
     logging.info("Waiting for job to finish.")
     results = tf_job_client.wait_for_job(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
     logging.info("Final TFJob:\n %s", json.dumps(results, indent=2))

@@ -55,31 +65,37 @@ def run_tfjob_with_cleanpod_policy(self, component, clean_pod_policy):
     if clean_pod_policy == "All":
       pod_labels = tf_job_client.get_labels(self.name)
       pod_selector = tf_job_client.to_selector(pod_labels)
-      k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, pod_selector)
+      k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace,
+                                           pod_selector)
     # Only running pods (PS) are deleted, completed pods are not.
     elif clean_pod_policy == "Running":
-      tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace,
-                                                    self.name, "Chief", ["Completed"])
-      tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace,
-                                                    self.name, "Worker", ["Completed"])
+      tf_job_client.wait_for_replica_type_in_phases(
+          api_client, self.namespace, self.name, "Chief", ["Succeeded"])
+      tf_job_client.wait_for_replica_type_in_phases(
+          api_client, self.namespace, self.name, "Worker", ["Succeeded"])
       pod_labels = tf_job_client.get_labels(self.name, "PS")
       pod_selector = tf_job_client.to_selector(pod_labels)
-      k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace, pod_selector)
+      k8s_util.wait_for_pods_to_be_deleted(api_client, self.namespace,
+                                           pod_selector)
     # No pods are deleted.
     elif clean_pod_policy == "None":
-      tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace,
-                                                    self.name, "Chief", ["Completed"])
-      tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace,
-                                                    self.name, "Worker", ["Completed"])
-      tf_job_client.wait_for_replica_type_in_phases(api_client, self.namespace,
-                                                    self.name, "PS", ["Running"])
+      tf_job_client.wait_for_replica_type_in_phases(
+          api_client, self.namespace, self.name, "Chief", ["Succeeded"])
+      tf_job_client.wait_for_replica_type_in_phases(
+          api_client, self.namespace, self.name, "Worker", ["Succeeded"])
+      tf_job_client.wait_for_replica_type_in_phases(
+          api_client, self.namespace, self.name, "PS", ["Running"])
 
     # Delete the TFJob.
-    tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version)
-    logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name,
-                 self.namespace)
+    tf_job_client.delete_tf_job(
+        api_client, self.namespace, self.name, version=self.tfjob_version)
+    logging.info("Waiting for job %s in namespaces %s to be deleted.",
+                 self.name, self.namespace)
     tf_job_client.wait_for_delete(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
 
     # Verify that all pods are deleted when the job completes.

@@ -97,5 +113,6 @@ def test_cleanpod_none(self):
     return self.run_tfjob_with_cleanpod_policy(
         CLEANPOD_NONE_COMPONENT_NAME + "_" + self.tfjob_version, "None")
 
+
 if __name__ == "__main__":
   test_runner.main(module=__name__)
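Note on the ["Completed"] to ["Succeeded"] change above: a finished pod reports pod.status.phase == "Succeeded" in the Kubernetes API, while "Completed" is only what kubectl prints for a terminated container, so a phase check against "Completed" can never match. A minimal sketch of such a phase check, assuming kubernetes-client style calls (the helper name and label keys below are illustrative; the real implementation is tf_job_client.wait_for_replica_type_in_phases, which is not part of this diff):

  # Illustrative sketch only, not the PR's implementation.
  from kubernetes import client as k8s_client

  def replica_pods_in_phases(api_client, namespace, job_name, replica_type, phases):
    """Return True if every pod of the given replica type is in one of `phases`.

    `phases` must use PodStatus values ("Pending", "Running", "Succeeded",
    "Failed", "Unknown"); "Completed" never appears in pod.status.phase.
    """
    core_api = k8s_client.CoreV1Api(api_client)
    # The label keys below are assumptions; the operator's actual labels may differ.
    selector = "tf-job-name={0},tf-replica-type={1}".format(
        job_name, replica_type.lower())
    pods = core_api.list_namespaced_pod(namespace, label_selector=selector)
    return all(pod.status.phase in phases for pod in pods.items)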
py/deploy.py (19 changes: 12 additions & 7 deletions)

@@ -86,8 +86,8 @@ def ks_deploy(app_dir, component, params, env=None, account=None):
     raise
 
   for k, v in params.iteritems():
-    util.run(
-        ["ks", "param", "set", "--env=" + env, component, k, v], cwd=app_dir)
+    util.run(["ks", "param", "set", "--env=" + env, component, k, v],
+             cwd=app_dir)
 
   apply_command = ["ks", "apply", env, "-c", component]
   if account:

@@ -201,7 +201,7 @@ def setup_kubeflow(args):
     "tfJobImage": args.image,
     "name": "kubeflow-core",
     "namespace": args.namespace,
-    "tfJobVersion": args.tf_job_version,
+    "tfJobVersion": args.tf_job_version,
   }
 
   component = "core"

@@ -231,10 +231,14 @@ def setup_kubeflow(args):
   finally:
     # Run kubectl describe to get useful information about the deployment.
    # This will help troubleshoot any errors.
-    util.run(["kubectl", "-n", args.namespace, "describe", "deploy",
-              tf_job_deployment_name])
-    util.run(["kubectl", "-n", args.namespace, "describe", "pods", "-l",
-              "name=tf-job-operator"])
+    util.run([
+        "kubectl", "-n", args.namespace, "describe", "deploy",
+        tf_job_deployment_name
+    ])
+    util.run([
+        "kubectl", "-n", args.namespace, "describe", "pods", "-l",
+        "name=tf-job-operator"
+    ])
 
     # Reraise the exception so that the step fails because there's no point
     # continuing the test.

@@ -251,6 +255,7 @@ def setup_kubeflow(args):
   gcs_client = storage.Client(project=args.project)
   test_util.create_junit_xml_file([t], args.junit_path, gcs_client)
 
+
 def teardown(args):
   """Teardown the resources."""
   gke = discovery.build("container", "v1")
py/distributed_training_tests.py (33 changes: 24 additions & 9 deletions)

@@ -8,7 +8,9 @@
 
 TFJOB_COMPONENT_NAME = "distributed_training"
 
+
 class DistributedTrainingJobTests(test_util.TestCase):
+
   def __init__(self, args):
     namespace, name, env = test_runner.parse_runtime_params(args)
     self.app_dir = args.app_dir

@@ -25,7 +27,8 @@ def run_distributed_training_job(self, component):
     api_client = k8s_client.ApiClient()
 
     # Setup the ksonnet app
-    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params)
+    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component,
+                         self.params)
 
     # Create the TF job
     util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir)

@@ -34,14 +37,20 @@ def run_distributed_training_job(self, component):
     # Wait for the job to either be in Running state or a terminal state
     logging.info("Wait for conditions Running, Succeeded, or Failed")
     results = tf_job_client.wait_for_condition(
-        api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"],
-        version=self.tfjob_version, status_callback=tf_job_client.log_status)
+        api_client,
+        self.namespace,
+        self.name, ["Running", "Succeeded", "Failed"],
+        version=self.tfjob_version,
+        status_callback=tf_job_client.log_status)
     logging.info("Current TFJob:\n %s", json.dumps(results, indent=2))
 
     # Wait for the job to complete.
     logging.info("Waiting for job to finish.")
     results = tf_job_client.wait_for_job(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
     logging.info("Final TFJob:\n %s", json.dumps(results, indent=2))

@@ -58,17 +67,23 @@ def run_distributed_training_job(self, component):
       logging.warning(creation_failures)
 
     # Delete the TFJob.
-    tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version)
-    logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name,
-                 self.namespace)
+    tf_job_client.delete_tf_job(
+        api_client, self.namespace, self.name, version=self.tfjob_version)
+    logging.info("Waiting for job %s in namespaces %s to be deleted.",
+                 self.name, self.namespace)
     tf_job_client.wait_for_delete(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
 
   # Run a distributed training TFJob, wait for it to complete, and check for pod/service
   # creation errors.
   def test_distributed_training_independent_worker(self):
-    self.run_distributed_training_job(TFJOB_COMPONENT_NAME + "_" + self.tfjob_version)
+    self.run_distributed_training_job(TFJOB_COMPONENT_NAME + "_" +
+                                      self.tfjob_version)
 
 
 if __name__ == "__main__":
   test_runner.main(module=__name__)
py/estimator_runconfig_tests.py (64 changes: 44 additions & 20 deletions)

@@ -10,17 +10,21 @@
 
 COMPONENT_NAME = "estimator_runconfig"
 
+
 def get_runconfig(master_host, namespace, target):
   """Issue a request to get the runconfig of the specified replica running test_server.
   Args:
     master_host: The IP address of the master e.g. https://35.188.37.10
     namespace: The namespace
     target: The K8s service corresponding to the pod to call.
   """
-  response = tf_operator_util.send_request(master_host, namespace, target, "runconfig", {})
+  response = tf_operator_util.send_request(master_host, namespace, target,
+                                           "runconfig", {})
   return yaml.load(response)
 
-def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_workers):
+
+def verify_runconfig(master_host, namespace, job_name, replica, num_ps,
+                     num_workers):
   """Verifies that the TF RunConfig on the specified replica is the same as expected.
   Args:
     master_host: The IP address of the master e.g. https://35.188.37.10

@@ -46,23 +50,25 @@ def verify_runconfig(master_host, namespace, job_name, replica, num_ps, num_workers):
     ps_list.append("{name}-ps-{index}:2222".format(name=job_name, index=i))
   worker_list = []
   for i in range(num_workers):
-    worker_list.append("{name}-worker-{index}:2222".format(name=job_name, index=i))
+    worker_list.append("{name}-worker-{index}:2222".format(
+        name=job_name, index=i))
   cluster_spec = {
     "chief": chief_list,
     "ps": ps_list,
     "worker": worker_list,
   }
 
   for i in range(num_replicas):
-    full_target = "{name}-{replica}-{index}".format(name=job_name, replica=replica.lower(), index=i)
+    full_target = "{name}-{replica}-{index}".format(
+        name=job_name, replica=replica.lower(), index=i)
     actual_config = get_runconfig(master_host, namespace, full_target)
     expected_config = {
       "task_type": replica,
       "task_id": i,
      "cluster_spec": cluster_spec,
       "is_chief": is_chief,
       "master": "grpc://{target}:2222".format(target=full_target),
-      "num_worker_replicas": num_workers + 1, # Chief is also a worker
+      "num_worker_replicas": num_workers + 1,  # Chief is also a worker
       "num_ps_replicas": num_ps,
     }
     # Compare expected and actual configs

@@ -74,14 +80,16 @@
 
 
 class EstimatorRunconfigTests(test_util.TestCase):
+
   def __init__(self, args):
     namespace, name, env = test_runner.parse_runtime_params(args)
     self.app_dir = args.app_dir
     self.env = env
     self.namespace = namespace
     self.tfjob_version = args.tfjob_version
     self.params = args.params
-    super(EstimatorRunconfigTests, self).__init__(class_name="EstimatorRunconfigTests", name=name)
+    super(EstimatorRunconfigTests, self).__init__(
+        class_name="EstimatorRunconfigTests", name=name)
 
   # Run a TFJob, verify that the TensorFlow runconfig specs are set correctly.
   def test_tfjob_and_verify_runconfig(self):

@@ -90,7 +98,8 @@ def test_tfjob_and_verify_runconfig(self):
     component = COMPONENT_NAME + "_" + self.tfjob_version
 
     # Setup the ksonnet app
-    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component, self.params)
+    ks_util.setup_ks_app(self.app_dir, self.env, self.namespace, component,
+                         self.params)
 
     # Create the TF job
     util.run(["ks", "apply", self.env, "-c", component], cwd=self.app_dir)

@@ -99,24 +108,34 @@ def test_tfjob_and_verify_runconfig(self):
     # Wait for the job to either be in Running state or a terminal state
     logging.info("Wait for conditions Running, Succeeded, or Failed")
     results = tf_job_client.wait_for_condition(
-        api_client, self.namespace, self.name, ["Running", "Succeeded", "Failed"],
-        version=self.tfjob_version, status_callback=tf_job_client.log_status)
+        api_client,
+        self.namespace,
+        self.name, ["Running", "Succeeded", "Failed"],
+        version=self.tfjob_version,
+        status_callback=tf_job_client.log_status)
     logging.info("Current TFJob:\n %s", json.dumps(results, indent=2))
 
-    num_ps = results.get("spec", {}).get("tfReplicaSpecs", {}).get(
-        "PS", {}).get("replicas", 0)
+    num_ps = results.get("spec", {}).get("tfReplicaSpecs",
+                                         {}).get("PS", {}).get("replicas", 0)
     num_workers = results.get("spec", {}).get("tfReplicaSpecs", {}).get(
         "Worker", {}).get("replicas", 0)
-    verify_runconfig(masterHost, self.namespace, self.name, "chief", num_ps, num_workers)
-    verify_runconfig(masterHost, self.namespace, self.name, "worker", num_ps, num_workers)
-    verify_runconfig(masterHost, self.namespace, self.name, "ps", num_ps, num_workers)
+    verify_runconfig(masterHost, self.namespace, self.name, "chief", num_ps,
+                     num_workers)
+    verify_runconfig(masterHost, self.namespace, self.name, "worker", num_ps,
+                     num_workers)
+    verify_runconfig(masterHost, self.namespace, self.name, "ps", num_ps,
+                     num_workers)
 
-    tf_job_client.terminate_replicas(api_client, self.namespace, self.name, "chief", 1)
+    tf_job_client.terminate_replicas(api_client, self.namespace, self.name,
+                                     "chief", 1)
 
     # Wait for the job to complete.
     logging.info("Waiting for job to finish.")
     results = tf_job_client.wait_for_job(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
     logging.info("Final TFJob:\n %s", json.dumps(results, indent=2))

@@ -126,12 +145,17 @@ def test_tfjob_and_verify_runconfig(self):
       logging.error(self.failure)
 
     # Delete the TFJob.
-    tf_job_client.delete_tf_job(api_client, self.namespace, self.name, version=self.tfjob_version)
-    logging.info("Waiting for job %s in namespaces %s to be deleted.", self.name,
-                 self.namespace)
+    tf_job_client.delete_tf_job(
+        api_client, self.namespace, self.name, version=self.tfjob_version)
+    logging.info("Waiting for job %s in namespaces %s to be deleted.",
+                 self.name, self.namespace)
     tf_job_client.wait_for_delete(
-        api_client, self.namespace, self.name, self.tfjob_version,
+        api_client,
+        self.namespace,
+        self.name,
+        self.tfjob_version,
         status_callback=tf_job_client.log_status)
 
 
 if __name__ == "__main__":
   test_runner.main(module=__name__)
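For reference, a worked example of the expected_config that verify_runconfig assembles, using illustrative values rather than anything from this PR: for a TFJob named "myjob" with one chief, two PS, and two workers, the check for worker index 0 would expect roughly the following.

  # Illustrative values only; job name and replica counts are assumptions.
  cluster_spec = {
      "chief": ["myjob-chief-0:2222"],
      "ps": ["myjob-ps-0:2222", "myjob-ps-1:2222"],
      "worker": ["myjob-worker-0:2222", "myjob-worker-1:2222"],
  }
  expected_config = {
      "task_type": "worker",
      "task_id": 0,
      "cluster_spec": cluster_spec,
      "is_chief": False,  # assumed: only the chief replica reports True
      "master": "grpc://myjob-worker-0:2222",
      "num_worker_replicas": 3,  # num_workers + 1, since the chief also acts as a worker
      "num_ps_replicas": 2,
  }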