From b83d53e5c71234945c9a8fcefcb8ec0547a25a67 Mon Sep 17 00:00:00 2001 From: Xinzhao Xu Date: Wed, 12 Jan 2022 17:20:22 +0800 Subject: [PATCH] Implementing the AggregateStatus hook Signed-off-by: Xinzhao Xu --- .../webhook-configuration.yaml | 2 +- .../webhook/app/workloadwebhook.go | 23 ++ .../v1alpha1/interpretercontext_types.go | 3 +- .../config/v1alpha1/zz_generated.deepcopy.go | 3 +- pkg/detector/aggregate_status.go | 223 ------------------ pkg/detector/detector.go | 43 +++- .../webhook/attributes.go | 3 +- .../defaultinterpreter/aggregatestatus.go | 176 ++++++++++++++ .../defaultinterpreter/default.go | 28 ++- pkg/resourceinterpreter/interpreter.go | 22 ++ pkg/util/helper/job.go | 6 +- pkg/util/helper/unstructured.go | 30 +++ pkg/webhook/configuration/validating.go | 1 + 13 files changed, 312 insertions(+), 251 deletions(-) delete mode 100644 pkg/detector/aggregate_status.go create mode 100644 pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go diff --git a/examples/customresourceinterpreter/webhook-configuration.yaml b/examples/customresourceinterpreter/webhook-configuration.yaml index 87a3ccb0dd6f..7a4d8478fc39 100644 --- a/examples/customresourceinterpreter/webhook-configuration.yaml +++ b/examples/customresourceinterpreter/webhook-configuration.yaml @@ -6,7 +6,7 @@ metadata: webhooks: - name: workloads.example.com rules: - - operations: [ "InterpretReplica","ReviseReplica","Retain" ] + - operations: [ "InterpretReplica","ReviseReplica","Retain","AggregateStatus" ] apiGroups: [ "workload.example.io" ] apiVersions: [ "v1alpha1" ] kinds: [ "Workload" ] diff --git a/examples/customresourceinterpreter/webhook/app/workloadwebhook.go b/examples/customresourceinterpreter/webhook/app/workloadwebhook.go index 59544a95fdce..eb2d8c929cb4 100644 --- a/examples/customresourceinterpreter/webhook/app/workloadwebhook.go +++ b/examples/customresourceinterpreter/webhook/app/workloadwebhook.go @@ -39,6 +39,8 @@ func (e *workloadInterpreter) Handle(ctx context.Context, req interpreter.Reques return e.responseWithExploreReviseReplica(workload, req) case configv1alpha1.InterpreterOperationRetain: return e.responseWithExploreRetaining(workload, req) + case configv1alpha1.InterpreterOperationAggregateStatus: + return e.responseWithExploreAggregateStatus(workload, req) default: return interpreter.Errored(http.StatusBadRequest, fmt.Errorf("wrong request operation type: %s", req.Operation)) } @@ -86,3 +88,24 @@ func (e *workloadInterpreter) responseWithExploreRetaining(desiredWorkload *work } return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes) } + +func (e *workloadInterpreter) responseWithExploreAggregateStatus(workload *workloadv1alpha1.Workload, req interpreter.Request) interpreter.Response { + wantedWorkload := workload.DeepCopy() + var readyReplicas int32 + for _, item := range req.AggregatedStatus { + if item.Status == nil { + continue + } + status := &workloadv1alpha1.WorkloadStatus{} + if err := json.Unmarshal(item.Status.Raw, status); err != nil { + return interpreter.Errored(http.StatusInternalServerError, err) + } + readyReplicas += status.ReadyReplicas + } + wantedWorkload.Status.ReadyReplicas = readyReplicas + marshaledBytes, err := json.Marshal(wantedWorkload) + if err != nil { + return interpreter.Errored(http.StatusInternalServerError, err) + } + return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes) +} diff --git a/pkg/apis/config/v1alpha1/interpretercontext_types.go b/pkg/apis/config/v1alpha1/interpretercontext_types.go index eb280dc96cfb..0443f6bc1899 100644 --- a/pkg/apis/config/v1alpha1/interpretercontext_types.go +++ b/pkg/apis/config/v1alpha1/interpretercontext_types.go @@ -5,7 +5,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" ) @@ -64,7 +63,7 @@ type ResourceInterpreterRequest struct { // AggregatedStatus represents status list of the resource running in each member cluster. // +optional - AggregatedStatus []workv1alpha1.AggregatedStatusItem `json:"aggregatedStatus,omitempty"` + AggregatedStatus []workv1alpha2.AggregatedStatusItem `json:"aggregatedStatus,omitempty"` } // ResourceInterpreterResponse describes an interpreter response. diff --git a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go index e8f1543aaebf..cb19b6e96f63 100644 --- a/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/config/v1alpha1/zz_generated.deepcopy.go @@ -5,7 +5,6 @@ package v1alpha1 import ( - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" v1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -94,7 +93,7 @@ func (in *ResourceInterpreterRequest) DeepCopyInto(out *ResourceInterpreterReque } if in.AggregatedStatus != nil { in, out := &in.AggregatedStatus, &out.AggregatedStatus - *out = make([]workv1alpha1.AggregatedStatusItem, len(*in)) + *out = make([]v1alpha2.AggregatedStatusItem, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/pkg/detector/aggregate_status.go b/pkg/detector/aggregate_status.go deleted file mode 100644 index c6acc8757633..000000000000 --- a/pkg/detector/aggregate_status.go +++ /dev/null @@ -1,223 +0,0 @@ -package detector - -import ( - "context" - "encoding/json" - "reflect" - - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - extensionsv1beta1 "k8s.io/api/extensions/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/util/retry" - "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" - - workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" - "github.com/karmada-io/karmada/pkg/util/helper" -) - -// AggregateDeploymentStatus summarize deployment status and update to original objects. -func (d *ResourceDetector) AggregateDeploymentStatus(objRef workv1alpha2.ObjectReference, status []workv1alpha2.AggregatedStatusItem) error { - if objRef.APIVersion != "apps/v1" { - return nil - } - - obj := &appsv1.Deployment{} - if err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: objRef.Namespace, Name: objRef.Name}, obj); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - klog.Errorf("Failed to get deployment(%s/%s): %v", objRef.Namespace, objRef.Name, err) - return err - } - - oldStatus := &obj.Status - newStatus := &appsv1.DeploymentStatus{} - for _, item := range status { - if item.Status == nil { - continue - } - temp := &appsv1.DeploymentStatus{} - if err := json.Unmarshal(item.Status.Raw, temp); err != nil { - klog.Errorf("Failed to unmarshal status") - return err - } - klog.V(3).Infof("Grab deployment(%s/%s) status from cluster(%s), replicas: %d, ready: %d, updated: %d, available: %d, unavailable: %d", - obj.Namespace, obj.Name, item.ClusterName, temp.Replicas, temp.ReadyReplicas, temp.UpdatedReplicas, temp.AvailableReplicas, temp.UnavailableReplicas) - newStatus.ObservedGeneration = obj.Generation - newStatus.Replicas += temp.Replicas - newStatus.ReadyReplicas += temp.ReadyReplicas - newStatus.UpdatedReplicas += temp.UpdatedReplicas - newStatus.AvailableReplicas += temp.AvailableReplicas - newStatus.UnavailableReplicas += temp.UnavailableReplicas - } - - if oldStatus.ObservedGeneration == newStatus.ObservedGeneration && - oldStatus.Replicas == newStatus.Replicas && - oldStatus.ReadyReplicas == newStatus.ReadyReplicas && - oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas && - oldStatus.AvailableReplicas == newStatus.AvailableReplicas && - oldStatus.UnavailableReplicas == newStatus.UnavailableReplicas { - klog.V(3).Infof("ignore update deployment(%s/%s) status as up to date", obj.Namespace, obj.Name) - return nil - } - - oldStatus.ObservedGeneration = newStatus.ObservedGeneration - oldStatus.Replicas = newStatus.Replicas - oldStatus.ReadyReplicas = newStatus.ReadyReplicas - oldStatus.UpdatedReplicas = newStatus.UpdatedReplicas - oldStatus.AvailableReplicas = newStatus.AvailableReplicas - oldStatus.UnavailableReplicas = newStatus.UnavailableReplicas - - if err := d.Client.Status().Update(context.TODO(), obj); err != nil { - klog.Errorf("Failed to update deployment(%s/%s) status: %v", objRef.Namespace, objRef.Name, err) - return err - } - - return nil -} - -// AggregateServiceStatus summarize service status and update to original objects. -func (d *ResourceDetector) AggregateServiceStatus(objRef workv1alpha2.ObjectReference, status []workv1alpha2.AggregatedStatusItem) error { - if objRef.APIVersion != "v1" { - return nil - } - - obj := &corev1.Service{} - if err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: objRef.Namespace, Name: objRef.Name}, obj); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - klog.Errorf("Failed to get service(%s/%s): %v", objRef.Namespace, objRef.Name, err) - return err - } - - if obj.Spec.Type != corev1.ServiceTypeLoadBalancer { - return nil - } - - // If service type is of type LoadBalancer, collect the status.loadBalancer.ingress - newStatus := &corev1.ServiceStatus{} - for _, item := range status { - if item.Status == nil { - continue - } - temp := &corev1.ServiceStatus{} - if err := json.Unmarshal(item.Status.Raw, temp); err != nil { - klog.Errorf("Failed to unmarshal status of service(%s/%s): %v", objRef.Namespace, objRef.Name, err) - return err - } - klog.V(3).Infof("Grab service(%s/%s) status from cluster(%s), loadBalancer status: %v", - obj.Namespace, obj.Name, item.ClusterName, temp.LoadBalancer) - - // Set cluster name as Hostname by default to indicate the status is collected from which member cluster. - for i := range temp.LoadBalancer.Ingress { - if temp.LoadBalancer.Ingress[i].Hostname == "" { - temp.LoadBalancer.Ingress[i].Hostname = item.ClusterName - } - } - - newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...) - } - - if reflect.DeepEqual(obj.Status, *newStatus) { - klog.V(3).Infof("ignore update service(%s/%s) status as up to date", obj.Namespace, obj.Name) - return nil - } - - obj.Status = *newStatus - if err := d.Client.Status().Update(context.TODO(), obj); err != nil { - klog.Errorf("Failed to update service(%s/%s) status: %v", objRef.Namespace, objRef.Name, err) - return err - } - - return nil -} - -// AggregateIngressStatus summarize ingress status and update to original objects. -func (d *ResourceDetector) AggregateIngressStatus(objRef workv1alpha2.ObjectReference, status []workv1alpha2.AggregatedStatusItem) error { - if objRef.APIVersion != "extensions/v1beta1" { - return nil - } - - obj := &extensionsv1beta1.Ingress{} - if err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: objRef.Namespace, Name: objRef.Name}, obj); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - klog.Errorf("Failed to get ingress(%s/%s): %v", objRef.Namespace, objRef.Name, err) - return err - } - - newStatus := &extensionsv1beta1.IngressStatus{} - for _, item := range status { - if item.Status == nil { - continue - } - temp := &extensionsv1beta1.IngressStatus{} - if err := json.Unmarshal(item.Status.Raw, temp); err != nil { - klog.Errorf("Failed to unmarshal status ingress(%s/%s): %v", obj.Namespace, obj.Name, err) - return err - } - klog.V(3).Infof("Grab ingress(%s/%s) status from cluster(%s), loadBalancer status: %v", - obj.Namespace, obj.Name, item.ClusterName, temp.LoadBalancer) - - // Set cluster name as Hostname by default to indicate the status is collected from which member cluster. - for i := range temp.LoadBalancer.Ingress { - if temp.LoadBalancer.Ingress[i].Hostname == "" { - temp.LoadBalancer.Ingress[i].Hostname = item.ClusterName - } - } - - newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...) - } - - if reflect.DeepEqual(obj.Status, *newStatus) { - klog.V(3).Infof("ignore update ingress(%s/%s) status as up to date", obj.Namespace, obj.Name) - return nil - } - - obj.Status = *newStatus - if err := d.Client.Status().Update(context.TODO(), obj); err != nil { - klog.Errorf("Failed to update ingress(%s/%s) status: %v", objRef.Namespace, objRef.Name, err) - return err - } - - return nil -} - -// AggregateJobStatus summarize job status and update to original objects. -func (d *ResourceDetector) AggregateJobStatus(objRef workv1alpha2.ObjectReference, status []workv1alpha2.AggregatedStatusItem, clusters []workv1alpha2.TargetCluster) error { - if objRef.APIVersion != "batch/v1" { - return nil - } - - obj := &batchv1.Job{} - if err := d.Client.Get(context.TODO(), client.ObjectKey{Namespace: objRef.Namespace, Name: objRef.Name}, obj); err != nil { - if apierrors.IsNotFound(err) { - return nil - } - klog.Errorf("Failed to get service(%s/%s): %v", objRef.Namespace, objRef.Name, err) - return err - } - - newStatus, err := helper.ParsingJobStatus(obj, status, clusters) - if err != nil { - return err - } - - if reflect.DeepEqual(obj.Status, *newStatus) { - klog.V(3).Infof("ignore update job(%s/%s) status as up to date", obj.Namespace, obj.Name) - return nil - } - - return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { - if err = d.Client.Get(context.TODO(), client.ObjectKey{Namespace: obj.Namespace, Name: obj.Name}, obj); err != nil { - return err - } - obj.Status = *newStatus - return d.Client.Status().Update(context.TODO(), obj) - }) -} diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 4ade0d047ed9..0b750cedca17 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -3,6 +3,7 @@ package detector import ( "context" "fmt" + "reflect" "sync" "time" @@ -1004,6 +1005,7 @@ func (d *ResourceDetector) OnResourceBindingUpdate(_, newObj interface{}) { // ReconcileResourceBinding handles ResourceBinding object changes. // For each ResourceBinding changes, we will try to calculate the summary status and update to original object // that the ResourceBinding refer to. +//nolint:gocyclo func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { ckey, ok := key.(keys.ClusterWideKey) if !ok { // should not happen @@ -1026,19 +1028,38 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error { } klog.Infof("Reconciling resource binding(%s/%s)", binding.Namespace, binding.Name) - switch binding.Spec.Resource.Kind { - case util.DeploymentKind: - return d.AggregateDeploymentStatus(binding.Spec.Resource, binding.Status.AggregatedStatus) - case util.ServiceKind: - return d.AggregateServiceStatus(binding.Spec.Resource, binding.Status.AggregatedStatus) - case util.IngressKind: - return d.AggregateIngressStatus(binding.Spec.Resource, binding.Status.AggregatedStatus) - case util.JobKind: - return d.AggregateJobStatus(binding.Spec.Resource, binding.Status.AggregatedStatus, binding.Spec.Clusters) - default: - // Unsupported resource type. + resource := binding.Spec.Resource + gvr, err := restmapper.GetGroupVersionResource( + d.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind), + ) + if err != nil { + klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err) + return err + } + obj, err := d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + + if !d.ResourceInterpreter.HookEnabled(obj, configv1alpha1.InterpreterOperationAggregateStatus) { + return nil + } + newObj, err := d.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus) + if err != nil { + klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + if reflect.DeepEqual(obj, newObj) { + klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name) return nil } + + if _, err = d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil { + klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err) + return err + } + return nil } // OnClusterResourceBindingAdd handles object add event. diff --git a/pkg/resourceinterpreter/customizedinterpreter/webhook/attributes.go b/pkg/resourceinterpreter/customizedinterpreter/webhook/attributes.go index d4339e25620d..75fec949de33 100644 --- a/pkg/resourceinterpreter/customizedinterpreter/webhook/attributes.go +++ b/pkg/resourceinterpreter/customizedinterpreter/webhook/attributes.go @@ -5,7 +5,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" ) @@ -15,7 +14,7 @@ type RequestAttributes struct { Object *unstructured.Unstructured ObservedObj *unstructured.Unstructured ReplicasSet int32 - AggregatedStatus []workv1alpha1.AggregatedStatusItem + AggregatedStatus []workv1alpha2.AggregatedStatusItem } // ResponseAttributes contains the attributes that response by the webhook. diff --git a/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go new file mode 100644 index 000000000000..b2764ae36a2a --- /dev/null +++ b/pkg/resourceinterpreter/defaultinterpreter/aggregatestatus.go @@ -0,0 +1,176 @@ +package defaultinterpreter + +import ( + "encoding/json" + "reflect" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" +) + +type aggregateStatusInterpreter func(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) + +func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggregateStatusInterpreter { + s := make(map[schema.GroupVersionKind]aggregateStatusInterpreter) + s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = aggregateDeploymentStatus + s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = aggregateServiceStatus + s[extensionsv1beta1.SchemeGroupVersion.WithKind(util.IngressKind)] = aggregateIngressStatus + s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = aggregateJobStatus + return s +} + +func aggregateDeploymentStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + deploy, err := helper.ConvertToDeployment(object) + if err != nil { + return nil, err + } + + oldStatus := &deploy.Status + newStatus := &appsv1.DeploymentStatus{} + for _, item := range aggregatedStatusItems { + if item.Status == nil { + continue + } + temp := &appsv1.DeploymentStatus{} + if err = json.Unmarshal(item.Status.Raw, temp); err != nil { + return nil, err + } + klog.V(3).Infof("Grab deployment(%s/%s) status from cluster(%s), replicas: %d, ready: %d, updated: %d, available: %d, unavailable: %d", + deploy.Namespace, deploy.Name, item.ClusterName, temp.Replicas, temp.ReadyReplicas, temp.UpdatedReplicas, temp.AvailableReplicas, temp.UnavailableReplicas) + newStatus.ObservedGeneration = deploy.Generation + newStatus.Replicas += temp.Replicas + newStatus.ReadyReplicas += temp.ReadyReplicas + newStatus.UpdatedReplicas += temp.UpdatedReplicas + newStatus.AvailableReplicas += temp.AvailableReplicas + newStatus.UnavailableReplicas += temp.UnavailableReplicas + } + + if oldStatus.ObservedGeneration == newStatus.ObservedGeneration && + oldStatus.Replicas == newStatus.Replicas && + oldStatus.ReadyReplicas == newStatus.ReadyReplicas && + oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas && + oldStatus.AvailableReplicas == newStatus.AvailableReplicas && + oldStatus.UnavailableReplicas == newStatus.UnavailableReplicas { + klog.V(3).Infof("ignore update deployment(%s/%s) status as up to date", deploy.Namespace, deploy.Name) + return object, nil + } + + oldStatus.ObservedGeneration = newStatus.ObservedGeneration + oldStatus.Replicas = newStatus.Replicas + oldStatus.ReadyReplicas = newStatus.ReadyReplicas + oldStatus.UpdatedReplicas = newStatus.UpdatedReplicas + oldStatus.AvailableReplicas = newStatus.AvailableReplicas + oldStatus.UnavailableReplicas = newStatus.UnavailableReplicas + + return helper.ToUnstructured(deploy) +} + +func aggregateServiceStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + service, err := helper.ConvertToService(object) + if err != nil { + return nil, err + } + + if service.Spec.Type != corev1.ServiceTypeLoadBalancer { + return object, nil + } + + // If service type is of type LoadBalancer, collect the status.loadBalancer.ingress + newStatus := &corev1.ServiceStatus{} + for _, item := range aggregatedStatusItems { + if item.Status == nil { + continue + } + temp := &corev1.ServiceStatus{} + if err := json.Unmarshal(item.Status.Raw, temp); err != nil { + klog.Errorf("Failed to unmarshal status of service(%s/%s): %v", service.Namespace, service.Name, err) + return nil, err + } + klog.V(3).Infof("Grab service(%s/%s) status from cluster(%s), loadBalancer status: %v", + service.Namespace, service.Name, item.ClusterName, temp.LoadBalancer) + + // Set cluster name as Hostname by default to indicate the status is collected from which member cluster. + for i := range temp.LoadBalancer.Ingress { + if temp.LoadBalancer.Ingress[i].Hostname == "" { + temp.LoadBalancer.Ingress[i].Hostname = item.ClusterName + } + } + + newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...) + } + + if reflect.DeepEqual(service.Status, *newStatus) { + klog.V(3).Infof("ignore update service(%s/%s) status as up to date", service.Namespace, service.Name) + return object, nil + } + + service.Status = *newStatus + return helper.ToUnstructured(service) +} + +func aggregateIngressStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + ingress, err := helper.ConvertToIngress(object) + if err != nil { + return nil, err + } + + newStatus := &extensionsv1beta1.IngressStatus{} + for _, item := range aggregatedStatusItems { + if item.Status == nil { + continue + } + temp := &extensionsv1beta1.IngressStatus{} + if err := json.Unmarshal(item.Status.Raw, temp); err != nil { + klog.Errorf("Failed to unmarshal status ingress(%s/%s): %v", ingress.Namespace, ingress.Name, err) + return nil, err + } + klog.V(3).Infof("Grab ingress(%s/%s) status from cluster(%s), loadBalancer status: %v", + ingress.Namespace, ingress.Name, item.ClusterName, temp.LoadBalancer) + + // Set cluster name as Hostname by default to indicate the status is collected from which member cluster. + for i := range temp.LoadBalancer.Ingress { + if temp.LoadBalancer.Ingress[i].Hostname == "" { + temp.LoadBalancer.Ingress[i].Hostname = item.ClusterName + } + } + + newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...) + } + + if reflect.DeepEqual(ingress.Status, *newStatus) { + klog.V(3).Infof("ignore update ingress(%s/%s) status as up to date", ingress.Namespace, ingress.Name) + return object, nil + } + + ingress.Status = *newStatus + return helper.ToUnstructured(ingress) +} + +func aggregateJobStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + job, err := helper.ConvertToJob(object) + if err != nil { + return nil, err + } + + newStatus, err := helper.ParsingJobStatus(job, aggregatedStatusItems) + if err != nil { + return nil, err + } + + if reflect.DeepEqual(job.Status, *newStatus) { + klog.V(3).Infof("ignore update job(%s/%s) status as up to date", job.Namespace, job.Name) + return object, nil + } + + job.Status = *newStatus + return helper.ToUnstructured(job) +} diff --git a/pkg/resourceinterpreter/defaultinterpreter/default.go b/pkg/resourceinterpreter/defaultinterpreter/default.go index 73b8aeab1242..e72cda6d289f 100644 --- a/pkg/resourceinterpreter/defaultinterpreter/default.go +++ b/pkg/resourceinterpreter/defaultinterpreter/default.go @@ -14,17 +14,19 @@ import ( // DefaultInterpreter contains all default operation interpreter factory // for interpreting common resource. type DefaultInterpreter struct { - replicaHandlers map[schema.GroupVersionKind]replicaInterpreter - reviseReplicaHandlers map[schema.GroupVersionKind]reviseReplicaInterpreter - retentionHandlers map[schema.GroupVersionKind]retentionInterpreter + replicaHandlers map[schema.GroupVersionKind]replicaInterpreter + reviseReplicaHandlers map[schema.GroupVersionKind]reviseReplicaInterpreter + retentionHandlers map[schema.GroupVersionKind]retentionInterpreter + aggregateStatusHandlers map[schema.GroupVersionKind]aggregateStatusInterpreter } // NewDefaultInterpreter return a new DefaultInterpreter. func NewDefaultInterpreter() *DefaultInterpreter { return &DefaultInterpreter{ - replicaHandlers: getAllDefaultReplicaInterpreter(), - reviseReplicaHandlers: getAllDefaultReviseReplicaInterpreter(), - retentionHandlers: getAllDefaultRetentionInterpreter(), + replicaHandlers: getAllDefaultReplicaInterpreter(), + reviseReplicaHandlers: getAllDefaultReviseReplicaInterpreter(), + retentionHandlers: getAllDefaultRetentionInterpreter(), + aggregateStatusHandlers: getAllDefaultAggregateStatusInterpreter(), } } @@ -43,6 +45,10 @@ func (e *DefaultInterpreter) HookEnabled(kind schema.GroupVersionKind, operation if _, exist := e.retentionHandlers[kind]; exist { return true } + case configv1alpha1.InterpreterOperationAggregateStatus: + if _, exist := e.aggregateStatusHandlers[kind]; exist { + return true + } // TODO(RainbowMango): more cases should be added here } @@ -75,6 +81,14 @@ func (e *DefaultInterpreter) Retain(desired *unstructured.Unstructured, observed if !exist { return nil, fmt.Errorf("default %s interpreter for %q not found", configv1alpha1.InterpreterOperationRetain, desired.GroupVersionKind()) } - return handler(desired, observed) } + +// AggregateStatus returns the objects that based on the 'object' but with status aggregated. +func (e *DefaultInterpreter) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + handler, exist := e.aggregateStatusHandlers[object.GroupVersionKind()] + if !exist { + return nil, fmt.Errorf("defalut %s interpreter for %q not found", configv1alpha1.InterpreterOperationAggregateStatus, object.GroupVersionKind()) + } + return handler(object, aggregatedStatusItems) +} diff --git a/pkg/resourceinterpreter/interpreter.go b/pkg/resourceinterpreter/interpreter.go index 4ab9d8723115..5e1d1d507bb7 100644 --- a/pkg/resourceinterpreter/interpreter.go +++ b/pkg/resourceinterpreter/interpreter.go @@ -31,6 +31,9 @@ type ResourceInterpreter interface { // Retain returns the objects that based on the "desired" object but with values retained from the "observed" object. Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error) + // AggregateStatus returns the objects that based on the 'object' but with status aggregated. + AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) + // other common method } @@ -133,3 +136,22 @@ func (i *customResourceInterpreterImpl) Retain(desired *unstructured.Unstructure return i.defaultInterpreter.Retain(desired, observed) } + +// AggregateStatus returns the objects that based on the 'object' but with status aggregated. +func (i *customResourceInterpreterImpl) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) { + klog.V(4).Infof("Begin to aggregate status for object: %v %s/%s.", object.GroupVersionKind(), object.GetNamespace(), object.GetName()) + + obj, hookEnabled, err := i.customizedInterpreter.Patch(context.TODO(), &webhook.RequestAttributes{ + Operation: configv1alpha1.InterpreterOperationAggregateStatus, + Object: object.DeepCopy(), + AggregatedStatus: aggregatedStatusItems, + }) + if err != nil { + return nil, err + } + if hookEnabled { + return obj, nil + } + + return i.defaultInterpreter.AggregateStatus(object, aggregatedStatusItems) +} diff --git a/pkg/util/helper/job.go b/pkg/util/helper/job.go index 7896d1332a42..7ad423c523b3 100644 --- a/pkg/util/helper/job.go +++ b/pkg/util/helper/job.go @@ -15,7 +15,7 @@ import ( // ParsingJobStatus generates new status of given 'AggregatedStatusItem'. //nolint:gocyclo -func ParsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusItem, clusters []workv1alpha2.TargetCluster) (*batchv1.JobStatus, error) { +func ParsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusItem) (*batchv1.JobStatus, error) { var jobFailed []string var startTime, completionTime *metav1.Time successfulJobs, completionJobs := 0, 0 @@ -67,7 +67,7 @@ func ParsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusIt }) } - if successfulJobs == len(clusters) { + if successfulJobs == len(status) { newStatus.Conditions = append(newStatus.Conditions, batchv1.JobCondition{ Type: batchv1.JobComplete, Status: corev1.ConditionTrue, @@ -81,7 +81,7 @@ func ParsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusIt if startTime != nil { newStatus.StartTime = startTime.DeepCopy() } - if completionTime != nil && completionJobs == len(clusters) { + if completionTime != nil && completionJobs == len(status) { newStatus.CompletionTime = completionTime.DeepCopy() } diff --git a/pkg/util/helper/unstructured.go b/pkg/util/helper/unstructured.go index 5785acbdaad9..f4993d79b08f 100644 --- a/pkg/util/helper/unstructured.go +++ b/pkg/util/helper/unstructured.go @@ -5,6 +5,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" + extensionsv1beta1 "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -134,6 +135,26 @@ func ConvertToResourceExploringWebhookConfiguration(obj *unstructured.Unstructur return typedObj, nil } +// ConvertToService converts a Service object from unstructured to typed. +func ConvertToService(obj *unstructured.Unstructured) (*corev1.Service, error) { + typedObj := &corev1.Service{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), typedObj); err != nil { + return nil, err + } + + return typedObj, nil +} + +// ConvertToIngress converts a Service object from unstructured to typed. +func ConvertToIngress(obj *unstructured.Unstructured) (*extensionsv1beta1.Ingress, error) { + typedObj := &extensionsv1beta1.Ingress{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), typedObj); err != nil { + return nil, err + } + + return typedObj, nil +} + // ApplyReplica applies the Replica value for the specific field. func ApplyReplica(workload *unstructured.Unstructured, desireReplica int64, field string) error { _, ok, err := unstructured.NestedInt64(workload.Object, util.SpecField, field) @@ -148,3 +169,12 @@ func ApplyReplica(workload *unstructured.Unstructured, desireReplica int64, fiel } return nil } + +// ToUnstructured converts a typed object to an unstructured object. +func ToUnstructured(obj interface{}) (*unstructured.Unstructured, error) { + uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, err + } + return &unstructured.Unstructured{Object: uncastObj}, nil +} diff --git a/pkg/webhook/configuration/validating.go b/pkg/webhook/configuration/validating.go index c3d76ca33c7f..51d4f7ef6268 100644 --- a/pkg/webhook/configuration/validating.go +++ b/pkg/webhook/configuration/validating.go @@ -67,6 +67,7 @@ var supportedInterpreterOperation = sets.NewString( string(configv1alpha1.InterpreterOperationInterpretReplica), string(configv1alpha1.InterpreterOperationReviseReplica), string(configv1alpha1.InterpreterOperationRetain), + string(configv1alpha1.InterpreterOperationAggregateStatus), ) var acceptedInterpreterContextVersions = []string{configv1alpha1.GroupVersion.Version}