Skip to content

Commit

Permalink
Implementing the AggregateStatus hook
Browse files Browse the repository at this point in the history
Signed-off-by: Xinzhao Xu <[email protected]>
  • Loading branch information
iawia002 committed Jan 17, 2022
1 parent 361978a commit 81c1c24
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
webhooks:
- name: workloads.example.com
rules:
- operations: [ "InterpretReplica","ReviseReplica","Retain" ]
- operations: [ "InterpretReplica","ReviseReplica","Retain","AggregateStatus" ]
apiGroups: [ "workload.example.io" ]
apiVersions: [ "v1alpha1" ]
kinds: [ "Workload" ]
Expand Down
20 changes: 20 additions & 0 deletions examples/customresourceinterpreter/webhook/app/workloadwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (e *workloadInterpreter) Handle(ctx context.Context, req interpreter.Reques
return e.responseWithExploreReviseReplica(workload, req)
case configv1alpha1.InterpreterOperationRetain:
return e.responseWithExploreRetaining(workload, req)
case configv1alpha1.InterpreterOperationAggregateStatus:
return e.responseWithExploreAggregateStatus(workload, req)
default:
return interpreter.Errored(http.StatusBadRequest, fmt.Errorf("wrong request operation type: %s", req.Operation))
}
Expand Down Expand Up @@ -86,3 +88,21 @@ func (e *workloadInterpreter) responseWithExploreRetaining(desiredWorkload *work
}
return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}

func (e *workloadInterpreter) responseWithExploreAggregateStatus(workload *workloadv1alpha1.Workload, req interpreter.Request) interpreter.Response {
wantedWorkload := workload.DeepCopy()
var readyReplicas int32
for _, item := range req.AggregatedStatus {
// NOTE: this is just an example, you need to change the method of updating status according to the actual situation
if !item.Applied {
continue
}
readyReplicas += 1
}
wantedWorkload.Status.ReadyReplicas = readyReplicas
marshaledBytes, err := json.Marshal(wantedWorkload)
if err != nil {
return interpreter.Errored(http.StatusInternalServerError, err)
}
return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}
3 changes: 1 addition & 2 deletions pkg/apis/config/v1alpha1/interpretercontext_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

Expand Down Expand Up @@ -64,7 +63,7 @@ type ResourceInterpreterRequest struct {

// AggregatedStatus represents status list of the resource running in each member cluster.
// +optional
AggregatedStatus []workv1alpha1.AggregatedStatusItem `json:"aggregatedStatus,omitempty"`
AggregatedStatus []workv1alpha2.AggregatedStatusItem `json:"aggregatedStatus,omitempty"`
}

// ResourceInterpreterResponse describes an interpreter response.
Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

223 changes: 0 additions & 223 deletions pkg/detector/aggregate_status.go

This file was deleted.

67 changes: 56 additions & 11 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package detector
import (
"context"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -20,6 +21,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -1004,6 +1006,7 @@ func (d *ResourceDetector) OnResourceBindingUpdate(_, newObj interface{}) {
// ReconcileResourceBinding handles ResourceBinding object changes.
// For each ResourceBinding changes, we will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
//nolint:gocyclo
func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
ckey, ok := key.(keys.ClusterWideKey)
if !ok { // should not happen
Expand All @@ -1026,19 +1029,61 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
}

klog.Infof("Reconciling resource binding(%s/%s)", binding.Namespace, binding.Name)
switch binding.Spec.Resource.Kind {
case util.DeploymentKind:
return d.AggregateDeploymentStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.ServiceKind:
return d.AggregateServiceStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.IngressKind:
return d.AggregateIngressStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.JobKind:
return d.AggregateJobStatus(binding.Spec.Resource, binding.Status.AggregatedStatus, binding.Spec.Clusters)
default:
// Unsupported resource type.
resource := binding.Spec.Resource
gvr, err := restmapper.GetGroupVersionResource(
d.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
)
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
return err
}
obj, err := d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}

if !d.ResourceInterpreter.HookEnabled(obj, configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := d.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus)
if err != nil {
klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
newStatus, _, err := unstructured.NestedFieldNoCopy(newObj.Object, "status")
if err != nil {
return err
}
oldStatus, _, err := unstructured.NestedFieldNoCopy(obj.Object, "status")
if err != nil {
return err
}
if reflect.DeepEqual(newStatus, oldStatus) {
klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name)
return nil
}

if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err = unstructured.SetNestedField(obj.Object, newStatus, "status"); err != nil {
return err
}
_, updateErr := d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), obj, metav1.UpdateOptions{})
if updateErr == nil {
return nil
}

obj, err = d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{})
if err != nil {
return err
}

return updateErr
}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
return nil
}

// OnClusterResourceBindingAdd handles object add event.
Expand Down
Loading

0 comments on commit 81c1c24

Please sign in to comment.