diff --git a/pkg/controller/integration/integration_controller.go b/pkg/controller/integration/integration_controller.go index 729db0ca35..ae6d4c515c 100644 --- a/pkg/controller/integration/integration_controller.go +++ b/pkg/controller/integration/integration_controller.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/builder" ctrl "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -205,9 +204,9 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error { return requests })). // Watch for the owned Deployments - Owns(&appsv1.Deployment{}). + Owns(&appsv1.Deployment{}, builder.WithPredicates(StatusChangedPredicate{})). // Watch for the owned CronJobs - Owns(&batchv1beta1.CronJob{}). + Owns(&batchv1beta1.CronJob{}, builder.WithPredicates(StatusChangedPredicate{})). // Watch for the Integration Pods Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(func(a ctrl.Object) []reconcile.Request { @@ -236,7 +235,7 @@ func add(mgr manager.Manager, c client.Client, r reconcile.Reconciler) error { if ok, err = kubernetes.CheckPermission(ctx, c, serving.GroupName, "services", platform.GetOperatorWatchNamespace(), "", "watch"); err != nil { return err } else if ok { - b.Owns(&servingv1.Service{}) + b.Owns(&servingv1.Service{}, builder.WithPredicates(StatusChangedPredicate{})) } } diff --git a/pkg/controller/integration/predicate.go b/pkg/controller/integration/predicate.go new file mode 100644 index 0000000000..79d61556a9 --- /dev/null +++ b/pkg/controller/integration/predicate.go @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "reflect" + + "k8s.io/apimachinery/pkg/api/equality" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// StatusChangedPredicate implements a generic update predicate function on status change. +type StatusChangedPredicate struct { + predicate.Funcs +} + +// Update implements default UpdateEvent filter for validating status change. +func (StatusChangedPredicate) Update(e event.UpdateEvent) bool { + if e.ObjectOld == nil { + Log.Error(nil, "Update event has no old object to update", "event", e) + return false + } + if e.ObjectNew == nil { + Log.Error(nil, "Update event has no new object to update", "event", e) + return false + } + + s1 := reflect.ValueOf(e.ObjectOld).Elem().FieldByName("Status") + if !s1.IsValid() { + Log.Error(nil, "Update event old object has no Status field", "event", e) + return false + } + + s2 := reflect.ValueOf(e.ObjectNew).Elem().FieldByName("Status") + if !s2.IsValid() { + Log.Error(nil, "Update event new object has no Status field", "event", e) + return false + } + + return !equality.Semantic.DeepDerivative(s1.Interface(), s2.Interface()) +}