Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SKS-3001 & SKS-2691: Support inplace-update and cpu and memory expansion #185

Merged
merged 7 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions api/v1beta1/conditions_consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ const (
// ResourcesHotUpdatedCondition documents the status of the hot updating resources of a VM.
ResourcesHotUpdatedCondition = "ResourceHotUpdated"

// WaitingForResourcesHotUpdateReason (Severity=Info) documents an ElfMachine waiting for updating resources.
WaitingForResourcesHotUpdateReason = "WaitingForResourcesHotUpdate"

// ExpandingVMDiskReason documents (Severity=Info) ElfMachine currently executing the expand disk operation.
ExpandingVMDiskReason = "ExpandingVMDisk"

Expand All @@ -144,6 +141,23 @@ const (
// detecting an error while adding new disk capacity to root directory; those kind of errors are
// usually transient and failed updating are automatically re-tried by the controller.
ExpandingRootPartitionFailedReason = "ExpandingRootPartitionFailed"

// ExpandingVMComputeResourcesReason documents (Severity=Info) ElfMachine currently executing the
// expand resources(CPU/memory) operation.
ExpandingVMComputeResourcesReason = "ExpandingVMComputeResources"

// ExpandingVMComputeResourcesFailedReason (Severity=Warning) documents an ElfMachine controller detecting
// an error while expanding resources(CPU/memory); those kind of errors are usually transient and
// failed updating are automatically re-tried by the controller.
ExpandingVMComputeResourcesFailedReason = "ExpandingVMComputeResourcesFailed"

// RestartingKubeletReason documents (Severity=Info) ElfMachine currently executing the restart kubelet operation.
RestartingKubeletReason = "RestartingKubelet"

// RestartingKubeletFailedReason (Severity=Warning) documents an ElfMachine controller detecting
// an error while restarting kubelet; those kind of errors are usually transient and failed restarting
// are automatically re-tried by the controller.
RestartingKubeletFailedReason = "RestartingKubeletFailed"
)

// Conditions and Reasons related to make connections to a Tower. Can currently be used by ElfCluster and ElfMachine
Expand Down
9 changes: 9 additions & 0 deletions api/v1beta1/elfmachine_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"time"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capierrors "sigs.k8s.io/cluster-api/errors"
Expand Down Expand Up @@ -259,6 +260,14 @@ func (m *ElfMachine) IsHotUpdating() bool {
return false
}

// IsResourcesUpToDate returns whether the machine resources are up to date.
func (m *ElfMachine) IsResourcesUpToDate() bool {
specMemory := *resource.NewQuantity(m.Spec.MemoryMiB*1024*1024, resource.BinarySI)
return m.Spec.DiskGiB == m.Status.Resources.Disk &&
m.Spec.NumCPUs == m.Status.Resources.CPUCores &&
specMemory.Equal(m.Status.Resources.Memory)
}

func (m *ElfMachine) SetVMDisconnectionTimestamp(timestamp *metav1.Time) {
if m.Annotations == nil {
m.Annotations = make(map[string]string)
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1beta1

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// CloneMode is the type of clone operation used to clone a VM from a template.
Expand Down Expand Up @@ -199,6 +200,10 @@ type GPUStatus struct {
// ResourcesStatus records the resources allocated to the virtual machine.
type ResourcesStatus struct {
Disk int32 `json:"disk,omitempty"`
// CPUCores is the total number of CPU cores allocated for the virtual machine.
CPUCores int32 `json:"cpu,omitempty"`
// Memory is the total number of memory in MiB allocated for the virtual machine.
Memory resource.Quantity `json:"memory,omitempty"`
}

//+kubebuilder:object:generate=false
Expand Down
3 changes: 2 additions & 1 deletion api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions config/crd/bases/infrastructure.cluster.x-k8s.io_elfmachines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,22 @@ spec:
resources:
description: Resources records the resources allocated for the machine.
properties:
cpu:
description: CPUCores is the total number of CPU cores allocated
for the virtual machine.
format: int32
type: integer
disk:
format: int32
type: integer
memory:
anyOf:
- type: integer
- type: string
description: Memory is the total number of memory in MiB allocated
for the virtual machine.
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
taskRef:
description: |-
Expand Down
26 changes: 0 additions & 26 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,29 +129,3 @@ rules:
- get
- patch
- update
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates/finalizers
verbs:
- update
- apiGroups:
- infrastructure.cluster.x-k8s.io
resources:
- elfmachinetemplates/status
verbs:
- get
- patch
- update
11 changes: 8 additions & 3 deletions controllers/elfmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func (r *ElfMachineReconciler) Reconcile(ctx goctx.Context, req ctrl.Request) (r
// always update the readyCondition.
conditions.SetSummary(machineCtx.ElfMachine,
conditions.WithConditions(
infrav1.VMProvisionedCondition,
infrav1.ResourcesHotUpdatedCondition,
infrav1.VMProvisionedCondition,
infrav1.TowerAvailableCondition,
),
)
Expand Down Expand Up @@ -973,7 +973,7 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx goctx.Context, machineCtx *co
machineCtx.ElfMachine.SetVMFirstBootTimestamp(&now)
}

if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) {
releaseTicketForCreateVM(machineCtx.ElfMachine.Name)
recordElfClusterStorageInsufficient(machineCtx, false)
recordElfClusterMemoryInsufficient(machineCtx, false)
Expand Down Expand Up @@ -1024,7 +1024,12 @@ func (r *ElfMachineReconciler) reconcileVMFailedTask(ctx goctx.Context, machineC
case service.IsUpdateVMDiskTask(task, machineCtx.ElfMachine.Name):
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMDiskReason || reason == infrav1.ExpandingVMDiskFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityInfo, errorMessage)
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMDiskFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
}
case service.IsUpdateVMTask(task) && conditions.IsFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition):
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == infrav1.ExpandingVMComputeResourcesReason || reason == infrav1.ExpandingVMComputeResourcesFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, errorMessage)
}
case service.IsPowerOnVMTask(task) || service.IsUpdateVMTask(task) || service.IsVMColdMigrationTask(task):
if machineCtx.ElfMachine.RequiresGPUDevices() {
Expand Down
124 changes: 95 additions & 29 deletions controllers/elfmachine_controller_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/smartxworks/cloudtower-go-sdk/v2/models"
agentv1 "github.com/smartxworks/host-config-agent-api/api/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
capiremote "sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util/conditions"
Expand All @@ -32,38 +33,26 @@ import (
"github.com/smartxworks/cluster-api-provider-elf/pkg/context"
"github.com/smartxworks/cluster-api-provider-elf/pkg/hostagent"
"github.com/smartxworks/cluster-api-provider-elf/pkg/service"
machineutil "github.com/smartxworks/cluster-api-provider-elf/pkg/util/machine"
)

func (r *ElfMachineReconciler) reconcileVMResources(ctx goctx.Context, machineCtx *context.MachineContext, vm *models.VM) (bool, error) {
log := ctrl.LoggerFrom(ctx)

hotUpdatedCondition := conditions.Get(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if hotUpdatedCondition != nil &&
hotUpdatedCondition.Reason == infrav1.WaitingForResourcesHotUpdateReason &&
hotUpdatedCondition.Message != "" {
log.Info("Waiting for hot updating resources", "message", hotUpdatedCondition.Message)

return false, nil
if ok, err := r.reconcileVMCPUAndMemory(ctx, machineCtx, vm); err != nil || !ok {
return ok, err
}

if ok, err := r.reconcieVMVolume(ctx, machineCtx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
if ok, err := r.restartKubelet(ctx, machineCtx); err != nil || !ok {
return ok, err
}

// Agent needs to wait for the node exists before it can run and execute commands.
if machineutil.IsUpdatingElfMachineResources(machineCtx.ElfMachine) &&
machineCtx.Machine.Status.NodeInfo == nil {
log.Info("Waiting for node exists for host agent expand vm root partition")

return false, nil
if ok, err := r.reconcieVMVolume(ctx, machineCtx, vm, infrav1.ResourcesHotUpdatedCondition); err != nil || !ok {
return ok, err
}

if ok, err := r.expandVMRootPartition(ctx, machineCtx); err != nil || !ok {
return ok, err
}

if machineutil.IsUpdatingElfMachineResources(machineCtx.ElfMachine) {
if conditions.IsFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition) {
conditions.MarkTrue(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
}

Expand Down Expand Up @@ -148,8 +137,6 @@ func (r *ElfMachineReconciler) resizeVMVolume(ctx goctx.Context, machineCtx *con

// expandVMRootPartition adds new disk capacity to root partition.
func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
log := ctrl.LoggerFrom(ctx)

reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
Expand All @@ -164,35 +151,114 @@ func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineC
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionReason, clusterv1.ConditionSeverityInfo, "")
}

return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeExpandRootPartition)
}

// reconcileVMCPUAndMemory ensures that the vm CPU and memory are as expected.
func (r *ElfMachineReconciler) reconcileVMCPUAndMemory(ctx goctx.Context, machineCtx *context.MachineContext, vm *models.VM) (bool, error) {
machineCtx.ElfMachine.Status.Resources.CPUCores = *vm.Vcpu
machineCtx.ElfMachine.Status.Resources.Memory = *resource.NewQuantity(service.ByteToMiB(*vm.Memory)*1024*1024, resource.BinarySI)

if !(machineCtx.ElfMachine.Spec.NumCPUs > *vm.Vcpu ||
machineCtx.ElfMachine.Spec.MemoryMiB > service.ByteToMiB(*vm.Memory)) {
return true, nil
}

reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" ||
(reason != infrav1.ExpandingVMComputeResourcesReason && reason != infrav1.ExpandingVMComputeResourcesFailedReason) {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesReason, clusterv1.ConditionSeverityInfo, "")

// Save the condition first, and then expand the resources capacity.
// This prevents the resources expansion from succeeding but failing to save the
// condition, causing ElfMachine to not record the condition.
return false, nil
}

log := ctrl.LoggerFrom(ctx)

if ok := acquireTicketForUpdatingVM(machineCtx.ElfMachine.Name); !ok {
log.V(1).Info(fmt.Sprintf("The VM operation reaches rate limit, skip updating VM %s CPU and memory", machineCtx.ElfMachine.Status.VMRef))

return false, nil
}

withTaskVM, err := machineCtx.VMService.UpdateVM(vm, machineCtx.ElfMachine)
if err != nil {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingVMComputeResourcesFailedReason, clusterv1.ConditionSeverityWarning, err.Error())

return false, errors.Wrapf(err, "failed to trigger update CPU and memory for VM %s", *vm.Name)
}

machineCtx.ElfMachine.SetTask(*withTaskVM.TaskID)

log.Info("Waiting for the VM to be updated CPU and memory", "vmRef", machineCtx.ElfMachine.Status.VMRef, "taskRef", machineCtx.ElfMachine.Status.TaskRef)

return false, nil
}

func (r *ElfMachineReconciler) restartKubelet(ctx goctx.Context, machineCtx *context.MachineContext) (bool, error) {
reason := conditions.GetReason(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition)
if reason == "" {
return true, nil
} else if reason != infrav1.ExpandingVMComputeResourcesReason &&
reason != infrav1.ExpandingVMComputeResourcesFailedReason &&
reason != infrav1.RestartingKubeletReason &&
reason != infrav1.RestartingKubeletFailedReason {
return true, nil
}

if reason != infrav1.RestartingKubeletFailedReason {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletReason, clusterv1.ConditionSeverityInfo, "")
}

return r.reconcileHostJob(ctx, machineCtx, hostagent.HostAgentJobTypeRestartKubelet)
}

func (r *ElfMachineReconciler) reconcileHostJob(ctx goctx.Context, machineCtx *context.MachineContext, jobType hostagent.HostAgentJobType) (bool, error) {
log := ctrl.LoggerFrom(ctx)

// Agent needs to wait for the node exists before it can run and execute commands.
if machineCtx.Machine.Status.NodeInfo == nil {
log.Info("Waiting for node exists for host agent job", "jobType", jobType)

return false, nil
}

kubeClient, err := capiremote.NewClusterClient(ctx, "", r.Client, client.ObjectKey{Namespace: machineCtx.Cluster.Namespace, Name: machineCtx.Cluster.Name})
haijianyang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}

agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetExpandRootPartitionJobName(machineCtx.ElfMachine))
agentJob, err := hostagent.GetHostJob(ctx, kubeClient, machineCtx.ElfMachine.Namespace, hostagent.GetJobName(machineCtx.ElfMachine, jobType))
if err != nil && !apierrors.IsNotFound(err) {
return false, err
}

if agentJob == nil {
agentJob, err = hostagent.ExpandRootPartition(ctx, kubeClient, machineCtx.ElfMachine)
if err != nil {
agentJob = hostagent.GenerateJob(machineCtx.ElfMachine, jobType)
if err = kubeClient.Create(ctx, agentJob); err != nil {
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityInfo, err.Error())

return false, err
}

log.Info("Waiting for expanding root partition", "hostAgentJob", agentJob.Name)
log.Info("Waiting for job to complete", "hostAgentJob", agentJob.Name)

return false, nil
}

switch agentJob.Status.Phase {
case agentv1.PhaseSucceeded:
log.Info("Expand root partition to root succeeded", "hostAgentJob", agentJob.Name)
log.Info("HostJob succeeded", "hostAgentJob", agentJob.Name)
case agentv1.PhaseFailed:
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
log.Info("Expand root partition failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)
switch jobType {
case hostagent.HostAgentJobTypeExpandRootPartition:
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.ExpandingRootPartitionFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
case hostagent.HostAgentJobTypeRestartKubelet:
conditions.MarkFalse(machineCtx.ElfMachine, infrav1.ResourcesHotUpdatedCondition, infrav1.RestartingKubeletFailedReason, clusterv1.ConditionSeverityWarning, agentJob.Status.FailureMessage)
}
log.Info("HostJob failed, will try again after three minutes", "hostAgentJob", agentJob.Name, "failureMessage", agentJob.Status.FailureMessage)

lastExecutionTime := agentJob.Status.LastExecutionTime
if lastExecutionTime == nil {
Expand All @@ -201,13 +267,13 @@ func (r *ElfMachineReconciler) expandVMRootPartition(ctx goctx.Context, machineC
// Three minutes after the job fails, delete the job and try again.
if time.Now().After(lastExecutionTime.Add(3 * time.Minute)) {
if err := kubeClient.Delete(ctx, agentJob); err != nil {
return false, errors.Wrapf(err, "failed to delete expand root partition job %s/%s for retry", agentJob.Namespace, agentJob.Name)
return false, errors.Wrapf(err, "failed to delete hostJob %s/%s for retry", agentJob.Namespace, agentJob.Name)
}
}

return false, nil
default:
log.Info("Waiting for expanding root partition job done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)
log.Info("Waiting for HostJob done", "hostAgentJob", agentJob.Name, "jobStatus", agentJob.Status.Phase)

return false, nil
}
Expand Down
Loading
Loading