Commit

Update
haijianyang committed Aug 3, 2023
1 parent d881627 commit b2d4621
Showing 5 changed files with 113 additions and 83 deletions.
94 changes: 50 additions & 44 deletions controllers/elfmachine_controller_placement_group.go
@@ -168,9 +168,9 @@ func (r *ElfMachineReconciler) preCheckPlacementGroup(ctx *context.MachineContext
return nil, err
}

// When KCP is not in scaling down and it's not rolling update the 1st CP Machine, just return since the placement group is full.
if !(kcputil.IsKCPRollingUpdateFirstMachine(kcp) || kcputil.IsKCPInScalingDown(kcp)) {
ctx.Logger.V(1).Info("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String())
// When KCP is not in rolling update and it's not in scaling down, just return since the placement group is full.
if !kcputil.IsKCPInRollingUpdate(kcp) && !kcputil.IsKCPInScalingDown(kcp) {
ctx.Logger.V(1).Info("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String())

return nil, nil
}
@@ -211,31 +211,30 @@ func (r *ElfMachineReconciler) preCheckPlacementGroup(ctx *context.MachineContext
return nil, nil
}

// When the PlacementGroup is full and it's rolling update the 1st CP Machine and,
// place the VM on the target host without joining the PlacementGroup and power it on.
// After other CP Machine are rolled out successfully, add this 1st CP VM into the PlacementGroup.
// Now, since the PlacementGroup is full and KCP is in a rolling update,
// we will place the target CP VM on a selected host without joining the PlacementGroup and power it on.
// After the other CP VMs have been rolled out successfully, they will all have joined the PlacementGroup
// (there is always an empty seat in the PlacementGroup), and then we will add the target CP VM into the PlacementGroup.

if usedHostsByPG.Len() == usedHostsByPG.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Len() &&
int(*kcp.Spec.Replicas) == usedHostsByPG.Len() {
// Only when KCP is in rolling update and the placement group is full
// does it need to get the latest created machine.
hostID, err := r.getVMHostForRollingUpdate(ctx, placementGroup, hosts)
if err != nil || hostID == "" {
return nil, err
}
usedAndUnavailableHostsByPG := usedHostsByPG.FilterUnavailableHostsWithoutEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB))
if !usedAndUnavailableHostsByPG.IsEmpty() {
ctx.Logger.V(1).Info("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts", "placementGroup", *placementGroup.Name, "usedAndUnavailableHostsByPG", usedAndUnavailableHostsByPG.String(), "usedHostsByPG", usedHostsByPG.String())

return pointer.String(hostID), err
return nil, nil
}

ctx.Logger.V(1).Info("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String())
hostID, err := r.getVMHostForRollingUpdate(ctx, placementGroup, hosts)
if err != nil || hostID == "" {
return nil, err
}

return nil, nil
return pointer.String(hostID), err
}

// getVMHostForRollingUpdate returns the target host server id for a virtual machine during rolling update.
// During KCP rolling update, machines will be deleted in the order of creation.
// Find the latest created machine in the placement group,
// and set the host where the machine is located to the first machine created by KCP rolling update.
// and use the host where that machine is located as the target host for the machine created by the KCP rolling update.
// This prevents the virtual machine from being migrated during a KCP rolling update when a placement group is used.
func (r *ElfMachineReconciler) getVMHostForRollingUpdate(ctx *context.MachineContext, placementGroup *models.VMPlacementGroup, hosts service.Hosts) (string, error) {
elfMachines, err := machineutil.GetControlPlaneElfMachinesInCluster(ctx, ctx.Client, ctx.Cluster.Namespace, ctx.Cluster.Name)
Expand Down Expand Up @@ -265,27 +264,31 @@ func (r *ElfMachineReconciler) getVMHostForRollingUpdate(ctx *context.MachineCon
}

machines := collections.FromMachines(placementGroupMachines...)
if machine := machines.Newest(); machine != nil {
if vm, err := ctx.VMService.Get(vmMap[machine.Name]); err != nil {
return "", err
} else {
host := hosts.Get(*vm.Host.ID)
if host == nil {
ctx.Logger.Info("Host not found, skip selecting host for VM", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)
} else {
ok, message := service.IsAvailableHost(host, *service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB))
if ok {
ctx.Logger.Info("Selected the host server for VM since the placement group is full", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)

return *vm.Host.ID, nil
}
newestMachine := machines.Newest()
if newestMachine == nil {
ctx.Logger.Info("Newest machine not found, skip selecting host for VM", "vmRef", ctx.ElfMachine.Status.VMRef)
return "", nil
}

ctx.Logger.Info(fmt.Sprintf("Host is unavailable: %s, skip selecting host for VM", message), "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)
}
}
vm, err := ctx.VMService.Get(vmMap[newestMachine.Name])
if err != nil {
return "", err
}

return "", nil
host := hosts.Get(*vm.Host.ID)
if host == nil {
ctx.Logger.Info("Host not found, skip selecting host for VM", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)
return "", err
}

ok, message := service.IsAvailableHost(host, *service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB))
if ok {
ctx.Logger.Info("Selected the host server for VM since the placement group is full", "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)
return *vm.Host.ID, nil
}

ctx.Logger.Info(fmt.Sprintf("Host is unavailable: %s, skip selecting host for VM", message), "hostID", *vm.Host.ID, "vmRef", ctx.ElfMachine.Status.VMRef)
return "", err
}

// getHostsInPlacementGroup returns the hosts where all virtual machines of the placement group are located.
@@ -310,7 +313,7 @@ func (r *ElfMachineReconciler) getHostsInPlacementGroup(ctx *context.MachineContext
// It returns hosts that are not in faulted state, not in the given 'usedHostsByPG',
// and have sufficient memory for running this VM.
func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContext, hosts service.Hosts, usedHostsByPG service.Hosts, vm *models.VM) service.Hosts {
availableHosts := hosts.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Difference(usedHostsByPG)
availableHosts := hosts.FilterAvailableHostsWithEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Difference(usedHostsByPG)

// If the VM is running, and the host where the VM is located
// is not used by the placement group, then it is not necessary to
Expand Down Expand Up @@ -415,14 +418,17 @@ func (r *ElfMachineReconciler) joinPlacementGroup(ctx *context.MachineContext, v

// Only when the KCP is in rolling update, the VM is stopped, and all the hosts used by the placement group are available,
// will the upgrade be allowed with the same number of hosts and CP nodes.
// In this case first machine created by KCP rolling update can be powered on without being added to the placement group.
if kcputil.IsKCPRollingUpdateFirstMachine(kcp) &&
*vm.Status == models.VMStatusSTOPPED &&
usedHostsByPG.Len() == usedHostsByPG.Available(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB)).Len() &&
int(*kcp.Spec.Replicas) == usedHostsByPG.Len() {
ctx.Logger.Info("The placement group is full and KCP is in rolling update, skip adding VM to the placement group", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID)
// In this case, the machine created by the KCP rolling update can be powered on without being added to the placement group.
if kcputil.IsKCPInRollingUpdate(kcp) && *vm.Status == models.VMStatusSTOPPED {
usedAndUnavailableHostsByPG := usedHostsByPG.FilterUnavailableHostsWithoutEnoughMemory(*service.TowerMemory(ctx.ElfMachine.Spec.MemoryMiB))
if usedAndUnavailableHostsByPG.IsEmpty() {
ctx.Logger.Info("KCP is in rolling update, the placement group is full and has no unavailable hosts, skip adding VM to the placement group", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID)
return true, nil
}

return true, nil
ctx.Logger.Info("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts", "placementGroup", *placementGroup.Name, "availableHosts", availableHosts.String(), "usedHostsByPG", usedHostsByPG.String(), "vmRef", ctx.ElfMachine.Status.VMRef, "vmId", *vm.ID)

return false, nil
}

if *vm.Status == models.VMStatusRUNNING || *vm.Status == models.VMStatusSUSPENDED {
26 changes: 18 additions & 8 deletions controllers/elfmachine_controller_test.go
@@ -992,20 +992,20 @@ var _ = Describe("ElfMachineReconciler", func() {
ok, err := reconciler.joinPlacementGroup(machineContext, vm)
Expect(ok).To(BeTrue())
Expect(err).To(BeZero())
Expect(logBuffer.String()).To(ContainSubstring("The placement group is full and KCP is in rolling update, skip adding VM to the placement group"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has no unavailable hosts, skip adding VM to the placement group"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
host.HostState = &models.NestedMaintenanceHostState{State: models.NewMaintenanceModeEnum(models.MaintenanceModeEnumMAINTENANCEMODE)}
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
mockVMService.EXPECT().GetHostsByCluster(elfCluster.Spec.Cluster).Return(service.NewHosts(host), nil)
mockVMService.EXPECT().FindByIDs([]string{*placementGroup.Vms[0].ID}).Return([]*models.VM{}, nil)
mockVMService.EXPECT().FindByIDs([]string{*placementGroup.Vms[0].ID}).Return([]*models.VM{vm2}, nil)

reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err = reconciler.joinPlacementGroup(machineContext, vm)
Expect(ok).To(BeFalse())
Expect(err).To(BeZero())
Expect(logBuffer.String()).To(ContainSubstring("The placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
@@ -1229,6 +1229,7 @@ var _ = Describe("ElfMachineReconciler", func() {
placementGroup.Vms = []*models.NestedVM{}
kcp.Spec.Replicas = pointer.Int32(3)
kcp.Status.Replicas = 3
kcp.Status.UpdatedReplicas = 3
ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, kcp, elfMachine1, machine1, elfMachine2, machine2, elfMachine3, machine3)
machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
@@ -1248,6 +1249,7 @@ var _ = Describe("ElfMachineReconciler", func() {
hostID, err := reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(*hostID).To(Equal(""))
Expect(logBuffer.String()).To(ContainSubstring("The placement group still has capacity"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
@@ -1260,7 +1262,7 @@ var _ = Describe("ElfMachineReconciler", func() {
hostID, err = reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(hostID).To(BeNil())
Expect(logBuffer.String()).To(ContainSubstring("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts"))
})

It("when placement group is full and KCP rolling update in progress", func() {
@@ -1334,7 +1336,7 @@ var _ = Describe("ElfMachineReconciler", func() {
host, err = reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(host).To(BeNil())
Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
@@ -1348,7 +1350,7 @@ var _ = Describe("ElfMachineReconciler", func() {
host, err = reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(host).To(BeNil())
Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
@@ -1362,7 +1364,7 @@ var _ = Describe("ElfMachineReconciler", func() {
host, err = reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(host).To(BeNil())
Expect(logBuffer.String()).To(ContainSubstring("KCP is rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is in rolling update, the placement group is full and has unavailable hosts, wait for enough available hosts"))

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
@@ -1416,6 +1418,14 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(err).To(BeZero())
Expect(hostID).To(Equal(*vm3.Host.ID))
Expect(logBuffer.String()).To(ContainSubstring("Selected the host server for VM since the placement group is full"))

ctrlContext = newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, kcp)
machineContext = newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
hostID, err = reconciler.getVMHostForRollingUpdate(machineContext, placementGroup, service.NewHostsFromList(hosts))
Expect(err).To(BeZero())
Expect(hostID).To(Equal(""))
Expect(logBuffer.String()).To(ContainSubstring("Newest machine not found, skip selecting host for VM"))
})
})

@@ -1458,7 +1468,7 @@ var _ = Describe("ElfMachineReconciler", func() {
hostID, err := reconciler.preCheckPlacementGroup(machineContext)
Expect(err).To(BeZero())
Expect(hostID).To(BeNil())
Expect(logBuffer.String()).To(ContainSubstring("KCP is not in scaling down and it's not rolling update the 1st CP Machine, the placement group is full, wait for enough available hosts"))
Expect(logBuffer.String()).To(ContainSubstring("KCP is not in rolling update and it's not in scaling down, the placement group is full, wait for enough available hosts"))
expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.WaitingForAvailableHostRequiredByPlacementGroupReason}})

elfMachine.Status.Conditions = nil
20 changes: 17 additions & 3 deletions pkg/service/collections.go
@@ -67,21 +67,35 @@ func (s Hosts) IsEmpty() bool {

func (s Hosts) String() string {
str := ""

for _, host := range s {
str += fmt.Sprintf("{id: %s,name: %s},", GetTowerString(host.ID), GetTowerString(host.Name))
state := ""
if host.HostState != nil {
state = string(*host.HostState.State)
}

str += fmt.Sprintf("{id: %s,name: %s,memory: %d,status: %s,state: %s},", GetTowerString(host.ID), GetTowerString(host.Name), GetTowerInt64(host.AllocatableMemoryBytes), string(*host.Status), state)
}

return fmt.Sprintf("[%s]", str)
}

// Available returns a Hosts with available hosts.
func (s Hosts) Available(memory int64) Hosts {
// FilterAvailableHostsWithEnoughMemory returns the hosts that are available and have allocatable memory no less than the specified memory.
func (s Hosts) FilterAvailableHostsWithEnoughMemory(memory int64) Hosts {
return s.Filter(func(h *models.Host) bool {
ok, _ := IsAvailableHost(h, memory)
return ok
})
}

// FilterUnavailableHostsWithoutEnoughMemory returns the hosts that are unavailable or do not have enough allocatable memory for the specified memory.
func (s Hosts) FilterUnavailableHostsWithoutEnoughMemory(memory int64) Hosts {
return s.Filter(func(h *models.Host) bool {
ok, _ := IsAvailableHost(h, memory)
return !ok
})
}

// Get returns the host with the specified host ID.
func (s Hosts) Get(hostID string) *models.Host {
if host, ok := s[hostID]; ok {
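
For context, the two new filters are exact complements for a given memory requirement: IsAvailableHost either passes or fails a host, so every host lands in exactly one of the two results. The fragment below is an illustrative sketch, not part of this commit, written in the style of the TestHostCollection cases in the next file; it assumes the same imports and helpers (models, pointer, TowerString, NewHostsFromList) and that a healthy host with less allocatable memory than requested counts as unavailable.

func TestFilterPartitionSketch(t *testing.T) {
	g := gomega.NewWithT(t)

	// host1 cannot fit a request for 2 bytes of allocatable memory, host2 can; both are healthy.
	host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1"), AllocatableMemoryBytes: pointer.Int64(1), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}
	host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2"), AllocatableMemoryBytes: pointer.Int64(2), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}
	hosts := NewHostsFromList([]*models.Host{host1, host2})

	available := hosts.FilterAvailableHostsWithEnoughMemory(2)
	unavailable := hosts.FilterUnavailableHostsWithoutEnoughMemory(2)

	// The two filters partition the host set: each host appears in exactly one result.
	g.Expect(available.Len() + unavailable.Len()).To(gomega.Equal(hosts.Len()))
	g.Expect(available.Contains(*host2.ID)).To(gomega.BeTrue())
	g.Expect(unavailable.Contains(*host1.ID)).To(gomega.BeTrue())
}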
17 changes: 15 additions & 2 deletions pkg/service/collections_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package service

import (
"fmt"
"testing"

"github.com/onsi/gomega"
@@ -47,12 +48,24 @@ func TestHostCollection(t *testing.T) {
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2"), AllocatableMemoryBytes: pointer.Int64(2), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}

hosts := NewHosts()
g.Expect(hosts.Available(0).Len()).To(gomega.Equal(0))
g.Expect(hosts.FilterAvailableHostsWithEnoughMemory(0).Len()).To(gomega.Equal(0))

hosts = NewHostsFromList([]*models.Host{host1, host2})
availableHosts := hosts.Available(2)
availableHosts := hosts.FilterAvailableHostsWithEnoughMemory(2)
g.Expect(availableHosts.Len()).To(gomega.Equal(1))
g.Expect(availableHosts.Contains(*host2.ID)).To(gomega.BeTrue())

hosts = NewHosts()
unavailableHosts := hosts.FilterUnavailableHostsWithoutEnoughMemory(0)
g.Expect(unavailableHosts.IsEmpty()).To(gomega.BeTrue())
g.Expect(unavailableHosts.Len()).To(gomega.Equal(0))
g.Expect(unavailableHosts.String()).To(gomega.Equal("[]"))

hosts = NewHostsFromList([]*models.Host{host1, host2})
unavailableHosts = hosts.FilterUnavailableHostsWithoutEnoughMemory(2)
g.Expect(unavailableHosts.Len()).To(gomega.Equal(1))
g.Expect(unavailableHosts.Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(unavailableHosts.String()).To(gomega.Equal(fmt.Sprintf("[{id: %s,name: %s,memory: %d,status: %s,state: %s},]", *host1.ID, *host1.Name, *host1.AllocatableMemoryBytes, string(*host1.Status), "")))
})

t.Run("Difference", func(t *testing.T) {