Skip to content

Commit

Permalink
Implementing the AggregateStatus hook
Browse files Browse the repository at this point in the history
Signed-off-by: Xinzhao Xu <[email protected]>
  • Loading branch information
iawia002 committed Jan 18, 2022
1 parent 5732ba2 commit b83d53e
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
webhooks:
- name: workloads.example.com
rules:
- operations: [ "InterpretReplica","ReviseReplica","Retain" ]
- operations: [ "InterpretReplica","ReviseReplica","Retain","AggregateStatus" ]
apiGroups: [ "workload.example.io" ]
apiVersions: [ "v1alpha1" ]
kinds: [ "Workload" ]
Expand Down
23 changes: 23 additions & 0 deletions examples/customresourceinterpreter/webhook/app/workloadwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func (e *workloadInterpreter) Handle(ctx context.Context, req interpreter.Reques
return e.responseWithExploreReviseReplica(workload, req)
case configv1alpha1.InterpreterOperationRetain:
return e.responseWithExploreRetaining(workload, req)
case configv1alpha1.InterpreterOperationAggregateStatus:
return e.responseWithExploreAggregateStatus(workload, req)
default:
return interpreter.Errored(http.StatusBadRequest, fmt.Errorf("wrong request operation type: %s", req.Operation))
}
Expand Down Expand Up @@ -86,3 +88,24 @@ func (e *workloadInterpreter) responseWithExploreRetaining(desiredWorkload *work
}
return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}

func (e *workloadInterpreter) responseWithExploreAggregateStatus(workload *workloadv1alpha1.Workload, req interpreter.Request) interpreter.Response {
wantedWorkload := workload.DeepCopy()
var readyReplicas int32
for _, item := range req.AggregatedStatus {
if item.Status == nil {
continue
}
status := &workloadv1alpha1.WorkloadStatus{}
if err := json.Unmarshal(item.Status.Raw, status); err != nil {
return interpreter.Errored(http.StatusInternalServerError, err)
}
readyReplicas += status.ReadyReplicas
}
wantedWorkload.Status.ReadyReplicas = readyReplicas
marshaledBytes, err := json.Marshal(wantedWorkload)
if err != nil {
return interpreter.Errored(http.StatusInternalServerError, err)
}
return interpreter.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
}
3 changes: 1 addition & 2 deletions pkg/apis/config/v1alpha1/interpretercontext_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

Expand Down Expand Up @@ -64,7 +63,7 @@ type ResourceInterpreterRequest struct {

// AggregatedStatus represents status list of the resource running in each member cluster.
// +optional
AggregatedStatus []workv1alpha1.AggregatedStatusItem `json:"aggregatedStatus,omitempty"`
AggregatedStatus []workv1alpha2.AggregatedStatusItem `json:"aggregatedStatus,omitempty"`
}

// ResourceInterpreterResponse describes an interpreter response.
Expand Down
3 changes: 1 addition & 2 deletions pkg/apis/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

223 changes: 0 additions & 223 deletions pkg/detector/aggregate_status.go

This file was deleted.

43 changes: 32 additions & 11 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package detector
import (
"context"
"fmt"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -1004,6 +1005,7 @@ func (d *ResourceDetector) OnResourceBindingUpdate(_, newObj interface{}) {
// ReconcileResourceBinding handles ResourceBinding object changes.
// For each ResourceBinding changes, we will try to calculate the summary status and update to original object
// that the ResourceBinding refer to.
//nolint:gocyclo
func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
ckey, ok := key.(keys.ClusterWideKey)
if !ok { // should not happen
Expand All @@ -1026,19 +1028,38 @@ func (d *ResourceDetector) ReconcileResourceBinding(key util.QueueKey) error {
}

klog.Infof("Reconciling resource binding(%s/%s)", binding.Namespace, binding.Name)
switch binding.Spec.Resource.Kind {
case util.DeploymentKind:
return d.AggregateDeploymentStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.ServiceKind:
return d.AggregateServiceStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.IngressKind:
return d.AggregateIngressStatus(binding.Spec.Resource, binding.Status.AggregatedStatus)
case util.JobKind:
return d.AggregateJobStatus(binding.Spec.Resource, binding.Status.AggregatedStatus, binding.Spec.Clusters)
default:
// Unsupported resource type.
resource := binding.Spec.Resource
gvr, err := restmapper.GetGroupVersionResource(
d.RESTMapper, schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind),
)
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resource.APIVersion, resource.Kind, err)
return err
}
obj, err := d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).Get(context.TODO(), resource.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}

if !d.ResourceInterpreter.HookEnabled(obj, configv1alpha1.InterpreterOperationAggregateStatus) {
return nil
}
newObj, err := d.ResourceInterpreter.AggregateStatus(obj, binding.Status.AggregatedStatus)
if err != nil {
klog.Errorf("AggregateStatus for resource(%s/%s/%s) failed: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
if reflect.DeepEqual(obj, newObj) {
klog.V(3).Infof("ignore update resource(%s/%s/%s) status as up to date", resource.Kind, resource.Namespace, resource.Name)
return nil
}

if _, err = d.DynamicClient.Resource(gvr).Namespace(resource.Namespace).UpdateStatus(context.TODO(), newObj, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update resource(%s/%s/%s), Error: %v", resource.Kind, resource.Namespace, resource.Name, err)
return err
}
return nil
}

// OnClusterResourceBindingAdd handles object add event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

Expand All @@ -15,7 +14,7 @@ type RequestAttributes struct {
Object *unstructured.Unstructured
ObservedObj *unstructured.Unstructured
ReplicasSet int32
AggregatedStatus []workv1alpha1.AggregatedStatusItem
AggregatedStatus []workv1alpha2.AggregatedStatusItem
}

// ResponseAttributes contains the attributes that response by the webhook.
Expand Down
Loading

0 comments on commit b83d53e

Please sign in to comment.