From 454c5c1aac7bd734fede009c4b201cfd1c58e3d8 Mon Sep 17 00:00:00 2001 From: Jeremy Lewi Date: Fri, 31 Aug 2018 22:19:08 -0700 Subject: [PATCH] If a TFJob spec is invalid mark the job as failed with an appropriate condition. (#815) * If a TFJob spec is invalid (e.g. can't be marshaled to TFJob YAML) we want to update the TFJob status to indicate it failed. * We need to use the REST API to update the TFJob status because we won't be able to deserialize the json to TFJob. * Related to #755 * I created invalid-tfjob.jsonnet which can be used in an E2E test but I haven't included the E2E test in this PR. * I tested it manually and got the following result apiVersion: kubeflow.org/v1alpha2 kind: TFJob metadata: clusterName: "" creationTimestamp: 2018-08-31T23:37:14Z generation: 1 labels: app.kubernetes.io/deploy-manager: ksonnet ksonnet.io/component: invalid-tfjob name: invalid-tfjob namespace: kubeflow resourceVersion: "1826961" selfLink: /apis/kubeflow.org/v1alpha2/namespaces/kubeflow/tfjobs/invalid-tfjob uid: ca7b4b02-ad76-11e8-be57-42010a8e0084 spec: notTheActualField: Ps: replicas: 2 restartPolicy: Never template: spec: containers: - image: busybox name: tensorflow Worker: replicas: 4 restartPolicy: Never template: spec: containers: - image: busybox name: tensorflow status: conditions: - lastTransitionTime: 2018-08-31T23:37:14Z lastUpdateTime: 2018-08-31T23:37:14Z message: 'Failed to marshal the object to TFJob; the spec is invalid: Failed to marshal the object to TFJob' reason: FailedInvalidTFJobSpec status: "True" type: Failed tfReplicaStatuses: null * Add an E2E test; test_runner.py was getting overly complicated so I created a new main file to run the test and just call methods in test_runner.py as needed. 
--- pkg/controller.v2/tensorflow/job.go | 40 +++++- pkg/util/k8sutil/client.go | 82 ++++++++++++ py/test_invalid_job.py | 126 ++++++++++++++++++ py/test_runner.py | 6 +- .../components/invalid-tfjob.jsonnet | 50 +++++++ test/workflows/components/params.libsonnet | 6 + test/workflows/components/workflows.libsonnet | 28 +++- 7 files changed, 332 insertions(+), 6 deletions(-) create mode 100644 pkg/util/k8sutil/client.go create mode 100644 py/test_invalid_job.py create mode 100644 test/workflows/components/invalid-tfjob.jsonnet diff --git a/pkg/controller.v2/tensorflow/job.go b/pkg/controller.v2/tensorflow/job.go index 60a21ab41f..f6219a498a 100644 --- a/pkg/controller.v2/tensorflow/job.go +++ b/pkg/controller.v2/tensorflow/job.go @@ -12,10 +12,12 @@ import ( tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2" tflogger "github.com/kubeflow/tf-operator/pkg/logger" + "github.com/kubeflow/tf-operator/pkg/util/k8sutil" + "k8s.io/apimachinery/pkg/runtime" ) const ( - failedMarshalTFJobReason = "FailedMarshalTFJob" + failedMarshalTFJobReason = "FailedInvalidTFJobSpec" ) // When a pod is added, set the defaults and enqueue the current tfjob. @@ -31,9 +33,43 @@ func (tc *TFController) addTFJob(obj interface{}) { logger.Errorf("Failed to convert the TFJob: %v", err) // Log the failure to conditions. if err == errFailedMarshal { - errMsg := fmt.Sprintf("Failed to unmarshal the object to TFJob object: %v", err) + errMsg := fmt.Sprintf("Failed to marshal the object to TFJob; the spec is invalid: %v", err) logger.Warn(errMsg) + // TODO(jlewi): v1 doesn't appear to define an error type. 
tc.Recorder.Event(un, v1.EventTypeWarning, failedMarshalTFJobReason, errMsg) + + status := tfv1alpha2.TFJobStatus{ + Conditions: []tfv1alpha2.TFJobCondition{ + tfv1alpha2.TFJobCondition{ + Type: tfv1alpha2.TFJobFailed, + Status: v1.ConditionTrue, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: failedMarshalTFJobReason, + Message: errMsg, + }, + }, + } + + statusMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&status) + + if err != nil { + logger.Errorf("Could not convert the TFJobStatus to unstructured; %v", err) + return + } + + client, err := k8sutil.NewCRDRestClient(&tfv1alpha2.SchemeGroupVersion) + + if err == nil { + metav1unstructured.SetNestedField(un.Object, statusMap, "status") + logger.Infof("Updating the job to; %+v", un.Object) + err = client.Update(un, tfv1alpha2.Plural) + if err != nil { + logger.Errorf("Could not update the TFJob; %v", err) + } + } else { + logger.Errorf("Could not create a REST client to update the TFJob") + } } return } } diff --git a/pkg/util/k8sutil/client.go b/pkg/util/k8sutil/client.go new file mode 100644 index 0000000000..5caa78cd24 --- /dev/null +++ b/pkg/util/k8sutil/client.go @@ -0,0 +1,82 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package k8sutil + +import ( + "fmt" + "net/http" + + tflogger "github.com/kubeflow/tf-operator/pkg/logger" + metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +// CRDClient defines an interface for working with CRDs using the REST client. +// In most cases we want to use the auto-generated clientset for specific CRDs. +// The only exception is when the CRD spec is invalid and we can't parse the type into the corresponding +// go struct. +type CRDClient interface { + // Update a TFJob. + Update(obj *metav1unstructured.Unstructured) error +} + +// CRDRestClient uses the Kubernetes rest interface to talk to the CRD. +type CRDRestClient struct { + restcli *rest.RESTClient +} + +func NewCRDRestClient(version *schema.GroupVersion) (*CRDRestClient, error) { + config, err := GetClusterConfig() + if err != nil { + return nil, err + } + config.GroupVersion = version + config.APIPath = "/apis" + config.ContentType = runtime.ContentTypeJSON + config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} + + restcli, err := rest.RESTClientFor(config) + if err != nil { + return nil, err + } + + cli := &CRDRestClient{ + restcli: restcli, + } + return cli, nil +} + +// HttpClient returns the http client used. +func (c *CRDRestClient) Client() *http.Client { + return c.restcli.Client +} + +func (c *CRDRestClient) Update(obj *metav1unstructured.Unstructured, plural string) error { + logger := tflogger.LoggerForUnstructured(obj, obj.GetKind()) + // TODO(jlewi): Can we just call obj.GetKind() to get the kind? I think that will return the singular + // not plural will that work? 
+ if plural == "" { + logger.Errorf("Could not issue update because plural not set.") + return fmt.Errorf("plural must be set") + } + r := c.restcli.Put().Resource(plural).Namespace(obj.GetNamespace()).Name(obj.GetName()).Body(obj) + _, err := r.DoRaw() + if err != nil { + logger.Errorf("Could not issue update using URL: %v; error; %v", r.URL().String(), err) + } + return err +} diff --git a/py/test_invalid_job.py b/py/test_invalid_job.py new file mode 100644 index 0000000000..cccd9326af --- /dev/null +++ b/py/test_invalid_job.py @@ -0,0 +1,126 @@ +"""Run an E2E test to verify invalid TFJobs are handled correctly. + +If a TFJob is invalid it should be marked as failed with an appropriate +error message. +""" + +import argparse +import logging +import json +import os +import re +import retrying + +from kubernetes import client as k8s_client + +from kubeflow.testing import test_helper +from kubeflow.testing import util +from py import test_runner +from py import test_util +from py import tf_job_client +from py import util as tf_operator_util + +# One of the reasons we set so many retries and a random amount of wait +# between retries is because we have multiple tests running in parallel +# that are all modifying the same ksonnet app via ks. I think this can +# lead to failures. 
+@retrying.retry(stop_max_attempt_number=10, wait_random_min=1000, + wait_random_max=10000) +def run_test(args, test_case): # pylint: disable=too-many-branches,too-many-statements + """Run a test.""" + util.load_kube_config() + + api_client = k8s_client.ApiClient() + + t = test_util.TestCase() + t.class_name = "tfjob_test" + namespace, name, env = test_runner.setup_ks_app(args) + t.name = os.path.basename(name) + + try: # pylint: disable=too-many-nested-blocks + util.run(["ks", "apply", env, "-c", args.component], cwd=args.app_dir) + + logging.info("Created job %s in namespaces %s", name, namespace) + + logging.info("Wait for conditions Failed") + results = tf_job_client.wait_for_condition( + api_client, namespace, name, ["Succeeded", "Failed"], + status_callback=tf_job_client.log_status) + + logging.info("Final TFJob:\n %s", json.dumps(results, indent=2)) + + # For v1alpha2 check for non-empty completionTime + last_condition = results.get("status", {}).get("conditions", [])[-1] + if last_condition.get("type", "").lower() != "failed": + message = "Job {0} in namespace {1} did not fail; status {2}".format( + name, namespace, results.get("status", {})) + logging.error(message) + test_case.add_failure_info(message) + return + + pattern = ".*the spec is invalid.*" + condition_message = last_condition.get("message", "") + if not re.match(pattern, condition_message): + message = "Condition message {0} did not match pattern {1}".format( + condition_message, pattern) + logging.error(message) + test_case.add_failure_info(message) + except tf_operator_util.JobTimeoutError as e: + if e.job: + spec = "Job:\n" + json.dumps(e.job, indent=2) + else: + spec = "JobTimeoutError did not contain job" + message = ("Timeout waiting for {0} in namespace {1} to finish; ").format( + name, namespace) + spec + logging.exception(message) + test_case.add_failure_info(message) + except Exception as e: # pylint: disable-msg=broad-except + # TODO(jlewi): I'm observing flakes where the exception 
has message "status" + # in an effort to try to nail down this exception we print out more + # information about the exception. + message = "There was a problem running the job; Exception {0}".format(e) + logging.exception(message) + test_case.add_failure_info(message) + +def parse_args(): + """Parse arguments.""" + parser = argparse.ArgumentParser(description="Run a TFJob test.") + + parser.add_argument( + "--app_dir", + default=None, + type=str, + help="Directory containing the ksonnet app.") + + parser.add_argument( + "--component", + default="invalid-tfjob", + type=str, + help="The ksonnet component of the job to run.") + + parser.add_argument( + "--params", + default=None, + type=str, + help="Comma separated list of key value pairs to set on the component.") + + # parse the args and call whatever function was selected + args, _ = parser.parse_known_args() + + return args + +def test_invalid_job(test_case): # pylint: disable=redefined-outer-name + args = parse_args() + util.maybe_activate_service_account() + + run_test(args, test_case) + +def main(): + test_case = test_helper.TestCase( + name="test_invalid_job", test_func=test_invalid_job) + test_suite = test_helper.init( + name="test_invalid_job", test_cases=[test_case]) + test_suite.run() + +if __name__ == "__main__": + main() diff --git a/py/test_runner.py b/py/test_runner.py index 5f9fdf2d0d..b77b0eae39 100644 --- a/py/test_runner.py +++ b/py/test_runner.py @@ -325,7 +325,7 @@ def terminateReplica(masterHost, namespace, target, exitCode=0): logging.info("URL %s returned; %s", url, r.content) -def _setup_ks_app(args): +def setup_ks_app(args): """Setup the ksonnet app""" salt = uuid.uuid4().hex[0:4] @@ -334,7 +334,7 @@ def _setup_ks_app(args): lock = filelock.FileLock(lock_file, timeout=60) with lock: # Create a new environment for this run - if args.environment: + if "environment" in args and args.environment: env = args.environment else: env = "test-env-{0}".format(salt) @@ -401,7 +401,7 @@ def 
run_test(args): # pylint: disable=too-many-branches,too-many-statements t = test_util.TestCase() t.class_name = "tfjob_test" - namespace, name, env = _setup_ks_app(args) + namespace, name, env = setup_ks_app(args) t.name = os.path.basename(name) start = time.time() diff --git a/test/workflows/components/invalid-tfjob.jsonnet b/test/workflows/components/invalid-tfjob.jsonnet new file mode 100644 index 0000000000..7b1d444788 --- /dev/null +++ b/test/workflows/components/invalid-tfjob.jsonnet @@ -0,0 +1,50 @@ +// This is a test job to ensure we correctly handle the case where the job spec is not +// a valid TFJob and therefore can't be unmarshaled to a TFJob struct. +// In this case we want to check that the TFJob status is updated correctly to reflect this. +// +local env = std.extVar("__ksonnet/environments"); +local params = std.extVar("__ksonnet/params").components["invalid-tfjob"]; +local k = import "k.libsonnet"; + + +local name = params.name; +local namespace = env.namespace; + +local podTemplate = { + spec: { + containers: [ + { + name: "tensorflow", + // image doesn't matter because we won't actually create the pods + image: "busybox", + }, + ], + }, +}; + +local job = { + apiVersion: "kubeflow.org/v1alpha2", + kind: "TFJob", + metadata: { + name: name, + namespace: namespace, + }, + spec: { + // Provide invalid json + notTheActualField: { + Ps: { + replicas: 2, + restartPolicy: "Never", + template: podTemplate, + }, + Worker: { + replicas: 4, + restartPolicy: "Never", + template: podTemplate, + }, + }, + }, +}; // job. + +std.prune(k.core.v1.list.new([job])) diff --git a/test/workflows/components/params.libsonnet b/test/workflows/components/params.libsonnet index 547f72e5c8..12e351f2b6 100644 --- a/test/workflows/components/params.libsonnet +++ b/test/workflows/components/params.libsonnet @@ -1,5 +1,8 @@ { global: {}, + // TODO(jlewi): Having the component name not match the TFJob name is confusing. + // Job names can't have underscores in the name. 
Moving forward we should use hyphens + // not underscores in component names. components: { // Component-level parameters, defined initially from 'ks prototype use ...' // Each object below should correspond to a component in the components/ directory @@ -61,5 +64,8 @@ namespace: "kubeflow-test-infra", image: "", }, + "invalid-tfjob": { + name: "invalid-tfjob", + }, }, } diff --git a/test/workflows/components/workflows.libsonnet b/test/workflows/components/workflows.libsonnet index 34742af3a5..a49128ee8b 100644 --- a/test/workflows/components/workflows.libsonnet +++ b/test/workflows/components/workflows.libsonnet @@ -65,6 +65,10 @@ else "gcr.io/kubeflow-ci/test-worker"; + + // value of KUBECONFIG environment variable. This should be a full path. + local kubeConfig = testDir + "/.kube/kubeconfig"; + // The name of the NFS volume claim to use for test files. // local nfsVolumeClaim = "kubeflow-testing"; local nfsVolumeClaim = "nfs-external"; @@ -139,6 +143,12 @@ }, }, }, + { + // We use a directory in our NFS share to store our kube config. + // This way we can configure it on a single step and reuse it on subsequent steps. + name: "KUBECONFIG", + value: kubeConfig, + }, ] + prow_env, volumeMounts: [ { @@ -275,7 +285,14 @@ } else {}, - + if params.tfJobVersion == "v1alpha2" then + { + name: "invalid-tfjob", + template: "invalid-tfjob", + dependencies: ["setup-kubeflow"], + } + else + {}, ], //tasks }, }, @@ -477,6 +494,15 @@ "--verify_clean_pod_policy=None", "--junit_path=" + artifactsDir + "/junit_clean-pod-none-tests.xml", ]), // run clean_pod_none + $.parts(namespace, name).e2e(prow_env, bucket).buildTemplate("invalid-tfjob", [ + "python", + "-m", + "py.test_invalid_job", + "test", + "--app_dir=" + srcDir + "/test/workflows", + "--params=name=invalid-tfjob,namespace=default", + "--junit_path=" + artifactsDir + "/junit_test-invalid-tfjob.xml", + ]), // invalid-tfjob $.parts(namespace, name).e2e(prow_env, bucket).buildTemplate("create-pr-symlink", [ "python", "-m",