Skip to content

Commit

Permalink
add expectation-related functions for other resources used in mpi-con…
Browse files Browse the repository at this point in the history
…troller (#1484)

* fix comments for mpi-controller

* Update pkg/controller.v1/mpi/mpijob_controller.go

Co-authored-by: Yuan Tang <[email protected]>

* Update pkg/controller.v1/mpi/mpijob_controller.go

Co-authored-by: Yuan Tang <[email protected]>

Co-authored-by: Yuan Tang <[email protected]>
  • Loading branch information
zw0610 and terrytangyuan authored Dec 15, 2021
1 parent 49d032b commit b51bfda
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 24 deletions.
9 changes: 3 additions & 6 deletions pkg/common/util/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ import (
"fmt"
"reflect"

"github.com/sirupsen/logrus"

commonutil "github.com/kubeflow/common/pkg/util"

"github.com/kubeflow/common/pkg/controller.v1/common"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
commonutil "github.com/kubeflow/common/pkg/util"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down
133 changes: 133 additions & 0 deletions pkg/common/util/reconciler_generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

package util

import (
"fmt"
"reflect"
"strings"

"github.com/kubeflow/common/pkg/controller.v1/common"
log "github.com/sirupsen/logrus"

commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/expectation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
)

// GenExpectationGenericKey builds the expectation key for one kind of
// dependent resource of a job, in the form "<jobKey>/<replicaType>/<pl>".
// The replica type is lower-cased; jobKey and pl are used verbatim.
func GenExpectationGenericKey(jobKey string, replicaType string, pl string) string {
	segments := []string{jobKey, strings.ToLower(replicaType), pl}
	return strings.Join(segments, "/")
}

// LoggerForGenericKind returns a log entry describing obj, a generic
// Kubernetes resource. The entry carries three fields:
//   - "job": "<namespace>.<controller name>" when obj has a controller
//     reference whose Kind equals kind, otherwise the empty string;
//   - kind (the argument's value is the field name): "<namespace>.<name>" of obj;
//   - "uid": the UID of obj.
func LoggerForGenericKind(obj metav1.Object, kind string) *log.Entry {
	var owningJob string
	ref := metav1.GetControllerOf(obj)
	if ref != nil && ref.Kind == kind {
		owningJob = obj.GetNamespace() + "." + ref.Name
	}

	fields := log.Fields{}
	// "job" is meant to match the key that controller.go logs together with
	// the workqueue.
	fields["job"] = owningJob
	fields[kind] = obj.GetNamespace() + "." + obj.GetName()
	fields["uid"] = obj.GetUID()
	return log.WithFields(fields)
}

// OnDependentCreateFuncGeneric returns a create-event predicate that records
// the observed creation of a dependent resource in exp.
//
// Objects without a non-empty replica-type label are filtered out (false).
// Every other object passes (true); when the object has a controller
// reference, exp.CreationObserved is called first with the key
// "<namespace>/<controller name>/<lower-cased replica type>/<lower-cased kind>s".
func OnDependentCreateFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.CreateEvent) bool {
	return func(e event.CreateEvent) bool {
		replicaType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
		if replicaType == "" {
			return false
		}

		owner := metav1.GetControllerOf(e.Object)
		if owner == nil {
			// No controller reference: nothing to record, but the event still passes.
			return true
		}

		jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), owner.Name)
		// NOTE(review): the plural is derived from the object's own Kind; this
		// presumably requires TypeMeta to be populated on the delivered
		// object — confirm, otherwise the suffix degenerates to "s".
		plural := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
		exp.CreationObserved(GenExpectationGenericKey(jobKey, replicaType, plural))
		return true
	}
}

// OnDependentUpdateFuncGeneric returns an update-event predicate for resources
// that depend on a job managed by jc. It does not touch expectations; it only
// decides whether the update should be passed on.
//
// The predicate returns:
//   - false when old and new object share the same resource version;
//   - true when the controller reference changed and the old reference still
//     resolves to a job (via resolveControllerRef);
//   - true when the new object has a controller reference that resolves to a job;
//   - false in every other case.
func OnDependentUpdateFuncGeneric(jc *common.JobController) func(updateEvent event.UpdateEvent) bool {
	return func(e event.UpdateEvent) bool {
		newObj := e.ObjectNew
		oldObj := e.ObjectOld
		if newObj.GetResourceVersion() == oldObj.GetResourceVersion() {
			// Periodic resync will send update events for all known objects.
			// Two different versions of the same object will always have
			// different resource versions, so equal versions mean no change.
			return false
		}

		kind := jc.Controller.GetAPIGroupVersionKind().Kind
		logger := LoggerForGenericKind(newObj, kind)

		newControllerRef := metav1.GetControllerOf(newObj)
		oldControllerRef := metav1.GetControllerOf(oldObj)
		controllerRefChanged := !reflect.DeepEqual(newControllerRef, oldControllerRef)

		if controllerRefChanged && oldControllerRef != nil {
			// The ControllerRef was changed. Sync the old controller, if any.
			// The lookup is scoped by the object's namespace (not its name),
			// the same way the new reference is resolved below.
			if job := resolveControllerRef(jc, oldObj.GetNamespace(), oldControllerRef); job != nil {
				logger.Infof("%s controller ref updated: %v, %v", kind, newObj, oldObj)
				return true
			}
		}

		// If it has a controller ref, that's all that matters.
		if newControllerRef != nil {
			job := resolveControllerRef(jc, newObj.GetNamespace(), newControllerRef)
			if job == nil {
				return false
			}
			logger.Debugf("%s has a controller ref: %v, %v", kind, newObj, oldObj)
			return true
		}
		return false
	}
}

// OnDependentDeleteFuncGeneric returns a delete-event predicate that records
// the observed deletion of a dependent resource in exp.
//
// Objects without a non-empty replica-type label are filtered out (false).
// Every other object passes (true); when the object has a controller
// reference, exp.DeletionObserved is called first with the key
// "<namespace>/<controller name>/<lower-cased replica type>/<lower-cased kind>s".
func OnDependentDeleteFuncGeneric(exp expectation.ControllerExpectationsInterface) func(event.DeleteEvent) bool {
	return func(e event.DeleteEvent) bool {
		replicaType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
		if replicaType == "" {
			return false
		}

		owner := metav1.GetControllerOf(e.Object)
		if owner == nil {
			// No controller reference: nothing to record, but the event still passes.
			return true
		}

		jobKey := fmt.Sprintf("%s/%s", e.Object.GetNamespace(), owner.Name)
		// NOTE(review): the plural is derived from the object's own Kind; this
		// presumably requires TypeMeta to be populated on the delivered
		// object — confirm, otherwise the suffix degenerates to "s".
		plural := strings.ToLower(e.Object.GetObjectKind().GroupVersionKind().Kind) + "s"
		exp.DeletionObserved(GenExpectationGenericKey(jobKey, replicaType, plural))
		return true
	}
}
66 changes: 48 additions & 18 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -130,15 +129,12 @@ func (r *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr

if err = validation.ValidateV1MpiJobSpec(&mpijob.Spec); err != nil {
logger.Info(err.Error(), "MPIJob failed validation", req.NamespacedName.String())
return ctrl.Result{}, err
}

// Check if reconciliation is needed
jobKey, err := common.KeyFunc(mpijob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get jobKey for job object %#v: %v", mpijob, err))
}
replicaTypes := util.GetReplicaTypes(mpijob.Spec.MPIReplicaSpecs)
needReconcile := util.SatisfiedExpectations(r.Expectations, jobKey, replicaTypes)
// In the new reconcile mode, we always proceed with reconciling and let each `createOrUpdate` function
// determine whether updating/creating is needed
needReconcile := true

if !needReconcile || mpijob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, nil
Expand Down Expand Up @@ -187,22 +183,58 @@ func (r *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

// inject watching for job related service
if err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{
// inject watching for job related ConfigMap
if err = c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &mpiv1.MPIJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFunc(r.Expectations),
UpdateFunc: util.OnDependentUpdateFunc(&r.JobController),
DeleteFunc: util.OnDependentDeleteFunc(r.Expectations),
CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations),
UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController),
DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations),
}); err != nil {
return err
}

// inject watching for job related Role
if err = c.Watch(&source.Kind{Type: &rbacv1.Role{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &mpiv1.MPIJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations),
UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController),
DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations),
}); err != nil {
return err
}

// inject watching for job related RoleBinding
if err = c.Watch(&source.Kind{Type: &rbacv1.RoleBinding{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &mpiv1.MPIJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations),
UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController),
DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations),
}); err != nil {
return err
}

// inject watching for job related ServiceAccount
if err = c.Watch(&source.Kind{Type: &corev1.ServiceAccount{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &mpiv1.MPIJob{},
}, predicate.Funcs{
CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations),
UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController),
DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations),
}); err != nil {
return err
}

return nil
}

//mpijob not need delete services
// DeletePodsAndServices is overridden because mpi-reconciler.v1 does not need to delete services
func (r *MPIJobReconciler) DeletePodsAndServices(runPolicy *commonv1.RunPolicy, job interface{}, pods []*corev1.Pod) error {
if len(pods) == 0 {
return nil
Expand All @@ -227,9 +259,7 @@ func (r *MPIJobReconciler) DeletePodsAndServices(runPolicy *commonv1.RunPolicy,
return nil
}

// reconcileServices checks and updates services for each given ReplicaSpec.
// It will requeue the job in case of an error while creating/deleting services.
// mpijob not need services
// ReconcileServices is overridden because mpi-reconciler.v1 does not need to reconcile services
func (jc *MPIJobReconciler) ReconcileServices(
job metav1.Object,
services []*corev1.Service,
Expand All @@ -254,7 +284,7 @@ func (r *MPIJobReconciler) GetGroupNameLabelValue() string {
return mpiv1.GroupVersion.Group
}

// Same as Func (tc *TFController) SetClusterSpec(...) in pod.go
// SetClusterSpec is overridden because no cluster spec is needed for MPIJob.
// It is a no-op: none of the arguments are read or modified and the returned
// error is always nil.
func (r *MPIJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
return nil
}
Expand Down

0 comments on commit b51bfda

Please sign in to comment.