Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jwtty committed Dec 17, 2024
1 parent 453fec3 commit ddc7a6b
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 159 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/updaterun/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *Reconciler) generateStagesByStrategy(
// This won't change even if the stagedUpdateStrategy changes or is deleted after the updateRun is initialized.
updateRun.Status.StagedUpdateStrategySnapshot = &updateStrategy.Spec

// Compute the stages including the deletion stage.
// Compute the update stages.
if err := r.computeRunStageStatus(ctx, scheduledBindings, updateRun); err != nil {
return err
}
Expand Down
53 changes: 34 additions & 19 deletions pkg/controllers/updaterun/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (r *Reconciler) validate(
}
// Validate the applyStrategy.
if !reflect.DeepEqual(updateRun.Status.ApplyStrategy, updateRunCopy.Status.ApplyStrategy) {
mismatchErr := fmt.Errorf("the applyStrategy in the clusterStagedUpdateRun is outdated, latest: %v, recorded: %v", updateRunCopy.Status.ApplyStrategy, updateRun.Status.ApplyStrategy)
mismatchErr := controller.NewUserError(fmt.Errorf("the applyStrategy in the clusterStagedUpdateRun is outdated, latest: %v, recorded: %v", updateRunCopy.Status.ApplyStrategy, updateRun.Status.ApplyStrategy))
klog.ErrorS(mismatchErr, "the applyStrategy in the clusterResourcePlacement has changed", "clusterResourcePlacement", placementName, "clusterStagedUpdateRun", updateRunRef)
return -1, nil, nil, fmt.Errorf("%w: %s", errStagedUpdatedAborted, mismatchErr.Error())
}
Expand Down Expand Up @@ -70,14 +70,8 @@ func (r *Reconciler) validate(
return -1, nil, nil, err
}

// if condition.IsConditionStatusFalse(meta.FindStatusCondition(updateRun.Status.Conditions, string(placementv1alpha1.StagedUpdateRunConditionProgressing)), updateRun.Generation) {
// // The clusterStagedUpdateRun has not started yet.
// klog.V(2).InfoS("Starting the staged update run from the beginning", "clusterStagedUpdateRun", updateRunRef)
// return 0, scheduledBindings, toBeDeletedBindings, nil
// }

// Validate the stages and return the updating stage index.
updatingStageIndex, err := r.validateStagesStatus(ctx, scheduledBindings, updateRun, updateRunCopy)
updatingStageIndex, err := r.validateStagesStatus(ctx, scheduledBindings, toBeDeletedBindings, updateRun, updateRunCopy)
if err != nil {
return -1, nil, nil, err
}
Expand All @@ -91,7 +85,7 @@ func (r *Reconciler) validate(
// If the updating stage index is len(updateRun.Status.StagesStatus), the next stage to be updated will be the delete stage.
func (r *Reconciler) validateStagesStatus(
ctx context.Context,
scheduledBindings []*placementv1beta1.ClusterResourceBinding,
scheduledBindings, toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
updateRun, updateRunCopy *placementv1alpha1.ClusterStagedUpdateRun,
) (int, error) {
updateRunRef := klog.KObj(updateRun)
Expand All @@ -115,18 +109,18 @@ func (r *Reconciler) validateStagesStatus(
klog.ErrorS(unexpectedErr, "Failed to find the stagesStatus in the clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}
updatingStageIndex, lastFinishedStageIndex, validateErr := validateUpdateStageStatus(existingStageStatus, updateRunCopy)
updatingStageIndex, lastFinishedStageIndex, validateErr := validateUpdateStagesStatus(existingStageStatus, updateRunCopy)
if validateErr != nil {
return -1, validateErr
}

return validateDeleteStageStatus(updatingStageIndex, lastFinishedStageIndex, len(existingStageStatus), updateRunCopy)
return validateDeleteStageStatus(updatingStageIndex, lastFinishedStageIndex, len(existingStageStatus), toBeDeletedBindings, updateRunCopy)
}

// validateUpdateStageStatus is a helper function to validate the updating stages in the clusterStagedUpdateRun.
// validateUpdateStagesStatus is a helper function to validate the updating stages in the clusterStagedUpdateRun.
// It compares the existing stage status with the latest list of clusters to be updated.
// It returns the index of the updating stage, the index of the last finished stage and any error encountered.
func validateUpdateStageStatus(existingStageStatus []placementv1alpha1.StageUpdatingStatus, updateRun *placementv1alpha1.ClusterStagedUpdateRun) (int, int, error) {
func validateUpdateStagesStatus(existingStageStatus []placementv1alpha1.StageUpdatingStatus, updateRun *placementv1alpha1.ClusterStagedUpdateRun) (int, int, error) {
updatingStageIndex := -1
lastFinishedStageIndex := -1
// Remember the newly computed stage status.
Expand Down Expand Up @@ -159,17 +153,19 @@ func validateUpdateStageStatus(existingStageStatus []placementv1alpha1.StageUpda
}

var err error
updatingStageIndex, lastFinishedStageIndex, err = determineUpdatingStage(curStage, updatingStageIndex, lastFinishedStageIndex, &existingStageStatus[curStage], updateRun)
updatingStageIndex, lastFinishedStageIndex, err = validateClusterUpdatingStatus(curStage, updatingStageIndex, lastFinishedStageIndex, &existingStageStatus[curStage], updateRun)
if err != nil {
return -1, -1, err
}
}
return updatingStageIndex, lastFinishedStageIndex, nil
}

// determineUpdatingStage validates a stage status and looks for the updating stage and last finished stage.
// It returns latest updatingStageIndex, lastFinishedStageIndex and any error encountered.
func determineUpdatingStage(
// validateClusterUpdatingStatus validates clusters updating status inside a single stage.
// It checks the cluster updating status according to the stage status and returns error if there's mismatch.
// It accepts current `updatingStageIndex` and `lastFinishedStageIndex` for cross-stage validation.
// It returns `curStage` as updatingStageIndex if the stage is updating or advances `lastFinishedStageIndex` if the stage has finished.
func validateClusterUpdatingStatus(
curStage, updatingStageIndex, lastFinishedStageIndex int,
stageStatus *placementv1alpha1.StageUpdatingStatus,
updateRun *placementv1alpha1.ClusterStagedUpdateRun,
Expand Down Expand Up @@ -236,6 +232,7 @@ func determineUpdatingStage(
}
}
// We don't allow more than one clusters to be updating at the same time.
// TODO(wantjian): support multiple clusters updating at the same time.
if len(updatingClusters) > 1 {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("more than one cluster is updating in the stage `%s`, clusters: %v", stageStatus.StageName, updatingClusters))
klog.ErrorS(unexpectedErr, "Detected more than one updating clusters in the stage", "clusterStagedUpdateRun", klog.KObj(updateRun))
Expand All @@ -249,6 +246,7 @@ func determineUpdatingStage(
// It returns the updating stage index, or any error encountered.
func validateDeleteStageStatus(
updatingStageIndex, lastFinishedStageIndex, totalStages int,
toBeDeletedBindings []*placementv1beta1.ClusterResourceBinding,
updateRun *placementv1alpha1.ClusterStagedUpdateRun,
) (int, error) {
updateRunRef := klog.KObj(updateRun)
Expand All @@ -258,6 +256,22 @@ func validateDeleteStageStatus(
klog.ErrorS(unexpectedErr, "Failed to find the deletionStageStatus in the clusterStagedUpdateRun", "clusterStagedUpdateRun", updateRunRef)
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}

// Validate whether toBeDeletedBindings are a subnet of the clusters in the delete stage status.
// We only validate if it's a subnet because clusters can unjoin from the fleet and we allow that.
// We only need to check the existence, not the order, because clusters are always sorted by name in the delete stage.
deletingClusterMap := make(map[string]struct{}, len(existingDeleteStageStatus.Clusters))
for _, cluster := range existingDeleteStageStatus.Clusters {
deletingClusterMap[cluster.ClusterName] = struct{}{}
}
for _, binding := range toBeDeletedBindings {
if _, ok := deletingClusterMap[binding.Spec.TargetCluster]; !ok {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the cluster `%s` to be deleted is not in the delete stage", binding.Spec.TargetCluster))
klog.ErrorS(unexpectedErr, "Detect new cluster to be unscheduled", "clusterResourceBinding", klog.KObj(binding), "clusterStagedUpdateRun", updateRunRef)
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}
}

deleteStageFinishedCond := meta.FindStatusCondition(existingDeleteStageStatus.Conditions, string(placementv1alpha1.StagedUpdateRunConditionSucceeded))
deleteStageProgressingCond := meta.FindStatusCondition(existingDeleteStageStatus.Conditions, string(placementv1alpha1.StagedUpdateRunConditionProgressing))
// Check if there is any active updating stage
Expand All @@ -271,7 +285,8 @@ func validateDeleteStageStatus(
return -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())
}

// If no stage is updating, continue from the last finished stage (which will result it starting from 0).
// If no stage is updating, continue from the last finished stage.
// We initialized lastFinishedStageIndex to -1, so that from the very beginning, we start from 0, the first stage.
if updatingStageIndex == -1 {
updatingStageIndex = lastFinishedStageIndex + 1
}
Expand Down Expand Up @@ -313,7 +328,7 @@ func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun *place
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the ClusterStagedUpdateRun status as failed", "clusterStagedUpdateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewAPIServerError(false, updateErr)
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}
Loading

0 comments on commit ddc7a6b

Please sign in to comment.