Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ownerreferrences to manifest resources #211

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 71 additions & 128 deletions controllers/kfdef.apps.kubeflow.org/kfdef_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
ocappsv1 "github.com/openshift/api/apps/v1"
ocbuildv1 "github.com/openshift/api/build/v1"
ocimgv1 "github.com/openshift/api/image/v1"
ofapi "github.com/operator-framework/api/pkg/operators/v1alpha1"
olmclientset "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/typed/operators/v1alpha1"
"io/ioutil"
admv1 "k8s.io/api/admissionregistration/v1"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,33 +36,27 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiserv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"os"
"path"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"strings"

ofapi "github.com/operator-framework/api/pkg/operators/v1alpha1"
olmclientset "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/typed/operators/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
apiserv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kftypesv3 "github.com/opendatahub-io/opendatahub-operator/apis/apps"
kfdefappskubefloworgv1 "github.com/opendatahub-io/opendatahub-operator/apis/kfdef.apps.kubeflow.org/v1"
"github.com/opendatahub-io/opendatahub-operator/pkg/kfapp/coordinator"
kfloaders "github.com/opendatahub-io/opendatahub-operator/pkg/kfconfig/loaders"
kfutils "github.com/opendatahub-io/opendatahub-operator/pkg/utils"
)

const (
Expand All @@ -74,8 +70,10 @@ const (
odhGeneratedNamespaceLabel = "opendatahub.io/generated-namespace"
)

// kfdefInstances keep all KfDef CRs watched by the operator
var kfdefInstances = make(map[string]struct{})
// kfdefInstances maps every instance of KfDef with corresponding name and namespace.
// This is required since we are removing the annotation kfctl.kubeflow.io/kfdef-instance: <kfdef-name>.<kfdef-namespace>
// that had data regarding name and namespace of a KfDef instance.
var kfdefInstances = map[types.UID]types.NamespacedName{}

// Add logger for helper functions
var kfdefLog logr.Logger
Expand Down Expand Up @@ -128,6 +126,12 @@ func (r *KfDefReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
return ctrl.Result{}, err
}

// add to kfdefInstances if not exists
if _, ok := kfdefInstances[instance.GetUID()]; !ok {
kfdefInstances[instance.GetUID()] = request.NamespacedName
}
r.Log.Info("kfDefInstances map updated", "maps", kfdefInstances)

deleted := instance.GetDeletionTimestamp() != nil
finalizers := sets.NewString(instance.GetFinalizers()...)
if deleted {
Expand All @@ -141,19 +145,6 @@ func (r *KfDefReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
}
r.Log.Info("Deleting kfdef instance", "instance", instance.Name)

// Uninstall Kubeflow
err = kfDelete(instance)
if err == nil {
r.Log.Info("KubeFlow Deployment Deleted.")
r.Recorder.Eventf(instance, v1.EventTypeNormal, "KfDefDeletionSuccessful",
"KF instance %s deleted successfully", instance.Name)
} else {
// log an error and continue for cleanup. It does not make sense to retry the delete.
r.Recorder.Eventf(instance, v1.EventTypeWarning, "KfDefDeletionFailed",
"Error deleting KF instance %s", instance.Name)
r.Log.Error(fmt.Errorf("failed to delete Kubeflow"), "", "instance", instance.Name)
}

// Delete the kfapp directory
kfAppDir := path.Join("/tmp", instance.GetNamespace(), instance.GetName())
if err := os.RemoveAll(kfAppDir); err != nil {
Expand All @@ -163,7 +154,7 @@ func (r *KfDefReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
r.Log.Info("kfAppDir deleted.")

// Remove this KfDef instance
delete(kfdefInstances, strings.Join([]string{instance.GetName(), instance.GetNamespace()}, "."))
delete(kfdefInstances, instance.GetUID())

// Remove finalizer once kfDelete is completed.
finalizers.Delete(finalizer)
Expand Down Expand Up @@ -208,24 +199,19 @@ func (r *KfDefReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
}

if hasDeleteConfigMap(r.Client) {
for key, _ := range kfdefInstances {
keyVal := strings.Split(key, ".")
if len(keyVal) == 2 {
instanceName, namespace := keyVal[0], keyVal[1]
currentInstance := &kfdefappskubefloworgv1.KfDef{
ObjectMeta: metav1.ObjectMeta{
Name: instanceName,
Namespace: namespace,
},
}
for _, val := range kfdefInstances {
instanceName, namespace := val.Name, val.Namespace
currentInstance := &kfdefappskubefloworgv1.KfDef{
ObjectMeta: metav1.ObjectMeta{
Name: instanceName,
Namespace: namespace,
},
}

if err := r.Client.Delete(ctx, currentInstance, []client.DeleteOption{}...); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, err
}
if err := r.Client.Delete(ctx, currentInstance, []client.DeleteOption{}...); err != nil {
if !errors.IsNotFound(err) {
return ctrl.Result{}, err
}
} else {
return ctrl.Result{}, fmt.Errorf("error getting kfdef instance name and namespace")
}

}
Expand All @@ -239,10 +225,10 @@ func (r *KfDefReconciler) Reconcile(ctx context.Context, request ctrl.Request) (
r.Recorder.Eventf(instance, v1.EventTypeNormal, "KfDefCreationSuccessful",
"KfDef instance %s created and deployed successfully", instance.Name)

// add to kfdefInstances if not exists
if _, ok := kfdefInstances[strings.Join([]string{instance.GetName(), instance.GetNamespace()}, ".")]; !ok {
kfdefInstances[strings.Join([]string{instance.GetName(), instance.GetNamespace()}, ".")] = struct{}{}
if _, ok := kfdefInstances[instance.GetUID()]; !ok {
kfdefInstances[instance.GetUID()] = request.NamespacedName
}
r.Log.Info("kfDefInstances map updated", "maps", kfdefInstances)

}

Expand Down Expand Up @@ -324,32 +310,31 @@ func (r *KfDefReconciler) watchKfDef(a client.Object) (requests []reconcile.Requ

// watch is monitoring changes for kfctl resources managed by the operator
func (r *KfDefReconciler) watchKubeflowResources(a client.Object) (requests []reconcile.Request) {
anns := a.GetAnnotations()
kfdefAnn := strings.Join([]string{kfutils.KfDefAnnotation, kfutils.KfDefInstance}, "/")
_, found := anns[kfdefAnn]
if found {
kfdefCr := strings.Split(anns[kfdefAnn], ".")
namespacedName := types.NamespacedName{Name: kfdefCr[0], Namespace: kfdefCr[1]}
instance := &kfdefappskubefloworgv1.KfDef{}
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: kfdefCr[0], Namespace: kfdefCr[1]}, instance)
if err != nil {
if errors.IsNotFound(err) {
// KfDef CR may have been deleted
return nil

for _, owner := range a.GetOwnerReferences() {
if owner.Kind == string(kftypesv3.KFDEF) {
if namespacedName, ok := kfdefInstances[owner.UID]; ok {
instance := &kfdefappskubefloworgv1.KfDef{}
err := r.Client.Get(context.TODO(), namespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
// KfDef CR may have been deleted
return nil
}
} else if instance.GetDeletionTimestamp() != nil {
// KfDef is being deleted
return nil
}
r.Log.Info("Watch a change for Kubeflow resource", "instance", a.GetName(), "namespace", a.GetNamespace())
return []reconcile.Request{{NamespacedName: namespacedName}}
}
} else if instance.GetDeletionTimestamp() != nil {
// KfDef is being deleted
return nil
}
r.Log.Info("Watch a change for Kubeflow resource", "instance", a.GetName(), "namespace", a.GetNamespace())
return []reconcile.Request{{NamespacedName: namespacedName}}
} else if a.GetObjectKind().GroupVersionKind().Kind == "ConfigMap" {
labels := a.GetLabels()
if val, ok := labels[deleteConfigMapLabel]; ok {
if val == "true" {
for k := range kfdefInstances {
kfdefCr := strings.Split(k, ".")
return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: kfdefCr[0], Namespace: kfdefCr[1]}}}
} else if a.GetObjectKind().GroupVersionKind().Kind == "ConfigMap" {
labels := a.GetLabels()
if val, ok := labels[deleteConfigMapLabel]; ok {
if val == "true" {
for _, v := range kfdefInstances {
return []reconcile.Request{{NamespacedName: v}}
}
}
}
}
Expand Down Expand Up @@ -397,24 +382,33 @@ var ownedResourcePredicates = predicate.Funcs{
if err != nil {
return false
}
// if this object has an owner, let the owner handle the appropriate recovery
if len(object.GetOwnerReferences()) > 0 {
return false
for _, owner := range object.GetOwnerReferences() {
// if owner is KfDef, reconcile function needs to handle the delete.
if owner.Kind == string(kftypesv3.KFDEF) {
return true
}
}
}
return true
return false

},
UpdateFunc: func(e event.UpdateEvent) bool {
// handle update events
object, err := meta.Accessor(e.ObjectOld)
if err != nil {
return false
}
// if this object has an owner, let the owner handle the appropriate recovery
if len(object.GetOwnerReferences()) > 0 {
return false
for _, owner := range object.GetOwnerReferences() {
// if owner is KfDef, reconcile function needs to handle the update.
if owner.Kind == string(kftypesv3.KFDEF) {
return true
}
}
}
// TODO: Add update log message when plugin is integrated. We need to only log events for the resources with 'configurable' label
return true
return false

},
}

Expand All @@ -431,19 +425,6 @@ func kfApply(instance *kfdefappskubefloworgv1.KfDef) error {
return err
}

// kfDelete is equivalent of kfctl delete
func kfDelete(instance *kfdefappskubefloworgv1.KfDef) error {
kfdefLog.Info("Uninstall Kubeflow.", "KubeFlow.Namespace", instance.Namespace)
kfApp, err := kfLoadConfig(instance, "delete")
if err != nil {
kfdefLog.Error(err, "Failed to load KfApp")
return err
}
// Delete kfApp.
err = kfApp.Delete(kftypesv3.K8S)
return err
}

func kfLoadConfig(instance *kfdefappskubefloworgv1.KfDef, action string) (kftypesv3.KfApp, error) {
// Define kfApp
kfdefBytes, _ := yaml.Marshal(instance)
Expand All @@ -462,28 +443,6 @@ func kfLoadConfig(instance *kfdefappskubefloworgv1.KfDef, action string) (kftype
return nil, err
}

if action == "apply" {
// Indicate to add annotation to the top level resources
setAnnotationAnn := strings.Join([]string{kfutils.KfDefAnnotation, kfutils.SetAnnotation}, "/")
setAnnotations(configFilePath, map[string]string{
setAnnotationAnn: "true",
})
}

if action == "delete" {
// Enable force delete since inClusterConfig has no ./kube/config file to pass the delete safety check.
forceDeleteAnn := strings.Join([]string{kfutils.KfDefAnnotation, kfutils.ForceDelete}, "/")
setAnnotations(configFilePath, map[string]string{
forceDeleteAnn: "true",
})

// Indicate the Kubeflow is installed by the operator
byOperatorAnn := strings.Join([]string{kfutils.KfDefAnnotation, kfutils.InstallByOperator}, "/")
setAnnotations(configFilePath, map[string]string{
byOperatorAnn: "true",
})
}

kfApp, err := coordinator.NewLoadKfAppFromURI(configFilePath)
if err != nil {
kfdefLog.Error(err, "failed to build kfApp from URI", "uri", configFilePath)
Expand All @@ -493,22 +452,6 @@ func kfLoadConfig(instance *kfdefappskubefloworgv1.KfDef, action string) (kftype
return kfApp, nil
}

func setAnnotations(configPath string, annotations map[string]string) error {
config, err := kfloaders.LoadConfigFromURI(configPath)
if err != nil {
return err
}
anns := config.GetAnnotations()
if anns == nil {
anns = map[string]string{}
}
for ann, val := range annotations {
anns[ann] = val
}
config.SetAnnotations(anns)
return kfloaders.WriteConfigToFile(*config)
}

// getClusterServiceVersion retries the clusterserviceversions available in the operator namespace.
func getClusterServiceVersion(cfg *rest.Config, watchNameSpace string) (*ofapi.ClusterServiceVersion, error) {

Expand Down
Loading