From b2d4621639a3f5681de15652e9dd4d7cd182788d Mon Sep 17 00:00:00 2001 From: haijianyang Date: Wed, 2 Aug 2023 23:47:05 -0400 Subject: [PATCH] Update --- .../elfmachine_controller_placement_group.go | 94 ++++++++++--------- controllers/elfmachine_controller_test.go | 26 +++-- pkg/service/collections.go | 20 +++- pkg/service/collections_test.go | 17 +++- pkg/util/kcp/kcp.go | 39 +++----- 5 files changed, 113 insertions(+), 83 deletions(-) diff --git a/controllers/elfmachine_controller_placement_group.go b/controllers/elfmachine_controller_placement_group.go index 6a13b896..f17a9803 100644 --- a/controllers/elfmachine_controller_placement_group.go +++ b/controllers/elfmachine_controller_placement_group.go @@ -168,9 +168,9 @@ func (r *ElfMachineReconciler) preCheckPlacementGroup(ctx *context.MachineContex return nil, err } - // When KCP is not in scaling down and it's not rolling update the 1st CP Machine, just return since the placement group is full. - if !(kcputil.IsKCPRollingUpdateFirstMachine(kcp) || kcputil.IsKCPInScalingDown(kcp)) { - ctx.Logger.V(1).Info("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String()) + // When KCP is not in rolling update and it's not in scaling down, just return since the placement group is full. + if !kcputil.IsKCPInRollingUpdate(kcp) && !kcputil.IsKCPInScalingDown(kcp) { + ctx.Logger.V(1).Info("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String()) return nil, nil } @@ -211,31 +211,30 @@ func (r *ElfMachineReconciler) preCheckPlacementGroup(ctx *context.MachineContex return nil, nil } - // When the PlacementGroup is full and it's rolling update the 1st CP Machine and, - // place the VM on the target host without joining the PlacementGroup and power it on. - // After other CP Machine are rolled out successfully, add this 1st CP VM into the PlacementGroup. + // Now since the PlacementGroup is full and KCP is in rolling update, + // we will place the target CP VM on a selected host without joining the PlacementGroup and power it on. + // After other CP VMs are rolled out successfully, these VMs must already join the PlacementGroup + // since there is always an empty seat in the PlacementGroup, we will add the target CP VM into the PlacementGroup. - if usedHostsByPG.Len() == usedHostsByPG.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Len() && - int(*kcp.Spec.Replicas) == usedHostsByPG.Len() { - // Only when KCP is in rolling update and the placement group is full - // does it need to get the latest created machine. 
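// A minimal sketch of the decision this hunk introduces in preCheckPlacementGroup:
// when the placement group is full during a KCP rolling update, proceed to host
// selection only if every host already used by the group is available and has enough
// memory. The types below are simplified, purely illustrative stand-ins for
// service.Hosts and the Tower models.Host values used by the real code.
package main

import "fmt"

type host struct {
	id         string
	healthy    bool
	freeMemMiB int64
}

// unavailableHosts mirrors FilterUnavailableHostsWithoutEnoughMemory: hosts that are
// unhealthy or cannot fit the requested memory.
func unavailableHosts(usedByPG []host, memMiB int64) []host {
	var out []host
	for _, h := range usedByPG {
		if !h.healthy || h.freeMemMiB < memMiB {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	usedByPG := []host{{id: "h1", healthy: true, freeMemMiB: 8192}, {id: "h2", healthy: false, freeMemMiB: 8192}}
	if bad := unavailableHosts(usedByPG, 4096); len(bad) > 0 {
		fmt.Println("placement group is full and has unavailable hosts, wait") // h2 is unhealthy
	} else {
		fmt.Println("select the newest machine's host for the rolling-update VM")
	}
}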
- hostID, err := r.getVMHostForRollingUpdate(ctx, placementGroup, hosts) - if err != nil || hostID == "" { - return nil, err - } + usedAndUnavailableHostsByPG := usedHostsByPG.FilterUnavailableHostsWithoutEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)) + if !usedAndUnavailableHostsByPG.IsEmpty() { + ctx.Logger.V(1).Info("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts", "placementGroup", *placementGroup.Name, "usedAndUnavailableHostsByPG", usedAndUnavailableHostsByPG.String(), "usedHostsByPG", usedHostsByPG.String()) - return pointer.String(hostID), err + return nil, nil } - ctx.Logger.V(1).Info("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String()) + hostID, err := r.getVMHostForRollingUpdate(ctx, placementGroup, hosts) + if err != nil || hostID == "" { + return nil, err + } - return nil, nil + return pointer.String(hostID), err } // getVMHostForRollingUpdate returns the target host server id for a virtual machine during rolling update. // During KCP rolling update, machines will be deleted in the order of creation. // Find the latest created machine in the placement group, -// and set the host where the machine is located to the first machine created by KCP rolling update. +// and set the host where the machine is located to the machine created by KCP rolling update. // This prevents migration of virtual machine during KCP rolling update when using a placement group. func (r *ElfMachineReconciler) getVMHostForRollingUpdate(ctx *context.MachineContext, placementGroup *models.VMPlacementGroup, hosts service.Hosts) (string, error) { elfMachines, err := machineutil.GetControlPlaneElfMachinesInCluster(ctx, ctx.Client, ctx.Cluster.Namespace, ctx.Cluster.Name) @@ -265,27 +264,31 @@ func (r *ElfMachineReconciler) getVMHostForRollingUpdate(ctx *context.MachineCon } machines := collections.FromMachines(placementGroupMachines...) 
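// A condensed sketch of the host-selection rule that getVMHostForRollingUpdate
// implements after the refactor below: pin the new control-plane VM to the host of
// the newest machine in the placement group, but only when that host is known and
// available. The vm/host types and pickHostForRollingUpdate helper are hypothetical
// simplifications of the real Tower models and service calls.
package main

import "fmt"

type vm struct{ name, hostID string }

type host struct {
	id        string
	available bool // stands in for service.IsAvailableHost(host, memory)
}

func pickHostForRollingUpdate(newest *vm, hosts map[string]host) string {
	if newest == nil {
		return "" // no machine found in the placement group, nothing to pin to
	}
	h, ok := hosts[newest.hostID]
	if !ok || !h.available {
		return "" // host missing or unavailable: report no target host
	}
	return h.id
}

func main() {
	hosts := map[string]host{"h1": {id: "h1", available: true}}
	fmt.Println(pickHostForRollingUpdate(&vm{name: "cp-3", hostID: "h1"}, hosts)) // "h1"
	fmt.Println(pickHostForRollingUpdate(nil, hosts))                             // ""
}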
- if machine := machines.Newest(); machine != nil { - if vm, err := ctx.VMService.Get(vmMap[machine.Name]); err != nil { - return "", err - } else { - host := hosts.Get(*vm.Host.ID) - if host == nil { - ctx.Logger.Info("Host not found, skip selecting host for VM", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) - } else { - ok, message := service.IsAvailableHost(host, *service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)) - if ok { - ctx.Logger.Info("Selected the host server for VM since the placement group is full", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) - - return *vm.Host.ID, nil - } + newestMachine := machines.Newest() + if newestMachine == nil { + ctx.Logger.Info("Newest machine not found, skip selecting host for VM", "vmRef", ctx.ElfMachine.Status.VMRef) + return "", nil + } - ctx.Logger.Info(fmt.Sprintf("Host is unavailable: %s, skip selecting host for VM", message), "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) - } - } + vm, err := ctx.VMService.Get(vmMap[newestMachine.Name]) + if err != nil { + return "", err } - return "", nil + host := hosts.Get(*vm.Host.ID) + if host == nil { + ctx.Logger.Info("Host not found, skip selecting host for VM", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) + return "", err + } + + ok, message := service.IsAvailableHost(host, *service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)) + if ok { + ctx.Logger.Info("Selected the host server for VM since the placement group is full", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) + return *vm.Host.ID, nil + } + + ctx.Logger.Info(fmt.Sprintf("Host is unavailable: %s, skip selecting host for VM", message), "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef) + return "", err } // getHostsInPlacementGroup returns the hosts where all virtual machines of placement group located. @@ -310,7 +313,7 @@ func (r *ElfMachineReconciler) getHostsInPlacementGroup(ctx *context.MachineCont // It returns hosts that are not in faulted state, not in the given 'usedHostsByPG', // and have sufficient memory for running this VM. func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContext, hosts service.Hosts, usedHostsByPG service.Hosts, vm *models.VM) service.Hosts { - availableHosts := hosts.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Difference(usedHostsByPG) + availableHosts := hosts.FilterAvailableHostsWithEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Difference(usedHostsByPG) // If the VM is running, and the host where the VM is located // is not used by the placement group, then it is not necessary to @@ -415,14 +418,17 @@ func (r *ElfMachineReconciler) joinPlacementGroup(ctx *context.MachineContext, v // Only when the KCP is in rolling update, the VM is stopped, and all the hosts used by the placement group are available, // will the upgrade be allowed with the same number of hosts and CP nodes. - // In this case first machine created by KCP rolling update can be powered on without being added to the placement group. 
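// A compact sketch of the branch joinPlacementGroup gains in this hunk: during a
// rolling update, a stopped VM may be powered on without joining the full placement
// group, but only while every host the group uses is still available; otherwise the
// reconciler waits. The boolean/int parameters are assumed to be precomputed and stand
// in for the KCP, VM status and host checks in the real code.
package main

import "fmt"

// skipJoin reports whether the VM can be powered on outside the placement group (skip)
// and whether the reconciler should requeue and wait instead (wait).
func skipJoin(inRollingUpdate, vmStopped bool, unavailablePGHosts int) (skip, wait bool) {
	if !inRollingUpdate || !vmStopped {
		return false, false // normal path: keep trying to join the placement group
	}
	if unavailablePGHosts == 0 {
		return true, false // group is full but healthy: power on without joining
	}
	return false, true // group is full and degraded: wait for available hosts
}

func main() {
	skip, wait := skipJoin(true, true, 0)
	fmt.Println(skip, wait) // true false
}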
- if kcputil.IsKCPRollingUpdateFirstMachine(kcp) && - *vm.Status == models.VMStatusSTOPPED && - usedHostsByPG.Len() == usedHostsByPG.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Len() && - int(*kcp.Spec.Replicas) == usedHostsByPG.Len() { - ctx.Logger.Info("The placement group is full and KCP is in rolling update, skip adding VM to the placement group", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID) + // In this case machine created by KCP rolling update can be powered on without being added to the placement group. + if kcputil.IsKCPInRollingUpdate(kcp) && *vm.Status == models.VMStatusSTOPPED { + usedAndUnavailableHostsByPG := usedHostsByPG.FilterUnavailableHostsWithoutEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)) + if usedAndUnavailableHostsByPG.IsEmpty() { + ctx.Logger.Info("KCP is in rolling update, the placement group is full and has no unavailable hosts, skip adding VM to the placement group", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID) + return true, nil + } - return true, nil + ctx.Logger.Info("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID) + + return false, nil } if *vm.Status == models.VMStatusRUNNING || *vm.Status == models.VMStatusSUSPENDED { diff --git a/controllers/elfmachine_controller_test.go b/controllers/elfmachine_controller_test.go index 61da905c..72343811 100644 --- a/controllers/elfmachine_controller_test.go +++ b/controllers/elfmachine_controller_test.go @@ -992,20 +992,20 @@ var _ = Describe("ElfMachineReconciler", func() { ok, err := reconciler.joinPlacementGroup(machineContext, vm) Expect(ok).To(BeTrue()) Expect(err).To(BeZero()) - Expect(logBuffer.String()).To(ContainSubstring("The placement group is full and KCP is in rolling update, skip adding VM to the placement group")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has no unavailable hosts, skip adding VM to the placement group")) logBuffer = new(bytes.Buffer) klog.SetOutput(logBuffer) host.HostState = &models.NestedMaintenanceHostState{State: models.NewMaintenanceModeEnum(models.MaintenanceModeEnumMAINTENANCEMODE)} mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil) mockVMService.EXPECT().GetHostsByCluster(elfCluster.Spec.Cluster).Return(service.NewHosts(host), nil) - mockVMService.EXPECT().FindByIDs([]string{*placementGroup.Vms[0].ID}).Return([]*models.VM{}, nil) + mockVMService.EXPECT().FindByIDs([]string{*placementGroup.Vms[0].ID}).Return([]*models.VM{vm2}, nil) reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} ok, err = reconciler.joinPlacementGroup(machineContext, vm) Expect(ok).To(BeFalse()) Expect(err).To(BeZero()) - Expect(logBuffer.String()).To(ContainSubstring("The placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts")) logBuffer = new(bytes.Buffer) 
klog.SetOutput(logBuffer) @@ -1229,6 +1229,7 @@ var _ = Describe("ElfMachineReconciler", func() { placementGroup.Vms = []*models.NestedVM{} kcp.Spec.Replicas = pointer.Int32(3) kcp.Status.Replicas = 3 + kcp.Status.UpdatedReplicas = 3 ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, kcp, elfMachine1, machine1, elfMachine2, machine2, elfMachine3, machine3) machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService) fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine) @@ -1248,6 +1249,7 @@ var _ = Describe("ElfMachineReconciler", func() { hostID, err := reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(*hostID).To(Equal("")) + Expect(logBuffer.String()).To(ContainSubstring("The placement group still has capacity")) logBuffer = new(bytes.Buffer) klog.SetOutput(logBuffer) @@ -1260,7 +1262,7 @@ var _ = Describe("ElfMachineReconciler", func() { hostID, err = reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(hostID).To(BeNil()) - Expect(logBuffer.String()).To(ContainSubstring("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts")) }) It("when placement group is full and KCP rolling update in progress", func() { @@ -1334,7 +1336,7 @@ var _ = Describe("ElfMachineReconciler", func() { host, err = reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(host).To(BeNil()) - Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts")) logBuffer = new(bytes.Buffer) klog.SetOutput(logBuffer) @@ -1348,7 +1350,7 @@ var _ = Describe("ElfMachineReconciler", func() { host, err = reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(host).To(BeNil()) - Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts")) logBuffer = new(bytes.Buffer) klog.SetOutput(logBuffer) @@ -1362,7 +1364,7 @@ var _ = Describe("ElfMachineReconciler", func() { host, err = reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(host).To(BeNil()) - Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts")) logBuffer = new(bytes.Buffer) klog.SetOutput(logBuffer) @@ -1416,6 +1418,14 @@ var _ = Describe("ElfMachineReconciler", func() { Expect(err).To(BeZero()) Expect(hostID).To(Equal(*vm3.Host.ID)) Expect(logBuffer.String()).To(ContainSubstring("Selected the host server for VM since the placement group is full")) + + ctrlContext = newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, kcp) + 
machineContext = newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService) + reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService} + hostID, err = reconciler.getVMHostForRollingUpdate(machineContext, placementGroup, service.NewHostsFromList(hosts)) + Expect(err).To(BeZero()) + Expect(hostID).To(Equal("")) + Expect(logBuffer.String()).To(ContainSubstring("Newest machine not found, skip selecting host for VM")) }) }) @@ -1458,7 +1468,7 @@ var _ = Describe("ElfMachineReconciler", func() { hostID, err := reconciler.preCheckPlacementGroup(machineContext) Expect(err).To(BeZero()) Expect(hostID).To(BeNil()) - Expect(logBuffer.String()).To(ContainSubstring("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts")) + Expect(logBuffer.String()).To(ContainSubstring("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts")) expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForAvailableHostRequiredByPlacementGroupReason}}) elfMachine.Status.Conditions = nil diff --git a/pkg/service/collections.go b/pkg/service/collections.go index 05c78d8a..98680733 100644 --- a/pkg/service/collections.go +++ b/pkg/service/collections.go @@ -67,21 +67,35 @@ func (s Hosts) IsEmpty() bool { func (s Hosts) String() string { str := "" + for _, host := range s { - str += fmt.Sprintf("{id: %s,name: %s},", GetTowerString(host.ID), GetTowerString(host.Name)) + state := "" + if host.HostState != nil { + state = string(*host.HostState.State) + } + + str += fmt.Sprintf("{id: %s,name: %s,memory: %d,status: %s,state: %s},", GetTowerString(host.ID), GetTowerString(host.Name), GetTowerInt64(host.AllocatableMemoryBytes), string(*host.Status), state) } return fmt.Sprintf("[%s]", str) } -// Available returns a Hosts with available hosts. -func (s Hosts) Available(memory int64) Hosts { +// FilterAvailableHostsWithEnoughMemory returns a Hosts containing the available host which has allocatable memory no less than the specified memory. +func (s Hosts) FilterAvailableHostsWithEnoughMemory(memory int64) Hosts { return s.Filter(func(h *models.Host) bool { ok, _ := IsAvailableHost(h, memory) return ok }) } +// FilterUnavailableHostsWithoutEnoughMemory returns a Hosts containing the unavailable host which has allocatable memory less than the specified memory. +func (s Hosts) FilterUnavailableHostsWithoutEnoughMemory(memory int64) Hosts { + return s.Filter(func(h *models.Host) bool { + ok, _ := IsAvailableHost(h, memory) + return !ok + }) +} + // Get returns a Host of the specified host. func (s Hosts) Get(hostID string) *models.Host { if host, ok := s[hostID]; ok { diff --git a/pkg/service/collections_test.go b/pkg/service/collections_test.go index cf74e046..dabb92a9 100644 --- a/pkg/service/collections_test.go +++ b/pkg/service/collections_test.go @@ -17,6 +17,7 @@ limitations under the License. 
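// The two filters added to collections.go above are complements over the same
// IsAvailableHost predicate, and the tests that follow exercise exactly that. A generic
// sketch of the pattern, assuming a plain map-backed collection instead of the real
// service.Hosts type:
package main

import "fmt"

type host struct {
	id      string
	healthy bool
	freeMiB int64
}

type hostSet map[string]host

func (s hostSet) filter(keep func(host) bool) hostSet {
	out := hostSet{}
	for id, h := range s {
		if keep(h) {
			out[id] = h
		}
	}
	return out
}

// availableWithEnoughMemory keeps hosts that are healthy and can fit memMiB.
func (s hostSet) availableWithEnoughMemory(memMiB int64) hostSet {
	return s.filter(func(h host) bool { return h.healthy && h.freeMiB >= memMiB })
}

// unavailableWithoutEnoughMemory keeps the complement: unhealthy or too little memory.
func (s hostSet) unavailableWithoutEnoughMemory(memMiB int64) hostSet {
	return s.filter(func(h host) bool { return !h.healthy || h.freeMiB < memMiB })
}

func main() {
	all := hostSet{"h1": {id: "h1", healthy: true, freeMiB: 1024}, "h2": {id: "h2", healthy: false, freeMiB: 8192}}
	fmt.Println(len(all.availableWithEnoughMemory(2048)), len(all.unavailableWithoutEnoughMemory(2048))) // 0 2
}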
package service import ( + "fmt" "testing" "github.com/onsi/gomega" @@ -47,12 +48,24 @@ func TestHostCollection(t *testing.T) { host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2"), AllocatableMemoryBytes: pointer.Int64(2), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)} hosts := NewHosts() - g.Expect(hosts.Available(0).Len()).To(gomega.Equal(0)) + g.Expect(hosts.FilterAvailableHostsWithEnoughMemory(0).Len()).To(gomega.Equal(0)) hosts = NewHostsFromList([]*models.Host{host1, host2}) - availableHosts := hosts.Available(2) + availableHosts := hosts.FilterAvailableHostsWithEnoughMemory(2) g.Expect(availableHosts.Len()).To(gomega.Equal(1)) g.Expect(availableHosts.Contains(*host2.ID)).To(gomega.BeTrue()) + + hosts = NewHosts() + unavailableHosts := hosts.FilterUnavailableHostsWithoutEnoughMemory(0) + g.Expect(unavailableHosts.IsEmpty()).To(gomega.BeTrue()) + g.Expect(unavailableHosts.Len()).To(gomega.Equal(0)) + g.Expect(unavailableHosts.String()).To(gomega.Equal("[]")) + + hosts = NewHostsFromList([]*models.Host{host1, host2}) + unavailableHosts = hosts.FilterUnavailableHostsWithoutEnoughMemory(2) + g.Expect(unavailableHosts.Len()).To(gomega.Equal(1)) + g.Expect(unavailableHosts.Contains(*host1.ID)).To(gomega.BeTrue()) + g.Expect(unavailableHosts.String()).To(gomega.Equal(fmt.Sprintf("[{id: %s,name: %s,memory: %d,status: %s,state: %s},]", *host1.ID, *host1.Name, *host1.AllocatableMemoryBytes, string(*host1.Status), ""))) }) t.Run("Difference", func(t *testing.T) { diff --git a/pkg/util/kcp/kcp.go b/pkg/util/kcp/kcp.go index 0c9a33e8..afbaffd3 100644 --- a/pkg/util/kcp/kcp.go +++ b/pkg/util/kcp/kcp.go @@ -22,36 +22,23 @@ import ( "sigs.k8s.io/cluster-api/util/conditions" ) -// IsKCPInRollingUpdate returns whether KCP is in scaling down. +// IsKCPInRollingUpdate returns whether KCP is in rolling update. // -// When KCP is in rolling update, KCP controller marks -// MachinesSpecUpToDateCondition to false and RollingUpdateInProgressReason as Reason. -// -// When all machines are up to date, KCP controller marks MachinesSpecUpToDateCondition to true. -// -// For more information about KCP MachinesSpecUpToDateCondition and RollingUpdateInProgressReason, refer to https://github.com/kubernetes-sigs/cluster-api/blob/main/api/v1beta1/condition_consts.go -func IsKCPInRollingUpdate(kcp *controlplanev1.KubeadmControlPlane) bool { - return conditions.IsFalse(kcp, controlplanev1.MachinesSpecUpToDateCondition) && - conditions.GetReason(kcp, controlplanev1.MachinesSpecUpToDateCondition) == controlplanev1.RollingUpdateInProgressReason -} - -// IsKCPRollingUpdateFirstMachine returns true if KCP is in rolling update and creating the first CP Machine. -// -// KCP rollout algorithm is as follows: -// Find Machines that have an outdated spec, If there is a machine requiring rollout -// 1.Scale up control plane creating a machine with the new spec -// 2.Scale down control plane by removing one of the machine that needs rollout (the oldest out-of date machine in the failure domain that has the most control-plane machines on it) -// -// kcp.Status.UpdatedReplicas is the total number of machines that are up to date with the control -// plane's configuration and therefore do not require rollout. -// -// So when KCP is in rolling update and creating the first CP Machine, -// kcp.Status.Replicas is greater than kcp.Spec.Replicas and kcp.Status.UpdatedReplicas equals 1. +// When *kcp.Spec.Replicas > kcp.Status.UpdatedReplicas, it must be in a KCP rolling update process. 
+// When *kcp.Spec.Replicas == kcp.Status.UpdatedReplicas, it could be in one of the following cases:
+// 1. It's not in a KCP rolling update process, so kcp.Spec.Replicas == kcp.Status.Replicas.
+// 2. It's at the end of a KCP rolling update process, and the last KCP replica (i.e. the last KCP ElfMachine) has just been created.
+//    There is still an old KCP ElfMachine, so kcp.Spec.Replicas + 1 == kcp.Status.Replicas.
 //
 // For more information about KCP replicas, refer to https://github.com/kubernetes-sigs/cluster-api/blob/main/controlplane/kubeadm/api/v1beta1/kubeadm_control_plane_types.go
 // For more information about KCP rollout, refer to https://github.com/kubernetes-sigs/cluster-api/blob/main/docs/proposals/20191017-kubeadm-based-control-plane.md#kubeadmcontrolplane-rollout
-func IsKCPRollingUpdateFirstMachine(kcp *controlplanev1.KubeadmControlPlane) bool {
-	return IsKCPInRollingUpdate(kcp) && kcp.Status.UpdatedReplicas == 1
+func IsKCPInRollingUpdate(kcp *controlplanev1.KubeadmControlPlane) bool {
+	if (*kcp.Spec.Replicas > kcp.Status.UpdatedReplicas && *kcp.Spec.Replicas <= kcp.Status.Replicas) ||
+		(*kcp.Spec.Replicas == kcp.Status.UpdatedReplicas && *kcp.Spec.Replicas < kcp.Status.Replicas) {
+		return true
+	}
+
+	return false
+}
 
 // IsKCPInScalingDown returns whether KCP is in scaling down.
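// A standalone sketch of the replica arithmetic in IsKCPInRollingUpdate above, using
// plain ints in place of the KubeadmControlPlane spec/status fields, with scenarios
// taken from the doc comment:
package main

import "fmt"

// inRollingUpdate reports whether a control plane with the given desired (spec),
// observed (status) and up-to-date replica counts is in a rolling update.
func inRollingUpdate(specReplicas, statusReplicas, updatedReplicas int32) bool {
	return (specReplicas > updatedReplicas && specReplicas <= statusReplicas) ||
		(specReplicas == updatedReplicas && specReplicas < statusReplicas)
}

func main() {
	fmt.Println(inRollingUpdate(3, 3, 3)) // false: steady state, all machines up to date
	fmt.Println(inRollingUpdate(3, 4, 1)) // true: first new machine created, old machines remain
	fmt.Println(inRollingUpdate(3, 4, 3)) // true: last new machine created, one old machine left
}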