diff --git a/cmd/pytorch-operator/app/server.go b/cmd/pytorch-operator/app/server.go index 8a164a7fd..2b2c716b5 100644 --- a/cmd/pytorch-operator/app/server.go +++ b/cmd/pytorch-operator/app/server.go @@ -37,8 +37,8 @@ import ( informers "github.com/kubeflow/pytorch-operator/pkg/client/informers/externalversions" "github.com/kubeflow/pytorch-operator/pkg/controller" "github.com/kubeflow/pytorch-operator/pkg/util" - "github.com/kubeflow/pytorch-operator/pkg/util/k8sutil" "github.com/kubeflow/pytorch-operator/version" + "github.com/kubeflow/tf-operator/pkg/util/k8sutil" ) var ( diff --git a/pkg/controller.v2/pytorch/job.go b/pkg/controller.v2/pytorch/job.go index 9671a8604..50a781d8e 100644 --- a/pkg/controller.v2/pytorch/job.go +++ b/pkg/controller.v2/pytorch/job.go @@ -8,14 +8,16 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" v1alpha2 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1alpha2" pylogger "github.com/kubeflow/tf-operator/pkg/logger" + "github.com/kubeflow/tf-operator/pkg/util/k8sutil" ) const ( - failedMarshalPyTorchJobReason = "FailedMarshalPyTorchJob" + failedMarshalPyTorchJobReason = "FailedInvalidPyTorchJobSpec" ) // When a pod is added, set the defaults and enqueue the current pytorchjob. @@ -31,9 +33,42 @@ func (pc *PyTorchController) addPyTorchJob(obj interface{}) { logger.Errorf("Failed to convert the PyTorchJob: %v", err) // Log the failure to conditions. if err == errFailedMarshal { - errMsg := fmt.Sprintf("Failed to unmarshal the object to PyTorchJob object: %v", err) + errMsg := fmt.Sprintf("Failed to unmarshal the object to PyTorchJob: Spec is invalid %v", err) logger.Warn(errMsg) pc.Recorder.Event(un, v1.EventTypeWarning, failedMarshalPyTorchJobReason, errMsg) + + status := v1alpha2.PyTorchJobStatus{ + Conditions: []v1alpha2.PyTorchJobCondition{ + v1alpha2.PyTorchJobCondition{ + Type: v1alpha2.PyTorchJobFailed, + Status: v1.ConditionTrue, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: failedMarshalPyTorchJobReason, + Message: errMsg, + }, + }, + } + + statusMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&status) + + if err != nil { + logger.Errorf("Could not covert the PyTorchJobStatus to unstructured; %v", err) + return + } + + client, err := k8sutil.NewCRDRestClient(&v1alpha2.SchemeGroupVersion) + + if err == nil { + metav1unstructured.SetNestedField(un.Object, statusMap, "status") + logger.Infof("Updating the job to; %+v", un.Object) + err = client.Update(un, v1alpha2.Plural) + if err != nil { + logger.Errorf("Could not update the PyTorchJob; %v", err) + } + } else { + logger.Errorf("Could not create a REST client to update the PyTorchJob") + } } return } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 51488b774..1e9575510 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -30,7 +30,7 @@ import ( "k8s.io/client-go/tools/record" torchv1alpha1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1alpha1" - "github.com/kubeflow/pytorch-operator/pkg/util/k8sutil" + "github.com/kubeflow/tf-operator/pkg/util/k8sutil" // TOOO(jlewi): Rename to apiErrors "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/helper" "github.com/kubeflow/pytorch-operator/pkg/util" diff --git a/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/client.go b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/client.go new file mode 100644 index 000000000..5caa78cd2 --- /dev/null +++ b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/client.go @@ -0,0 +1,82 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8sutil + +import ( + "fmt" + "net/http" + + tflogger "github.com/kubeflow/tf-operator/pkg/logger" + metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +// CRDRestClient defines an interface for working with CRDs using the REST client. +// In most cases we want to use the auto-generated clientset for specific CRDs. +// The only exception is when the CRD spec is invalid and we can't parse the type into the corresponding +// go struct. +type CRDClient interface { + // Update a TfJob. + Update(obj *metav1unstructured.Unstructured) error +} + +// CRDRestClient uses the Kubernetes rest interface to talk to the CRD. +type CRDRestClient struct { + restcli *rest.RESTClient +} + +func NewCRDRestClient(version *schema.GroupVersion) (*CRDRestClient, error) { + config, err := GetClusterConfig() + if err != nil { + return nil, err + } + config.GroupVersion = version + config.APIPath = "/apis" + config.ContentType = runtime.ContentTypeJSON + config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} + + restcli, err := rest.RESTClientFor(config) + if err != nil { + return nil, err + } + + cli := &CRDRestClient{ + restcli: restcli, + } + return cli, nil +} + +// HttpClient returns the http client used. +func (c *CRDRestClient) Client() *http.Client { + return c.restcli.Client +} + +func (c *CRDRestClient) Update(obj *metav1unstructured.Unstructured, plural string) error { + logger := tflogger.LoggerForUnstructured(obj, obj.GetKind()) + // TODO(jlewi): Can we just call obj.GetKind() to get the kind? I think that will return the singular + // not plural will that work? + if plural == "" { + logger.Errorf("Could not issue update because plural not set.") + return fmt.Errorf("plural must be set") + } + r := c.restcli.Put().Resource(plural).Namespace(obj.GetNamespace()).Name(obj.GetName()).Body(obj) + _, err := r.DoRaw() + if err != nil { + logger.Errorf("Could not issue update using URL: %v; error; %v", r.URL().String(), err) + } + return err +} diff --git a/pkg/util/k8sutil/k8sutil.go b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go similarity index 81% rename from pkg/util/k8sutil/k8sutil.go rename to vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go index 8cb6a3f3a..ceaf8c761 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/vendor/github.com/kubeflow/tf-operator/pkg/util/k8sutil/k8sutil.go @@ -21,23 +21,22 @@ import ( log "github.com/sirupsen/logrus" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // for gcp auth "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - - torchv1alpha1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1alpha1" ) +// RecommendedConfigPathEnvVar is a environment variable for path configuration const RecommendedConfigPathEnvVar = "KUBECONFIG" // TODO(jlewi): I think this function is used to add an owner to a resource. I think we we should use this -// method to ensure all resources created for the TFJob are owned by the TFJob. +// addOwnerRefToObject method to ensure all resources created for the TFJob are owned by the TFJob. func addOwnerRefToObject(o metav1.Object, r metav1.OwnerReference) { o.SetOwnerReferences(append(o.GetOwnerReferences(), r)) } +// MustNewKubeClient returns new kubernetes client for cluster configuration func MustNewKubeClient() kubernetes.Interface { cfg, err := GetClusterConfig() if err != nil { @@ -46,7 +45,7 @@ func MustNewKubeClient() kubernetes.Interface { return kubernetes.NewForConfigOrDie(cfg) } -// Obtain the config from the Kube configuration used by kubeconfig, or from k8s cluster. +// GetClusterConfig obtain the config from the Kube configuration used by kubeconfig, or from k8s cluster. func GetClusterConfig() (*rest.Config, error) { if len(os.Getenv(RecommendedConfigPathEnvVar)) > 0 { // use the current context in kubeconfig @@ -73,30 +72,18 @@ func GetClusterConfig() (*rest.Config, error) { return rest.InClusterConfig() } +// IsKubernetesResourceAlreadyExistError throws error when kubernetes resources already exist. func IsKubernetesResourceAlreadyExistError(err error) bool { return apierrors.IsAlreadyExists(err) } +// IsKubernetesResourceNotFoundError throws error when there is no kubernetes resource found. func IsKubernetesResourceNotFoundError(err error) bool { return apierrors.IsNotFound(err) } -// We are using internal api types for cluster related. -func JobListOpt(clusterName string) metav1.ListOptions { - return metav1.ListOptions{ - LabelSelector: labels.SelectorFromSet(LabelsForJob(clusterName)).String(), - } -} - -func LabelsForJob(jobName string) map[string]string { - return map[string]string{ - // TODO(jlewi): Need to set appropriate labels for TF. - "pytorch_job": jobName, - "app": torchv1alpha1.AppLabel, - } -} - // TODO(jlewi): CascadeDeletOptions are part of garbage collection policy. +// CascadeDeleteOptions deletes the workload after the grace period // Do we want to use this? See // https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/ func CascadeDeleteOptions(gracePeriodSeconds int64) *metav1.DeleteOptions {