SKS-2157: Cache placement group to reduce Tower API requests #161

Merged
3 commits merged on Dec 7, 2023
Changes from 2 commits
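The changes below reduce Tower API traffic by keeping recently fetched placement groups in an in-memory TTL cache; the `memoryCache.Set`/`Get`/`Delete` calls in the diff are consistent with a go-cache-style store. As a rough, standalone sketch of that pattern, under the assumption of `github.com/patrickmn/go-cache` and with illustrative key and type names (not the project's actual wiring):

```go
package main

import (
	"fmt"
	"time"

	gocache "github.com/patrickmn/go-cache"
)

// placementGroup is an illustrative stand-in for models.VMPlacementGroup.
type placementGroup struct {
	Name string
}

func main() {
	// TTL cache: entries expire after 20s, expired entries are purged every minute.
	memoryCache := gocache.New(20*time.Second, time.Minute)

	// Cache a placement group under a namespaced key (mirrors setPGCache).
	pg := placementGroup{Name: "cluster-1-worker-pg"}
	key := "pg:" + pg.Name + ":cache"
	memoryCache.Set(key, pg, 20*time.Second)

	// A later reconcile can read it back without another Tower request.
	if val, found := memoryCache.Get(key); found {
		if cached, ok := val.(placementGroup); ok {
			fmt.Println("cache hit:", cached.Name)
		}
	}

	// Invalidate after a mutation so the next read goes back to Tower.
	memoryCache.Delete(key)
}
```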
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller.go
@@ -231,10 +231,13 @@ func (r *ElfClusterReconciler) reconcileDelete(ctx *context.ClusterContext) (rec

func (r *ElfClusterReconciler) reconcileDeleteVMPlacementGroups(ctx *context.ClusterContext) (bool, error) {
placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(ctx.Cluster)
if pgCount, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
if pgNames, err := ctx.VMService.DeleteVMPlacementGroupsByNamePrefix(ctx, placementGroupPrefix); err != nil {
return false, err
} else if pgCount > 0 {
ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", pgCount)
} else if len(pgNames) > 0 {
ctx.Logger.Info(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix), "count", len(pgNames))

// Delete placement group caches.
delPGCaches(pgNames)

return false, nil
} else {
9 changes: 6 additions & 3 deletions controllers/elfcluster_controller_test.go
@@ -207,23 +207,26 @@ var _ = Describe("ElfClusterReconciler", func() {
reconciler := &ElfClusterReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
elfClusterKey := capiutil.ObjectKey(elfCluster)

mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(0, errors.New("some error"))
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), towerresources.GetVMPlacementGroupNamePrefix(cluster)).Return(nil, errors.New("some error"))

result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
Expect(result).To(BeZero())
Expect(err).To(HaveOccurred())

task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED)
logBuffer.Reset()
pg := fake.NewVMPlacementGroup(nil)
setPGCache(pg)
placementGroupPrefix := towerresources.GetVMPlacementGroupNamePrefix(cluster)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(1, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return([]string{*pg.Name}, nil)
result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfClusterKey})
Expect(result).NotTo(BeZero())
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Waiting for the placement groups with name prefix %s to be deleted", placementGroupPrefix)))
Expect(getPGFromCache(*pg.Name)).To(BeNil())

logBuffer.Reset()
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(0, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupsByNamePrefix(gomock.Any(), placementGroupPrefix).Return(nil, nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelClusterName(), elfCluster.Name, true).Return("labelid", nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelVIP(), elfCluster.Spec.ControlPlaneEndpoint.Host, false).Return("labelid", nil)
mockVMService.EXPECT().DeleteLabel(towerresources.GetVMLabelNamespace(), elfCluster.Namespace, true).Return("", nil)
18 changes: 17 additions & 1 deletion controllers/elfmachine_controller_placement_group.go
@@ -116,7 +116,7 @@ func (r *ElfMachineReconciler) createPlacementGroup(ctx *context.MachineContext,

ctx.Logger.Info("Creating placement group succeeded", "taskID", *task.ID, "placementGroup", placementGroupName)

placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
placementGroup, err := r.getPlacementGroup(ctx, placementGroupName)
if err != nil {
return nil, err
}
@@ -345,7 +345,14 @@ func (r *ElfMachineReconciler) getAvailableHostsForVM(ctx *context.MachineContex
return availableHosts
}

// getPlacementGroup returns the specified placement group.
// It checks the placement group cache first; on a cache miss the placement group
// is fetched from Tower and saved to the cache (expiration: pgCacheDuration).
func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, placementGroupName string) (*models.VMPlacementGroup, error) {
if placementGroup := getPGFromCache(placementGroupName); placementGroup != nil {
return placementGroup, nil
}

placementGroup, err := ctx.VMService.GetVMPlacementGroup(placementGroupName)
if err != nil {
return nil, err
@@ -358,6 +365,9 @@ func (r *ElfMachineReconciler) getPlacementGroup(ctx *context.MachineContext, pl
return nil, nil
}

// Save placement group cache.
setPGCache(placementGroup)

return placementGroup, nil
}

@@ -563,6 +573,9 @@ func (r *ElfMachineReconciler) addVMsToPlacementGroup(ctx *context.MachineContex
return err
}

// Delete placement group cache.
delPGCaches([]string{*placementGroup.Name})

taskID := *task.ID
task, err = ctx.VMService.WaitTask(ctx, taskID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval)
if err != nil {
@@ -638,6 +651,9 @@ func (r *ElfMachineReconciler) deletePlacementGroup(ctx *context.MachineContext)
return false, nil
} else {
ctx.Logger.Info(fmt.Sprintf("Placement group %s deleted", *placementGroup.Name))

// Delete placement group cache.
delPGCaches([]string{*placementGroup.Name})
}

return true, nil
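The pattern the changes in this file add is a read-through cache with explicit invalidation: `getPlacementGroup` serves from the cache when it can, and any operation that mutates or deletes a placement group drops the cached entry so the next reconcile fetches fresh data from Tower. A hedged, self-contained sketch of that flow (the `fetchFromTower` helper and string payloads are hypothetical stand-ins, not the project's API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a cached value plus its expiry time.
type entry struct {
	value    string
	expireAt time.Time
}

// pgCache is a tiny TTL map guarded by a mutex.
type pgCache struct {
	mu   sync.Mutex
	ttl  time.Duration
	data map[string]entry
}

func newPGCache(ttl time.Duration) *pgCache {
	return &pgCache{ttl: ttl, data: map[string]entry{}}
}

func (c *pgCache) get(name string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[name]
	if !ok || time.Now().After(e.expireAt) {
		delete(c.data, name)
		return "", false
	}
	return e.value, true
}

func (c *pgCache) set(name, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[name] = entry{value: value, expireAt: time.Now().Add(c.ttl)}
}

func (c *pgCache) del(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, name)
}

// fetchFromTower is a hypothetical stand-in for ctx.VMService.GetVMPlacementGroup.
func fetchFromTower(name string) (string, error) {
	return "placement-group:" + name, nil
}

// getPlacementGroup is the read-through path: cache first, Tower only on a miss.
func getPlacementGroup(c *pgCache, name string) (string, error) {
	if v, ok := c.get(name); ok {
		return v, nil // cache hit: no Tower request
	}
	v, err := fetchFromTower(name)
	if err != nil {
		return "", err
	}
	c.set(name, v)
	return v, nil
}

func main() {
	c := newPGCache(20 * time.Second)

	pg, _ := getPlacementGroup(c, "cluster-1-pg") // miss: fetches and caches
	fmt.Println(pg)
	pg, _ = getPlacementGroup(c, "cluster-1-pg") // hit: served from cache
	fmt.Println(pg)

	// After AddVMsToPlacementGroup or a deletion, drop the entry so the
	// next reconcile sees the updated state from Tower.
	c.del("cluster-1-pg")
}
```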
43 changes: 36 additions & 7 deletions controllers/elfmachine_controller_test.go
@@ -259,7 +259,7 @@ var _ = Describe("ElfMachineReconciler", func() {
})

It("should create a new VM if none exists", func() {
resetVMTaskErrorCache()
resetMemoryCache()
vm := fake.NewTowerVM()
vm.Name = &elfMachine.Name
elfCluster.Spec.Cluster = clusterKey
@@ -297,7 +297,7 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(reconciler.Client.Get(reconciler, elfMachineKey, elfMachine)).To(Succeed())
Expect(elfMachine.Status.VMRef).To(Equal(*vm.ID))
Expect(elfMachine.Status.TaskRef).To(Equal(*task.ID))
resetVMTaskErrorCache()
resetMemoryCache()
})

It("should recover from lost task", func() {
@@ -822,7 +822,7 @@ var _ = Describe("ElfMachineReconciler", func() {

Context("powerOnVM", func() {
It("should", func() {
resetVMTaskErrorCache()
resetMemoryCache()
vm := fake.NewTowerVM()
vm.Host = &models.NestedHost{ID: service.TowerString(fake.ID())}
elfMachine.Status.VMRef = *vm.LocalID
@@ -844,7 +844,7 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring("and the retry silence period passes, will try to power on the VM again"))
expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.PoweringOnReason}})
resetVMTaskErrorCache()
resetMemoryCache()

// GPU
unexpectedError := errors.New("unexpected error")
@@ -1136,12 +1136,14 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().FindByIDs([]string{*vm2.ID}).Return([]*models.VM{vm2}, nil)
mockVMService.EXPECT().AddVMsToPlacementGroup(placementGroup, gomock.Any()).Return(task, nil)
mockVMService.EXPECT().WaitTask(gomock.Any(), *task.ID, config.WaitTaskTimeoutForPlacementGroupOperation, config.WaitTaskInterval).Return(task, nil)
setPGCache(placementGroup)

reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err := reconciler.joinPlacementGroup(machineContext, vm)
Expect(ok).To(BeTrue())
Expect(err).To(BeZero())
Expect(logBuffer.String()).To(ContainSubstring("Updating placement group succeeded"))
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
})

It("should not migrate VM when VM is running and KCP is in rolling update", func() {
@@ -2670,10 +2672,12 @@ var _ = Describe("ElfMachineReconciler", func() {
mockVMService.EXPECT().GetVMPlacementGroup(placementGroupName).Return(placementGroup, nil)
mockVMService.EXPECT().DeleteVMPlacementGroupByID(gomock.Any(), *placementGroup.ID).Return(true, nil)

setPGCache(placementGroup)
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
ok, err = reconciler.deletePlacementGroup(machineContext)
Expect(ok).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())

md.DeletionTimestamp = nil
md.Spec.Replicas = pointer.Int32(0)
@@ -2952,7 +2956,7 @@ var _ = Describe("ElfMachineReconciler", func() {

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
resetVMTaskErrorCache()
resetMemoryCache()
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(nil, errors.New(service.VMPlacementGroupNotFound))
mockVMService.EXPECT().GetCluster(elfCluster.Spec.Cluster).Return(towerCluster, nil)
mockVMService.EXPECT().CreateVMPlacementGroup(gomock.Any(), *towerCluster.ID, towerresources.GetVMPlacementGroupPolicy(machine)).Return(withTaskVMPlacementGroup, nil)
@@ -2966,7 +2970,7 @@ var _ = Describe("ElfMachineReconciler", func() {

logBuffer = new(bytes.Buffer)
klog.SetOutput(logBuffer)
resetVMTaskErrorCache()
resetMemoryCache()
mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(nil, errors.New(service.VMPlacementGroupNotFound))
mockVMService.EXPECT().GetCluster(elfCluster.Spec.Cluster).Return(towerCluster, nil)
mockVMService.EXPECT().CreateVMPlacementGroup(gomock.Any(), *towerCluster.ID, towerresources.GetVMPlacementGroupPolicy(machine)).Return(withTaskVMPlacementGroup, nil)
@@ -2990,6 +2994,31 @@ var _ = Describe("ElfMachineReconciler", func() {
Expect(err).NotTo(HaveOccurred())
Expect(logBuffer.String()).To(ContainSubstring(fmt.Sprintf("Tower has duplicate placement group, skip creating placement group %s", placementGroupName)))
})

It("should save and get placement group cache", func() {
ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctrlContext.Client, machine, cluster)
Expect(err).NotTo(HaveOccurred())
placementGroup := fake.NewVMPlacementGroup(nil)
placementGroup.Name = service.TowerString(placementGroupName)

mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
Expect(getPGFromCache(*placementGroup.Name)).To(BeNil())
reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
pg, err := reconciler.getPlacementGroup(machineContext, placementGroupName)
Expect(err).To(BeZero())
Expect(pg).To(Equal(placementGroup))
Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))

// Use cache
reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
pg, err = reconciler.getPlacementGroup(machineContext, placementGroupName)
Expect(err).To(BeZero())
Expect(pg).To(Equal(placementGroup))
Expect(getPGFromCache(*placementGroup.Name)).To(Equal(placementGroup))
})
})

Context("Reconcile static IP allocation", func() {
@@ -3123,7 +3152,7 @@ var _ = Describe("ElfMachineReconciler", func() {
It("should handle failed/succeeded task", func() {
elfMachine.Spec.GPUDevices = []infrav1.GPUPassthroughDeviceSpec{{Model: "A16", Count: 1}}

resetVMTaskErrorCache()
resetMemoryCache()
task := fake.NewTowerTask()
task.Status = models.NewTaskStatus(models.TaskStatusFAILED)
elfMachine.Status.TaskRef = *task.ID
52 changes: 43 additions & 9 deletions controllers/tower_cache.go
@@ -79,9 +79,9 @@
func recordElfClusterMemoryInsufficient(ctx *context.MachineContext, isInsufficient bool) {
key := getKeyForInsufficientMemoryError(ctx.ElfCluster.Spec.Cluster)
if isInsufficient {
vmTaskErrorCache.Set(key, newClusterResource(), resourceDuration)
memoryCache.Set(key, newClusterResource(), resourceDuration)
} else {
vmTaskErrorCache.Delete(key)
memoryCache.Delete(key)
}
}

@@ -94,9 +94,9 @@

key := getKeyForDuplicatePlacementGroupError(placementGroupName)
if isPGPolicyNotSatisfied {
vmTaskErrorCache.Set(key, newClusterResource(), resourceDuration)
memoryCache.Set(key, newClusterResource(), resourceDuration)
} else {
vmTaskErrorCache.Delete(key)
memoryCache.Delete(key)
}

return nil
@@ -146,13 +146,13 @@
}

func getClusterResource(key string) *clusterResource {
if val, found := vmTaskErrorCache.Get(key); found {
if val, found := memoryCache.Get(key); found {
if resource, ok := val.(*clusterResource); ok {
return resource
}

// Delete unexpected data.
vmTaskErrorCache.Delete(key)
memoryCache.Delete(key)

Codecov warning (codecov/patch): added line controllers/tower_cache.go#L155 was not covered by tests.
}

return nil
@@ -166,6 +166,40 @@
return fmt.Sprintf("pg:duplicate:%s", placementGroup)
}

// pgCacheDuration is the lifespan of placement group cache.
const pgCacheDuration = 20 * time.Second

func getKeyForPGCache(pgName string) string {
return fmt.Sprintf("pg:%s:cache", pgName)
}

// setPGCache saves the specified placement group to memory,
// which can reduce access to the Tower service.
func setPGCache(pg *models.VMPlacementGroup) {
memoryCache.Set(getKeyForPGCache(*pg.Name), *pg, pgCacheDuration)
}

// delPGCaches deletes the specified placement group caches.
func delPGCaches(pgNames []string) {
for i := 0; i < len(pgNames); i++ {
memoryCache.Delete(getKeyForPGCache(pgNames[i]))
}
}

// getPGFromCache gets the specified placement group from memory.
func getPGFromCache(pgName string) *models.VMPlacementGroup {
key := getKeyForPGCache(pgName)
if val, found := memoryCache.Get(key); found {
if pg, ok := val.(models.VMPlacementGroup); ok {
return &pg
}
// Delete unexpected data.
memoryCache.Delete(key)

Codecov warning (codecov/patch): added line controllers/tower_cache.go#L197 was not covered by tests.
}

return nil
}

/* GPU */

// gpuCacheDuration is the lifespan of gpu cache.
@@ -179,7 +213,7 @@
// which can reduce access to the Tower service.
func setGPUVMInfosCache(gpuVMInfos service.GPUVMInfos) {
gpuVMInfos.Iterate(func(g *models.GpuVMInfo) {
vmTaskErrorCache.Set(getKeyForGPUVMInfo(*g.ID), *g, gpuCacheDuration)
memoryCache.Set(getKeyForGPUVMInfo(*g.ID), *g, gpuCacheDuration)
})
}

@@ -188,12 +222,12 @@
gpuVMInfos := service.NewGPUVMInfos()
for i := 0; i < len(gpuIDs); i++ {
key := getKeyForGPUVMInfo(gpuIDs[i])
if val, found := vmTaskErrorCache.Get(key); found {
if val, found := memoryCache.Get(key); found {
if gpuVMInfo, ok := val.(models.GpuVMInfo); ok {
gpuVMInfos.Insert(&gpuVMInfo)
}
// Delete unexpected data.
vmTaskErrorCache.Delete(key)
memoryCache.Delete(key)
}
}

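One design detail in the new tower_cache.go helpers: `setPGCache` stores the dereferenced value (`*pg`) and `getPGFromCache` returns a pointer to a fresh copy, so reassigning top-level fields on the returned pointer does not change the cached entry (pointer-typed fields inside the struct would still be shared). A tiny sketch of that copy-on-read behavior, with illustrative types rather than the real models.VMPlacementGroup:

```go
package main

import "fmt"

type vmPlacementGroup struct{ Name string }

func main() {
	cache := map[string]vmPlacementGroup{}

	pg := &vmPlacementGroup{Name: "pg-a"}
	cache["pg-a"] = *pg // store a copy, as setPGCache does with *pg

	got := cache["pg-a"] // read back a copy
	out := &got          // hand back a pointer to the copy, as getPGFromCache does

	out.Name = "mutated"            // caller changes its copy...
	fmt.Println(cache["pg-a"].Name) // ...the cached entry still prints "pg-a"
}
```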