Skip to content

Commit

Permalink
If a TFJob spec is invalid mark the job as failed with an appropriate…
Browse files Browse the repository at this point in the history
… condition.

* If a TFJob spec is invalid (e.g. can't be marshaled to TFJob YAML) we
  want to update the TFJob status to indicate it failed.

* We need to use the REST API to update the TFJob status because we won't
  be able to deserialize the json to TFJob.

* Related to kubeflow#755

* I created invalid-tfjob.jsonnet which can be used in an E2E test but
  I haven't included the E2E test in this PR.

* I tested it manually and got the following result

apiVersion: kubeflow.org/v1alpha2
kind: TFJob
metadata:
  clusterName: ""
  creationTimestamp: 2018-08-31T23:37:14Z
  generation: 1
  labels:
    app.kubernetes.io/deploy-manager: ksonnet
    ksonnet.io/component: invalid-tfjob
  name: invalid-tfjob
  namespace: kubeflow
  resourceVersion: "1826961"
  selfLink: /apis/kubeflow.org/v1alpha2/namespaces/kubeflow/tfjobs/invalid-tfjob
  uid: ca7b4b02-ad76-11e8-be57-42010a8e0084
spec:
  notTheActualField:
    Ps:
      replicas: 2
      restartPolicy: Never
      template:
        spec:
          containers:
          - image: busybox
            name: tensorflow
    Worker:
      replicas: 4
      restartPolicy: Never
      template:
        spec:
          containers:
          - image: busybox
            name: tensorflow
status:
  conditions:
  - lastTransitionTime: 2018-08-31T23:37:14Z
    lastUpdateTime: 2018-08-31T23:37:14Z
    message: 'Failed to marshal the object to TFJob; the spec is invalid: Failed to
      marshal the object to TFJob'
    reason: FailedInvalidTFJobSpec
    status: "True"
    type: Failed
  tfReplicaStatuses: null
  • Loading branch information
jlewi committed Aug 31, 2018
1 parent 1bc8425 commit 13e554d
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 2 deletions.
40 changes: 38 additions & 2 deletions pkg/controller.v2/tensorflow/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tflogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/kubeflow/tf-operator/pkg/util/k8sutil"
"k8s.io/apimachinery/pkg/runtime"
)

const (
failedMarshalTFJobReason = "FailedMarshalTFJob"
failedMarshalTFJobReason = "FailedInvalidTFJobSpec"
)

// When a pod is added, set the defaults and enqueue the current tfjob.
Expand All @@ -31,9 +33,43 @@ func (tc *TFController) addTFJob(obj interface{}) {
logger.Errorf("Failed to convert the TFJob: %v", err)
// Log the failure to conditions.
if err == errFailedMarshal {
errMsg := fmt.Sprintf("Failed to unmarshal the object to TFJob object: %v", err)
errMsg := fmt.Sprintf("Failed to marshal the object to TFJob; the spec is invalid: %v", err)
logger.Warn(errMsg)
// TODO(jlewi): v1 doesn't appear to define an error type.
tc.Recorder.Event(un, v1.EventTypeWarning, failedMarshalTFJobReason, errMsg)

status:= tfv1alpha2.TFJobStatus{
Conditions: []tfv1alpha2.TFJobCondition{
tfv1alpha2.TFJobCondition{
Type: tfv1alpha2.TFJobFailed,
Status: v1.ConditionTrue,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Reason: failedMarshalTFJobReason,
Message: errMsg,
},
},
}

statusMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&status)

if err != nil {
logger.Errorf("Could not covert the TFJobStatus to unstructured; %v", err)
return
}

client, err := k8sutil.NewCRDRestClient(&tfv1alpha2.SchemeGroupVersion)

if err == nil {
metav1unstructured.SetNestedField(un.Object, statusMap, "status")
logger.Infof("Updating the job to; %+v", un.Object)
err = client.Update(un, tfv1alpha2.Plural)
if err != nil {
logger.Errorf("Could not update the TFJob; %v", err)
}
} else {
logger.Errorf("Could not create a REST client to update the TFJob")
}
}
return
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/util/k8sutil/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sutil

import (
"fmt"
"net/http"

tflogger "github.com/kubeflow/tf-operator/pkg/logger"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/rest"
"k8s.io/client-go/kubernetes/scheme"
metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// CRDRestClient defines an interface for working with CRDs using the REST client.
// In most cases we want to use the auto-generated clientset for specific CRDs.
// The only exception is when the CRD spec is invalid and we can't parse the type into the corresponding
// go struct.
type CRDClient interface {
// Update a TfJob.
Update(obj *metav1unstructured.Unstructured) (error)
}

// CRDRestClient uses the Kubernetes rest interface to talk to the CRD.
type CRDRestClient struct {
restcli *rest.RESTClient
}

func NewCRDRestClient(version *schema.GroupVersion) (*CRDRestClient, error) {
config, err := GetClusterConfig()
if err != nil {
return nil, err
}
config.GroupVersion = version
config.APIPath = "/apis"
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}

restcli, err := rest.RESTClientFor(config)
if err != nil {
return nil, err
}

cli := &CRDRestClient{
restcli: restcli,
}
return cli, nil
}

// HttpClient returns the http client used.
func (c *CRDRestClient) Client() *http.Client {
return c.restcli.Client
}

func (c *CRDRestClient) Update(obj *metav1unstructured.Unstructured, plural string) (error) {
logger := tflogger.LoggerForUnstructured(obj, obj.GetKind())
// TODO(jlewi): Can we just call obj.GetKind() to get the kind? I think that will return the singular
// not plural will that work?
if plural == "" {
logger.Errorf("Could not issue update because plural not set.")
return fmt.Errorf("plural must be set")
}
r := c.restcli.Put().Resource(plural).Namespace(obj.GetNamespace()).Name(obj.GetName()).Body(obj)
_, err := r.DoRaw()
if err != nil {
logger.Errorf("Could not issue update using URL: %v; error; %v", r.URL().String(), err)
}
return err
}
50 changes: 50 additions & 0 deletions test/workflows/components/invalid-tfjob.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// This is a test job to ensure we correctly handle the case where the job spec is not
// a valid TFJob and therefore can't be unmarshled to a TFJob struct.
// In this case we want to check that the TFJob status is updated correctly to reflect this.
//
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["invalid-tfjob"];

local k = import "k.libsonnet";


local name = params.name;
local namespace = env.namespace;

local podTemplate = {
spec: {
containers: [
{
name: "tensorflow",
// image doesn't matter because we won't actually create the pods
image: "busybox",
},
],
},
};

local job = {
apiVersion: "kubeflow.org/v1alpha2",
kind: "TFJob",
metadata: {
name: name,
namespace: namespace,
},
spec: {
// Provide invalid json
notTheActualField: {
Ps: {
replicas: 2,
restartPolicy: "Never",
template: podTemplate,
},
Worker: {
replicas: 4,
restartPolicy: "Never",
template: podTemplate,
},
},
},
}; // job.

std.prune(k.core.v1.list.new([job]))
6 changes: 6 additions & 0 deletions test/workflows/components/params.libsonnet
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
global: {},
// TODO(jlewi): Having the component name not match the TFJob name is confusing.
// Job names can't have hyphens in the name. Moving forward we should use hyphens
// not underscores in component names.
components: {
// Component-level parameters, defined initially from 'ks prototype use ...'
// Each object below should correspond to a component in the components/ directory
Expand Down Expand Up @@ -61,5 +64,8 @@
namespace: "kubeflow-test-infra",
image: "",
},
"invalid-tfjob": {
name: "invalid-tfjob",
},
},
}

0 comments on commit 13e554d

Please sign in to comment.