Skip to content

Commit

Permalink
implement clusterStagedUpdateRun validation
Browse files Browse the repository at this point in the history
  • Loading branch information
jwtty committed Dec 16, 2024
1 parent 3bb2656 commit fd9a057
Show file tree
Hide file tree
Showing 7 changed files with 1,333 additions and 135 deletions.
21 changes: 17 additions & 4 deletions pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ var (
// errInitializedFailed is the error when the ClusterStagedUpdateRun fails to initialize.
// It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step.
errInitializedFailed = fmt.Errorf("%w: failed to initialize the clusterStagedUpdateRun", errStagedUpdatedAborted)

// stageUpdatingWaitTime is the time to wait before rechecking the stage update status.
// Put it as a variable for convenient testing.
stageUpdatingWaitTime = 60 * time.Second
)

// Reconciler reconciles a ClusterStagedUpdateRun object.
Expand Down Expand Up @@ -108,16 +112,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Check if the clusterStagedUpdateRun is finished.
finishedCond := meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1alpha1.StagedUpdateRunConditionSucceeded))
if condition.IsConditionStatusTrue(finishedCond, updateRun.Generation) || condition.IsConditionStatusFalse(finishedCond, updateRun.Generation) {
klog.V(2).InfoS("The clusterStagedUpdateRun is finished", "clusterStagedUpdateRun", runObjRef)
klog.V(2).InfoS("The clusterStagedUpdateRun is finished", "finishedSuccessfully", finishedCond.Status, "clusterStagedUpdateRun", runObjRef)
return runtime.Result{}, nil
}
// TODO(wantjian): validate the clusterStagedUpdateRun and generate the updatingStage etc.

// Validate the clusterStagedUpdateRun status to ensure the update can be continued and get the updating stage index and cluster indices.
if updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings, err = r.validate(ctx, &updateRun); err != nil {
// errStagedUpdatedAborted cannot be retried.
if errors.Is(err, errStagedUpdatedAborted) {
return runtime.Result{}, r.recordUpdateRunFailed(ctx, &updateRun, err.Error())
}
return runtime.Result{}, err
}
klog.V(2).InfoS("The clusterStagedUpdateRun is validated", "clusterStagedUpdateRun", runObjRef)
}

// TODO(wantjian): execute the clusterStagedUpdateRun.
// TODO(wantjian): execute the clusterStagedUpdateRun and fix the requeue time.
klog.V(2).InfoS("Executing the clusterStagedUpdateRun", "clusterStagedUpdateRun", runObjRef, "updatingStageIndex", updatingStageIndex,
"toBeUpdatedBindings count", len(toBeUpdatedBindings), "toBeDeletedBindings count", len(toBeDeletedBindings))
return runtime.Result{}, nil
return runtime.Result{RequeueAfter: stageUpdatingWaitTime}, nil
}

// handleDelete handles the deletion of the clusterStagedUpdateRun object.
Expand Down
80 changes: 59 additions & 21 deletions pkg/controllers/updaterun/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
timeout = time.Second * 10
// interval is the time to wait between retries for Eventually and Consistently
interval = time.Millisecond * 250
// duration is the time to duration to check for Consistently
duration = time.Second * 20

// numTargetClusters is the number of scheduled clusters
numTargetClusters = 10
Expand Down Expand Up @@ -107,8 +109,8 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
validateUpdateRunHasFinalizer(ctx, updateRun)

By("Updating the clusterStagedUpdateRun to failed")
startedcond := getTrueCondition(updateRun, string(placementv1alpha1.StagedUpdateRunConditionProgressing))
finishedcond := getFalseCondition(updateRun, string(placementv1alpha1.StagedUpdateRunConditionSucceeded))
startedcond := generateTrueCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionProgressing)
finishedcond := generateFalseCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionSucceeded)
meta.SetStatusCondition(&updateRun.Status.Conditions, startedcond)
meta.SetStatusCondition(&updateRun.Status.Conditions, finishedcond)
Expect(k8sClient.Status().Update(ctx, updateRun)).Should(Succeed(), "failed to update the clusterStagedUpdateRun")
Expand Down Expand Up @@ -136,7 +138,7 @@ var _ = Describe("Test the clusterStagedUpdateRun controller", func() {
validateUpdateRunHasFinalizer(ctx, updateRun)

By("Updating the clusterStagedUpdateRun status to processing")
startedcond := getTrueCondition(updateRun, string(placementv1alpha1.StagedUpdateRunConditionProgressing))
startedcond := generateTrueCondition(updateRun, placementv1alpha1.StagedUpdateRunConditionProgressing)
meta.SetStatusCondition(&updateRun.Status.Conditions, startedcond)
Expect(k8sClient.Status().Update(ctx, updateRun)).Should(Succeed(), "failed to add condition to the clusterStagedUpdateRun")

Expand Down Expand Up @@ -467,35 +469,71 @@ func validateApprovalRequestCount(ctx context.Context, count int) {
}, timeout, interval).Should(Equal(count), "approval requests count mismatch")
}

func getTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType string) metav1.Condition {
reason := ""
switch condType {
case string(placementv1alpha1.StagedUpdateRunConditionInitialized):
reason = condition.UpdateRunInitializeSucceededReason
case string(placementv1alpha1.StagedUpdateRunConditionProgressing):
reason = condition.UpdateRunStartedReason
case string(placementv1alpha1.StagedUpdateRunConditionSucceeded):
reason = condition.UpdateRunSucceededReason
func generateTrueCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
switch cond {
case placementv1alpha1.StagedUpdateRunConditionInitialized:
reason = condition.UpdateRunInitializeSucceededReason
case placementv1alpha1.StagedUpdateRunConditionProgressing:
reason = condition.UpdateRunStartedReason
case placementv1alpha1.StagedUpdateRunConditionSucceeded:
reason = condition.UpdateRunSucceededReason
}
typeStr = string(cond)
case placementv1alpha1.StageUpdatingConditionType:
switch cond {
case placementv1alpha1.StageUpdatingConditionProgressing:
reason = condition.StageUpdatingStartedReason
case placementv1alpha1.StageUpdatingConditionSucceeded:
reason = condition.StageUpdatingSucceededReason
}
typeStr = string(cond)
case placementv1alpha1.ClusterUpdatingStatusConditionType:
switch cond {
case placementv1alpha1.ClusterUpdatingConditionStarted:
reason = condition.ClusterUpdatingStartedReason
case placementv1alpha1.ClusterUpdatingConditionSucceeded:
reason = condition.ClusterUpdatingSucceededReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionTrue,
Type: condType,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
Reason: reason,
}
}

func getFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType string) metav1.Condition {
reason := ""
switch condType {
case string(placementv1alpha1.StagedUpdateRunConditionInitialized):
reason = condition.UpdateRunInitializeFailedReason
case string(placementv1alpha1.StagedUpdateRunConditionSucceeded):
reason = condition.UpdateRunFailedReason
func generateFalseCondition(updateRun *placementv1alpha1.ClusterStagedUpdateRun, condType any) metav1.Condition {
reason, typeStr := "", ""
switch cond := condType.(type) {
case placementv1alpha1.StagedUpdateRunConditionType:
switch cond {
case placementv1alpha1.StagedUpdateRunConditionInitialized:
reason = condition.UpdateRunInitializeFailedReason
case placementv1alpha1.StagedUpdateRunConditionSucceeded:
reason = condition.UpdateRunFailedReason
}
typeStr = string(cond)
case placementv1alpha1.StageUpdatingConditionType:
switch cond {
case placementv1alpha1.StageUpdatingConditionSucceeded:
reason = condition.StageUpdatingFailedReason
}
typeStr = string(cond)
case placementv1alpha1.ClusterUpdatingStatusConditionType:
switch cond {
case placementv1alpha1.ClusterUpdatingConditionSucceeded:
reason = condition.ClusterUpdatingFailedReason
}
typeStr = string(cond)
}
return metav1.Condition{
Status: metav1.ConditionFalse,
Type: condType,
Type: typeStr,
ObservedGeneration: updateRun.Generation,
Reason: reason,
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/controllers/updaterun/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *Reconciler) collectScheduledClusters(
for i, binding := range bindingList.Items {
if binding.Spec.SchedulingPolicySnapshotName == latestPolicySnapshot.Name {
if binding.Spec.State != placementv1beta1.BindingStateScheduled && binding.Spec.State != placementv1beta1.BindingStateBound {
stateErr := fmt.Errorf("binding `%s`'s state %s is not scheduled or bound", binding.Name, binding.Spec.State)
stateErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("binding `%s`'s state %s is not scheduled or bound", binding.Name, binding.Spec.State))
klog.ErrorS(stateErr, "Failed to collect clusterResourceBindings", "clusterResourcePlacement", placementName, "latestPolicySnapshot", latestPolicySnapshot.Name, "clusterStagedUpdateRun", updateRunRef)
// no more retries here.
return nil, nil, fmt.Errorf("%w: %s", errInitializedFailed, stateErr.Error())
Expand Down Expand Up @@ -255,6 +255,7 @@ func (r *Reconciler) computeRunStageStatus(
for _, binding := range scheduledBindings {
allSelectedClusters[binding.Spec.TargetCluster] = struct{}{}
}
stagesStatus := make([]placementv1alpha1.StageUpdatingStatus, 0, len(updateRun.Status.StagedUpdateStrategySnapshot.Stages))

// Apply the label selectors from the ClusterStagedUpdateStrategy to filter the clusters.
for _, stage := range updateRun.Status.StagedUpdateStrategySnapshot.Stages {
Expand All @@ -272,7 +273,7 @@ func (r *Reconciler) computeRunStageStatus(
klog.ErrorS(err, "Failed to convert label selector", "clusterStagedUpdateStrategy", updateStrategyName, "stage name", stage.Name, "labelSelector", stage.LabelSelector, "clusterStagedUpdateRun", updateRunRef)
// no more retries here.
invalidLabelErr := controller.NewUserError(fmt.Errorf("the stage label selector is invalid, clusterStagedUpdateStrategy: %s, stage: %s, err: %s", updateStrategyName, stage.Name, err.Error()))
return controller.NewUserError(fmt.Errorf("%w: %s", errInitializedFailed, invalidLabelErr.Error()))
return fmt.Errorf("%w: %s", errInitializedFailed, invalidLabelErr.Error())
}
// List all the clusters that match the label selector.
var clusterList clusterv1beta1.MemberClusterList
Expand Down Expand Up @@ -340,8 +341,9 @@ func (r *Reconciler) computeRunStageStatus(
curStageUpdatingStatus.AfterStageTaskStatus[i].ApprovalRequestName = fmt.Sprintf(placementv1alpha1.ApprovalTaskNameFmt, updateRun.Name, stage.Name)
}
}
updateRun.Status.StagesStatus = append(updateRun.Status.StagesStatus, curStageUpdatingStatus)
stagesStatus = append(stagesStatus, curStageUpdatingStatus)
}
updateRun.Status.StagesStatus = stagesStatus

// Check if the clusters are all placed.
if len(allPlacedClusters) != len(allSelectedClusters) {
Expand Down
Loading

0 comments on commit fd9a057

Please sign in to comment.