Skip to content

Commit

Permalink
Refactor available hosts and vm migration code
Browse files Browse the repository at this point in the history
  • Loading branch information
haijianyang committed Jul 26, 2023
1 parent ac6b074 commit 8037ba1
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 347 deletions.
15 changes: 1 addition & 14 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,12 +842,6 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
switch {
case service.IsCloneVMTask(task):
releaseTicketForCreateVM(ctx.ElfMachine.Name)
case service.IsVMMigrationTask(task):
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
if err != nil {
return false, err
}
releaseTicketForPlacementGroupVMMigration(placementGroupName)
case service.IsMemoryInsufficientError(errorMessage):
setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, true)
message := fmt.Sprintf("Insufficient memory detected for ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
Expand All @@ -858,16 +852,9 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
case models.TaskStatusSUCCESSED:
ctx.Logger.Info("VM task succeeded", "vmRef", vmRef, "taskRef", taskRef, "taskDescription", service.GetTowerString(task.Description))

switch {
case service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task):
if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, false)
releaseTicketForCreateVM(ctx.ElfMachine.Name)
case service.IsVMMigrationTask(task):
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
if err != nil {
return false, err
}
releaseTicketForPlacementGroupVMMigration(placementGroupName)
}
default:
ctx.Logger.Info("Waiting for VM task done", "vmRef", vmRef, "taskRef", taskRef, "taskStatus", service.GetTowerTaskStatus(task.Status), "taskDescription", service.GetTowerString(task.Description))
Expand Down
187 changes: 82 additions & 105 deletions controllers/elfmachine_controller_placement_group.go

Large diffs are not rendered by default.

121 changes: 49 additions & 72 deletions controllers/elfmachine_controller_test.go

Large diffs are not rendered by default.

28 changes: 0 additions & 28 deletions controllers/vm_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ import (
const (
creationTimeout = time.Minute * 6
vmOperationRateLimit = time.Second * 6
vmMigrationTimeout = time.Minute * 20
placementGroupSilenceTime = time.Minute * 30
placementGroupCreationLockKey = "%s:creation"
)

var vmStatusMap = make(map[string]time.Time)
var limiterLock sync.Mutex
var vmOperationMap = make(map[string]time.Time)
var placementGroupVMMigrationMap = make(map[string]time.Time)
var vmOperationLock sync.Mutex

var placementGroupOperationMap = make(map[string]time.Time)
Expand Down Expand Up @@ -91,32 +89,6 @@ func acquireTicketForUpdatingVM(vmName string) bool {
return true
}

// acquireTicketForPlacementGroupVMMigration returns whether virtual machine migration
// of placement group operation can be performed.
func acquireTicketForPlacementGroupVMMigration(groupName string) bool {
vmOperationLock.Lock()
defer vmOperationLock.Unlock()

if status, ok := placementGroupVMMigrationMap[groupName]; ok {
if !time.Now().After(status.Add(vmMigrationTimeout)) {
return false
}
}

placementGroupVMMigrationMap[groupName] = time.Now()

return true
}

// releaseTicketForPlacementGroupVMMigration releases the virtual machine migration
// of placement group being operated.
func releaseTicketForPlacementGroupVMMigration(groupName string) {
vmOperationLock.Lock()
defer vmOperationLock.Unlock()

delete(placementGroupVMMigrationMap, groupName)
}

// acquireTicketForPlacementGroupOperation returns whether placement group operation
// can be performed.
func acquireTicketForPlacementGroupOperation(groupName string) bool {
Expand Down
21 changes: 0 additions & 21 deletions controllers/vm_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,6 @@ var _ = Describe("VM Operation Limiter", func() {
})
})

var _ = Describe("Placement Group VM Migration Limiter", func() {
var groupName string

BeforeEach(func() {
groupName = fake.UUID()
})

It("acquireTicketForPlacementGroupVMMigration", func() {
Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeTrue())
Expect(placementGroupVMMigrationMap).To(HaveKey(groupName))

Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeFalse())
releaseTicketForPlacementGroupVMMigration(groupName)
Expect(placementGroupVMMigrationMap).NotTo(HaveKey(groupName))

placementGroupVMMigrationMap[groupName] = time.Now().Add(-vmMigrationTimeout)
Expect(acquireTicketForPlacementGroupVMMigration(groupName)).To(BeTrue())
Expect(placementGroupVMMigrationMap).To(HaveKey(groupName))
})
})

var _ = Describe("Placement Group Operation Limiter", func() {
var groupName string

Expand Down
147 changes: 147 additions & 0 deletions pkg/service/collections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"fmt"

"github.com/smartxworks/cloudtower-go-sdk/v2/models"
"k8s.io/apimachinery/pkg/util/sets"
)

// Hosts is a set of hosts.
type Hosts map[string]*models.Host

// NewHosts creates a Hosts. from a list of values.
func NewHosts(hosts ...*models.Host) Hosts {
ss := make(Hosts, len(hosts))
ss.Insert(hosts...)
return ss
}

// NewHostsFromList creates a Hosts from the given host slice.
func NewHostsFromList(hosts []*models.Host) Hosts {
ss := make(Hosts, len(hosts))
for i := range hosts {
ss.Insert(hosts[i])
}
return ss
}

func (s Hosts) Insert(hosts ...*models.Host) {
for i := range hosts {
if hosts[i] != nil {
h := hosts[i]
s[*h.ID] = h
}
}
}

func (s Hosts) Contains(hostID string) bool {
_, ok := s[hostID]
return ok
}

// Len returns the size of the set.
func (s Hosts) Len() int {
return len(s)
}

func (s Hosts) IsEmpty() bool {
return len(s) == 0
}

func (s Hosts) String() string {
str := ""
for _, host := range s {
str += fmt.Sprintf("{id: %s,name: %s},", GetTowerString(host.ID), GetTowerString(host.Name))
}

return fmt.Sprintf("[%s]", str)
}

// Available returns a Hosts with available hosts.
func (s Hosts) Available(memory int64) Hosts {
return s.Filter(func(h *models.Host) bool {
ok, _ := IsAvailableHost(h, memory)
return ok
})
}

// Get returns a Host of the specified host.
func (s Hosts) Get(hostID string) *models.Host {
if host, ok := s[hostID]; ok {
return host
}
return nil
}

// Find returns a Hosts of the specified hosts.
func (s Hosts) Find(targetHosts sets.Set[string]) Hosts {
return s.Filter(func(h *models.Host) bool {
return targetHosts.Has(*h.ID)
})
}

// UnsortedList returns the slice with contents in random order.
func (s Hosts) UnsortedList() []*models.Host {
res := make([]*models.Host, 0, len(s))
for _, value := range s {
res = append(res, value)
}
return res
}

// Difference returns a copy without hosts that are in the given collection.
func (s Hosts) Difference(hosts Hosts) Hosts {
return s.Filter(func(h *models.Host) bool {
_, found := hosts[*h.ID]
return !found
})
}

// newFilteredHostCollection creates a Hosts from a filtered list of values.
func newFilteredHostCollection(filter Func, hosts ...*models.Host) Hosts {
ss := make(Hosts, len(hosts))
for i := range hosts {
h := hosts[i]
if filter(h) {
ss.Insert(h)
}
}
return ss
}

// Filter returns a Hosts containing only the Hosts that match all of the given HostFilters.
func (s Hosts) Filter(filters ...Func) Hosts {
return newFilteredHostCollection(And(filters...), s.UnsortedList()...)
}

// Func is the functon definition for a filter.
type Func func(host *models.Host) bool

// And returns a filter that returns true if all of the given filters returns true.
func And(filters ...Func) Func {
return func(host *models.Host) bool {
for _, f := range filters {
if !f(host) {
return false
}
}
return true
}
}
69 changes: 69 additions & 0 deletions pkg/service/collections_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2023.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"testing"

"github.com/onsi/gomega"
"github.com/smartxworks/cloudtower-go-sdk/v2/models"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
)

func TestHostCollection(t *testing.T) {
g := gomega.NewGomegaWithT(t)

t.Run("Find", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1")}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2")}

hosts := NewHosts()
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Len()).To(gomega.Equal(0))

hosts = NewHostsFromList([]*models.Host{host1, host2})
g.Expect(hosts.Get(*host1.ID)).To(gomega.Equal(host1))
g.Expect(hosts.Get(*TowerString("404"))).To(gomega.BeNil())
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(hosts.Find(sets.Set[string]{}.Insert(*host1.ID)).Len()).To(gomega.Equal(1))
})

t.Run("Available", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1"), AllocatableMemoryBytes: pointer.Int64(1), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2"), AllocatableMemoryBytes: pointer.Int64(2), Status: models.NewHostStatus(models.HostStatusCONNECTEDHEALTHY)}

hosts := NewHosts()
g.Expect(hosts.Available(0).Len()).To(gomega.Equal(0))

hosts = NewHostsFromList([]*models.Host{host1, host2})
availableHosts := hosts.Available(2)
g.Expect(availableHosts.Len()).To(gomega.Equal(1))
g.Expect(availableHosts.Contains(*host2.ID)).To(gomega.BeTrue())
})

t.Run("Difference", func(t *testing.T) {
host1 := &models.Host{ID: TowerString("1"), Name: TowerString("host1")}
host2 := &models.Host{ID: TowerString("2"), Name: TowerString("host2")}

g.Expect(NewHosts().Difference(NewHosts()).Len()).To(gomega.Equal(0))
g.Expect(NewHosts().Difference(NewHosts(host1)).Len()).To(gomega.Equal(0))
g.Expect(NewHosts(host1).Difference(NewHosts(host1)).Len()).To(gomega.Equal(0))
g.Expect(NewHosts(host1).Difference(NewHosts()).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(NewHosts(host1).Difference(NewHosts(host2)).Contains(*host1.ID)).To(gomega.BeTrue())
g.Expect(NewHosts(host1, host2).Difference(NewHosts(host2)).Contains(*host1.ID)).To(gomega.BeTrue())
})
}
15 changes: 8 additions & 7 deletions pkg/service/mock_services/vm_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8037ba1

Please sign in to comment.