From 935c4faeb435c00648dfb3aba539be2eba625d3c Mon Sep 17 00:00:00 2001 From: Misha Sakhnov Date: Fri, 4 Oct 2024 14:03:40 +0200 Subject: [PATCH] neonvm: sysfs cpu state based scaling support Introduce a separate CPU scaling flow based on vmSpec.cpuScalingMode. If vmSpec.cpuScalingMode is equal to `qmp_scaling`, the scaling logic is preserved as before: - Scale the number of CPUs using QMP commands, if required. - If the cgroups need to be scaled, call the vm-runner /cpu_change endpoint. If vmSpec.cpuScalingMode is equal to `cpuSysfsStateScaling`, all CPU scaling requests go directly to the vm-runner /cpu_change endpoint, which in that configuration forwards them to the neonvm-daemon to reconcile the required number of online CPUs. The value `cpuSysfsStateScaling` also modifies the QEMU and kernel arguments so that all CPUs are plugged at boot but only the first one is marked as online. Signed-off-by: Misha Sakhnov --- Makefile | 7 +- neonvm-controller/cmd/main.go | 7 +- neonvm-controller/deployment.yaml | 1 - neonvm-daemon/cmd/main.go | 55 +++----- neonvm-runner/cmd/main.go | 74 ++++++++--- pkg/api/types.go | 5 + pkg/neonvm/controllers/config.go | 5 - pkg/neonvm/controllers/vm_controller.go | 107 ++++----------- .../vm_controller_handle_cpu_scaling.go | 124 ++++++++++++++++++ pkg/neonvm/controllers/vm_qmp_test.go | 12 +- .../neonvm/cpuscaling/sysfsscaling.go | 10 +- .../00-assert.yaml | 17 +++ .../00-create-vm.yaml | 102 ++++++++++++++ .../01-assert.yaml | 23 ++++ .../01-upscale.yaml | 49 +++++++ .../02-assert.yaml | 16 +++ .../02-downscale.yaml | 7 + 17 files changed, 467 insertions(+), 154 deletions(-) create mode 100644 pkg/neonvm/controllers/vm_controller_handle_cpu_scaling.go rename neonvm-daemon/pkg/cpuscaling/online_offline_cpu.go => pkg/neonvm/cpuscaling/sysfsscaling.go (92%) create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-assert.yaml create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-create-vm.yaml create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-assert.yaml create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-upscale.yaml create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-assert.yaml create mode 100644 tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-downscale.yaml diff --git a/Makefile b/Makefile index 0612541fc..392baeaab 100644 --- a/Makefile +++ b/Makefile @@ -133,9 +133,10 @@ test: fmt vet envtest ## Run tests. .PHONY: build build: fmt vet bin/vm-builder ## Build all neonvm binaries. - GOOS=linux go build -o bin/controller neonvm/main.go - GOOS=linux go build -o bin/vxlan-controller neonvm/tools/vxlan/controller/main.go - GOOS=linux go build -o bin/runner neonvm/runner/*.go + GOOS=linux go build -o bin/controller neonvm-controller/cmd/main.go + GOOS=linux go build -o bin/vxlan-controller neonvm-vxlan-controller/cmd/main.go + GOOS=linux go build -o bin/daemon neonvm-daemon/main.go + GOOS=linux go build -o bin/runner neonvm-runner/cmd/main.go .PHONY: bin/vm-builder bin/vm-builder: ## Build vm-builder binary. diff --git a/neonvm-controller/cmd/main.go b/neonvm-controller/cmd/main.go index 4dfbf45e9..c6acb6cf6 100644 --- a/neonvm-controller/cmd/main.go +++ b/neonvm-controller/cmd/main.go @@ -42,6 +42,7 @@ import ( "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. 
_ "k8s.io/client-go/plugin/pkg/client/auth" @@ -95,7 +96,7 @@ func main() { var concurrencyLimit int var skipUpdateValidationFor map[types.NamespacedName]struct{} var disableRunnerCgroup bool - var useOnlineOfflining bool + var useCpuSysfsStateScaling bool var qemuDiskCacheSettings string var defaultMemoryProvider vmv1.MemoryProvider var memhpAutoMovableRatio string @@ -106,7 +107,6 @@ func main() { flag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") - flag.BoolVar(&useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling") flag.IntVar(&concurrencyLimit, "concurrency-limit", 1, "Maximum number of concurrent reconcile operations") flag.Func( "skip-update-validation-for", @@ -134,6 +134,7 @@ func main() { return nil }, ) + flag.BoolVar(&useCpuSysfsStateScaling, "use-cpu-sysfs-state-scaling", false, "Use sysfs cpu state scaling for CPU scaling") flag.BoolVar(&disableRunnerCgroup, "disable-runner-cgroup", false, "Disable creation of a cgroup in neonvm-runner for fractional CPU limiting") flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings") flag.Func("default-memory-provider", "Set default memory provider to use for new VMs", defaultMemoryProvider.FlagFunc) @@ -189,7 +190,7 @@ func main() { reconcilerMetrics := controllers.MakeReconcilerMetrics() rc := &controllers.ReconcilerConfig{ - UseOnlineOfflining: useOnlineOfflining, + DisableRunnerCgroup: disableRunnerCgroup, MaxConcurrentReconciles: concurrencyLimit, SkipUpdateValidationFor: skipUpdateValidationFor, QEMUDiskCacheSettings: qemuDiskCacheSettings, diff --git a/neonvm-controller/deployment.yaml b/neonvm-controller/deployment.yaml index 4da8307cc..13470eab1 100644 --- a/neonvm-controller/deployment.yaml +++ b/neonvm-controller/deployment.yaml @@ -55,7 +55,6 @@ spec: - "--concurrency-limit=128" - "--skip-update-validation-for=" - "--disable-runner-cgroup" - - "--use-online-offlining" # See #775 and its links. # * cache.writeback=on - don't set O_DSYNC (don't flush every write) # * cache.direct=on - use O_DIRECT (don't abuse host's page cache!) diff --git a/neonvm-daemon/cmd/main.go b/neonvm-daemon/cmd/main.go index 8b3be36d3..10aeaba2d 100644 --- a/neonvm-daemon/cmd/main.go +++ b/neonvm-daemon/cmd/main.go @@ -4,15 +4,16 @@ import ( "flag" "fmt" "io" - "math" "net/http" "os" "strconv" "sync" "time" - "github.com/neondatabase/autoscaling/neonvm/daemon/pkg/cpuscaling" "go.uber.org/zap" + + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling" ) func main() { @@ -31,8 +32,9 @@ func main() { defer logger.Sync() //nolint:errcheck // what are we gonna do, log something about it? 
logger.Info("Starting neonvm-daemon", zap.String("addr", *addr)) - cpuHotPlugController := &cpuscaling.CPUOnlineOffliner{} + cpuHotPlugController := &cpuscaling.CPUSysFsStateScaler{} srv := cpuServer{ + cpuOperationsMutex: &sync.Mutex{}, cpuSystemWideScaler: cpuHotPlugController, logger: logger.Named("cpu-srv"), } @@ -43,47 +45,29 @@ type cpuServer struct { // Protects CPU operations from concurrent access to prevent multiple ensureOnlineCPUs calls from running concurrently // and ensure that status response is always actual cpuOperationsMutex *sync.Mutex - cpuSystemWideScaler *cpuscaling.CPUOnlineOffliner + cpuSystemWideScaler *cpuscaling.CPUSysFsStateScaler logger *zap.Logger } -// milliCPU is a type that represents CPU in milli units -type milliCPU uint64 - -// milliCPUFromString converts a byte slice to milliCPU -func milliCPUFromString(s []byte) (milliCPU, error) { - cpu, err := strconv.ParseUint(string(s), 10, 32) - if err != nil { - return 0, err - } - return milliCPU(cpu), nil } - -// ToCPU converts milliCPU to CPU -func (m milliCPU) ToCPU() int { - cpu := float64(m) / 1000.0 - // Use math.Ceil to round up to the next CPU. - return int(math.Ceil(cpu)) -} - func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) { + // TODO: this should be replaced with exposing cgroup CPU milliseconds instead of the online CPU count s.cpuOperationsMutex.Lock() defer s.cpuOperationsMutex.Unlock() - totalCPUs, err := s.cpuSystemWideScaler.GetTotalCPUsCount() - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } activeCPUs, err := s.cpuSystemWideScaler.GetActiveCPUsCount() if err != nil { w.WriteHeader(http.StatusInternalServerError) return } - w.Write([]byte(fmt.Sprintf("%d %d", activeCPUs, totalCPUs))) w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte(fmt.Sprintf("%d", activeCPUs*1000))); err != nil { + s.logger.Error("could not write response", zap.Error(err)) + } } func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { + // TODO: should the call to this method be conditional, i.e. only if sysfs state CPU scaling is enabled? 
+ // on the other hand, this endpoint is currently called by the runner only if sysfs state scaling is enabled + // and it is a bit tricky to pass vmSpec here s.cpuOperationsMutex.Lock() defer s.cpuOperationsMutex.Unlock() body, err := io.ReadAll(r.Body) @@ -92,19 +76,20 @@ func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - - milliCPU, err := milliCPUFromString(body) + updateInt, err := strconv.Atoi(string(body)) + update := vmv1.MilliCPU(updateInt) if err != nil { - s.logger.Error("could not parse request body as uint32", zap.Error(err)) + s.logger.Error("could not parse request body as an integer", zap.Error(err)) w.WriteHeader(http.StatusBadRequest) return } - if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(milliCPU.ToCPU()); err != nil { - s.logger.Error("failed to ensure online CPUs", zap.Error(err)) + s.logger.Info("Setting CPU status", zap.String("body", string(body))) + if err := s.cpuSystemWideScaler.EnsureOnlineCPUs(int(update.RoundedUp())); err != nil { + s.logger.Error("could not ensure online CPUs", zap.Error(err)) w.WriteHeader(http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + w.WriteHeader(http.StatusOK) } func (s *cpuServer) run(addr string) { diff --git a/neonvm-runner/cmd/main.go b/neonvm-runner/cmd/main.go index 7999324f4..864fe2b21 100644 --- a/neonvm-runner/cmd/main.go +++ b/neonvm-runner/cmd/main.go @@ -616,7 +616,6 @@ type Config struct { diskCacheSettings string memoryProvider vmv1.MemoryProvider autoMovableRatio string - useOnlineOfflining bool } func newConfig(logger *zap.Logger) *Config { @@ -630,7 +629,6 @@ func newConfig(logger *zap.Logger) *Config { diskCacheSettings: "cache=none", memoryProvider: "", // Require that this is explicitly set. We'll check later. autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later. 
- useOnlineOfflining: false, } flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump, "Base64 encoded VirtualMachine json specification") @@ -651,7 +649,6 @@ func newConfig(logger *zap.Logger) *Config { flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc) flag.StringVar(&cfg.autoMovableRatio, "memhp-auto-movable-ratio", cfg.autoMovableRatio, "Set value of kernel's memory_hotplug.auto_movable_ratio [virtio-mem only]") - flag.BoolVar(&cfg.useOnlineOfflining, "use-online-offlining", false, "Use online offlining for CPU scaling") flag.Parse() if cfg.memoryProvider == "" { @@ -761,7 +758,7 @@ func run(logger *zap.Logger) error { tg.Go("qemu-cmd", func(logger *zap.Logger) error { var err error - qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname, cfg.useOnlineOfflining) + qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, enableSSH, swapSize, hostname) return err }) @@ -815,7 +812,6 @@ func buildQEMUCmd( enableSSH bool, swapSize *resource.Quantity, hostname string, - useOnlineOfflining bool, ) ([]string, error) { // prepare qemu command line qemuCmd := []string{ @@ -893,21 +889,26 @@ func buildQEMUCmd( logger.Warn("not using KVM acceleration") } qemuCmd = append(qemuCmd, "-cpu", "max") - if useOnlineOfflining { - // if we use online offlining we specify start cpus equal to max cpus + + // cpu scaling details + maxCPUs := vmSpec.Guest.CPUs.Max.RoundedUp() + minCPUs := vmSpec.Guest.CPUs.Min.RoundedUp() + + if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState { + // if we use sysfs based scaling we specify start cpus equal to max cpus qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf( "cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1", - vmSpec.Guest.CPUs.Max.RoundedUp(), - vmSpec.Guest.CPUs.Max.RoundedUp(), - vmSpec.Guest.CPUs.Max.RoundedUp(), + maxCPUs, + maxCPUs, + maxCPUs, )) } else { // if we use hotplug we specify start cpus equal to min cpus and scale using udev rules for cpu plug events qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf( "cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1", - vmSpec.Guest.CPUs.Min.RoundedUp(), - vmSpec.Guest.CPUs.Max.RoundedUp(), - vmSpec.Guest.CPUs.Max.RoundedUp(), + minCPUs, + maxCPUs, + maxCPUs, )) } @@ -997,8 +998,8 @@ func makeKernelCmdline(cfg *Config, vmSpec *vmv1.VirtualMachineSpec, vmStatus *v if cfg.appendKernelCmdline != "" { cmdlineParts = append(cmdlineParts, cfg.appendKernelCmdline) } - if cfg.useOnlineOfflining { - // if we use online offlining we need to specify the start cpus as min CPUs + if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState { + // if we use sysfs based scaling we need to specify the start cpus as min CPUs cmdlineParts = append(cmdlineParts, fmt.Sprintf("maxcpus=%d", vmSpec.Guest.CPUs.Min.RoundedUp())) } @@ -1047,8 +1048,12 @@ func runQEMU( wg := sync.WaitGroup{} wg.Add(1) + useCpuSysfsStateScaling := false + if vmSpec.CpuScalingMode != nil && *vmSpec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState { + useCpuSysfsStateScaling = true + } go terminateQemuOnSigterm(ctx, logger, &wg) - if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer { + if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer || useCpuSysfsStateScaling { var callbacks cpuServerCallbacks if cfg.enableDummyCPUServer { @@ -1060,6 +1065,13 @@ func runQEMU( return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil }, set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error { + if useCpuSysfsStateScaling 
{ + err := setNeonvmDaemonCPU(cpu) + if err != nil { + logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err)) + return err + } + } lastValue.Store(uint32(cpu)) return nil }, @@ -1830,3 +1842,33 @@ func overlayNetwork(iface string) (mac.MAC, error) { return mac, nil } + +func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error { + _, vmIP, _, err := calcIPs(defaultNetworkCIDR) + if err != nil { + return fmt.Errorf("could not calculate VM IP address: %w", err) + } + + ctx, cancel := context.WithTimeout(context.TODO(), time.Second) + defer cancel() + + url := fmt.Sprintf("http://%s:25183/cpu", vmIP) + body := bytes.NewReader([]byte(fmt.Sprintf("%d", uint32(cpu)))) + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body) + if err != nil { + return fmt.Errorf("could not build request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("could not send request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode) + } + + return nil +} diff --git a/pkg/api/types.go b/pkg/api/types.go index d71570def..038f8e953 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -476,12 +476,17 @@ type RunnerProtoVersion uint32 const ( RunnerProtoV1 RunnerProtoVersion = iota + 1 + RunnerProtoV2 ) func (v RunnerProtoVersion) SupportsCgroupFractionalCPU() bool { return v >= RunnerProtoV1 } +func (v RunnerProtoVersion) SupportsCpuSysfsStateScaling() bool { + return v >= RunnerProtoV2 +} + //////////////////////////////////// // Agent <-> Monitor Messages // //////////////////////////////////// diff --git a/pkg/neonvm/controllers/config.go b/pkg/neonvm/controllers/config.go index 2bcfe3362..eb81d98b2 100644 --- a/pkg/neonvm/controllers/config.go +++ b/pkg/neonvm/controllers/config.go @@ -11,11 +11,6 @@ import ( // ReconcilerConfig stores shared configuration for VirtualMachineReconciler and // VirtualMachineMigrationReconciler. type ReconcilerConfig struct { - // UseOnlineOfflining, if true, enables using online offlining for new VM runner pods instead of QMP cpu hotplugging. - // - // This is defined as a config option so we can do a gradual rollout of this change. - UseOnlineOfflining bool - // DisableRunnerCgroup, if true, disables running QEMU in a cgroup in new VM runner pods. // Fractional CPU scaling will continue to *pretend* to work, but it will not do anything in // practice. diff --git a/pkg/neonvm/controllers/vm_controller.go b/pkg/neonvm/controllers/vm_controller.go index 246655278..e22302e3a 100644 --- a/pkg/neonvm/controllers/vm_controller.go +++ b/pkg/neonvm/controllers/vm_controller.go @@ -81,8 +81,7 @@ type VMReconciler struct { Scheme *runtime.Scheme Recorder record.EventRecorder Config *ReconcilerConfig - - Metrics ReconcilerMetrics `exhaustruct:"optional"` + Metrics ReconcilerMetrics `exhaustruct:"optional"` } // The following markers are used to generate the rules permissions (RBAC) on config/rbac using controller-gen @@ -273,18 +272,18 @@ func (r *VMReconciler) updateVMStatusCPU( ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod, - qmpPluggedCPUs uint32, + activeCPUs uint32, cgroupUsage *api.VCPUCgroup, ) { log := log.FromContext(ctx) // We expect: // - vm.Status.CPUs = cgroupUsage.VCPUs - // - vm.Status.CPUs.RoundUp() == qmpPluggedCPUs + // - vm.Status.CPUs.RoundUp() == activeCPUs // Otherwise, we update the status. 
var currentCPUUsage vmv1.MilliCPU if cgroupUsage != nil { - if cgroupUsage.VCPUs.RoundedUp() != qmpPluggedCPUs { + if cgroupUsage.VCPUs.RoundedUp() != activeCPUs { // This is not expected but it's fine. We only report the // mismatch here and will resolve it in the next reconcile // iteration loops by comparing these values to spec CPU use @@ -292,12 +291,12 @@ func (r *VMReconciler) updateVMStatusCPU( log.Error(nil, "Mismatch in the number of VM's plugged CPUs and runner pod's cgroup vCPUs", "VirtualMachine", vm.Name, "Runner Pod", vmRunner.Name, - "plugged CPUs", qmpPluggedCPUs, + "plugged CPUs", activeCPUs, "cgroup vCPUs", cgroupUsage.VCPUs) } - currentCPUUsage = min(cgroupUsage.VCPUs, vmv1.MilliCPU(1000*qmpPluggedCPUs)) + currentCPUUsage = min(cgroupUsage.VCPUs, vmv1.MilliCPU(1000*activeCPUs)) } else { - currentCPUUsage = vmv1.MilliCPU(1000 * qmpPluggedCPUs) + currentCPUUsage = vmv1.MilliCPU(1000 * activeCPUs) } if vm.Status.CPUs == nil || *vm.Status.CPUs != currentCPUUsage { vm.Status.CPUs = ¤tCPUUsage @@ -310,10 +309,10 @@ func (r *VMReconciler) updateVMStatusCPU( func (r *VMReconciler) updateVMStatusMemory( vm *vmv1.VirtualMachine, - qmpMemorySize *resource.Quantity, + QmpMemorySize *resource.Quantity, ) { - if vm.Status.MemorySize == nil || !qmpMemorySize.Equal(*vm.Status.MemorySize) { - vm.Status.MemorySize = qmpMemorySize + if vm.Status.MemorySize == nil || !QmpMemorySize.Equal(*vm.Status.MemorySize) { + vm.Status.MemorySize = QmpMemorySize r.Recorder.Event(vm, "Normal", "MemoryInfo", fmt.Sprintf("VirtualMachine %s uses %v memory", vm.Name, @@ -549,21 +548,25 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) return err } - // get CPU details from QEMU - cpuSlotsPlugged, _, err := QmpGetCpus(QmpAddr(vm)) - if err != nil { - log.Error(err, "Failed to get CPU details from VirtualMachine", "VirtualMachine", vm.Name) - return err - } - pluggedCPU := uint32(len(cpuSlotsPlugged)) - // get cgroups CPU details from runner pod cgroupUsage, err := getRunnerCgroup(ctx, vm) if err != nil { log.Error(err, "Failed to get CPU details from runner", "VirtualMachine", vm.Name) return err } - + var pluggedCPU uint32 + if vm.Spec.CpuScalingMode != nil && *vm.Spec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState { + log.Info("CPU scaling mode is set to CpuSysfsState, CPU usage check based on cgroups") + pluggedCPU = cgroupUsage.VCPUs.RoundedUp() + } else { + // get CPU details from QEMU + cpuSlotsPlugged, _, err := QmpGetCpus(QmpAddr(vm)) + if err != nil { + log.Error(err, "Failed to get CPU details from VirtualMachine", "VirtualMachine", vm.Name) + return err + } + pluggedCPU = uint32(len(cpuSlotsPlugged)) + } // update status by CPUs used in the VM r.updateVMStatusCPU(ctx, vm, vmRunner, pluggedCPU, cgroupUsage) @@ -690,62 +693,12 @@ func (r *VMReconciler) doReconcile(ctx context.Context, vm *vmv1.VirtualMachine) return err } - cpuScaled := false - ramScaled := false - - // do hotplug/unplug CPU - // firstly get current state from QEMU - cpuSlotsPlugged, _, err := QmpGetCpus(QmpAddr(vm)) + cpuScaled, err := r.handleCPUScaling(ctx, vm, vmRunner) if err != nil { - log.Error(err, "Failed to get CPU details from VirtualMachine", "VirtualMachine", vm.Name) + log.Error(err, "failed to handle CPU scaling") return err } - specCPU := vm.Spec.Guest.CPUs.Use - pluggedCPU := uint32(len(cpuSlotsPlugged)) - - cgroupUsage, err := getRunnerCgroup(ctx, vm) - if err != nil { - log.Error(err, "Failed to get CPU details from runner", "VirtualMachine", vm.Name) - return err - } - - // 
compare guest spec to count of plugged and runner pod cgroups - if specCPU.RoundedUp() > pluggedCPU { - // going to plug one CPU - log.Info("Plug one more CPU into VM") - if err := QmpPlugCpu(QmpAddr(vm)); err != nil { - return err - } - r.Recorder.Event(vm, "Normal", "ScaleUp", - fmt.Sprintf("One more CPU was plugged into VM %s", - vm.Name)) - } else if specCPU.RoundedUp() < pluggedCPU { - // going to unplug one CPU - log.Info("Unplug one CPU from VM") - if err := QmpUnplugCpu(QmpAddr(vm)); err != nil { - return err - } - r.Recorder.Event(vm, "Normal", "ScaleDown", - fmt.Sprintf("One CPU was unplugged from VM %s", - vm.Name)) - } else if specCPU != cgroupUsage.VCPUs { - log.Info("Update runner pod cgroups", "runner", cgroupUsage.VCPUs, "spec", specCPU) - if err := setRunnerCgroup(ctx, vm, specCPU); err != nil { - return err - } - reason := "ScaleDown" - if specCPU > cgroupUsage.VCPUs { - reason = "ScaleUp" - } - r.Recorder.Event(vm, "Normal", reason, - fmt.Sprintf("Runner pod cgroups was updated on VM %s", - vm.Name)) - } else { - // seems already plugged correctly - cpuScaled = true - } - // update status by CPUs used in the VM - r.updateVMStatusCPU(ctx, vm, vmRunner, pluggedCPU, cgroupUsage) + ramScaled := false // do hotplug/unplug Memory switch *vm.Status.MemoryProvider { @@ -1303,7 +1256,7 @@ func setRunnerCgroup(ctx context.Context, vm *vmv1.VirtualMachine, cpu vmv1.Mill defer resp.Body.Close() if resp.StatusCode != 200 { - return fmt.Errorf("unexpected status %s", resp.Status) + return fmt.Errorf("setRunnerCgroup: unexpected status %s", resp.Status) } return nil } @@ -1325,7 +1278,7 @@ func getRunnerCgroup(ctx context.Context, vm *vmv1.VirtualMachine) (*api.VCPUCgr } if resp.StatusCode != 200 { - return nil, fmt.Errorf("unexpected status %s", resp.Status) + return nil, fmt.Errorf("getRunnerCgroup: unexpected status %s", resp.Status) } body, err := io.ReadAll(resp.Body) @@ -1333,7 +1286,6 @@ func getRunnerCgroup(ctx context.Context, vm *vmv1.VirtualMachine) (*api.VCPUCgr if err != nil { return nil, err } - var result api.VCPUCgroup err = json.Unmarshal(body, &result) if err != nil { @@ -1447,9 +1399,6 @@ func podSpec( }}, Command: func() []string { cmd := []string{"runner"} - if config.UseOnlineOfflining { - cmd = append(cmd, "-use-online-offlining") // TODO: need to force enable-dummy-cpu-server - } if config.DisableRunnerCgroup { cmd = append(cmd, "-skip-cgroup-management") // cgroup management disabled, but we still need something to provide diff --git a/pkg/neonvm/controllers/vm_controller_handle_cpu_scaling.go b/pkg/neonvm/controllers/vm_controller_handle_cpu_scaling.go new file mode 100644 index 000000000..c1df5d88c --- /dev/null +++ b/pkg/neonvm/controllers/vm_controller_handle_cpu_scaling.go @@ -0,0 +1,124 @@ +package controllers + +import ( + "context" + "fmt" + + "sigs.k8s.io/controller-runtime/pkg/log" + + corev1 "k8s.io/api/core/v1" + + vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" + "github.com/neondatabase/autoscaling/pkg/api" +) + +// handleCPUScaling is extracted from doReconcile and encapsulates the logic to handle CPU scaling. 
+// if vm scaling mode is set to CpuSysfsState, the scaling is delegated to neonvm-daemon +// otherwise the scaling is first done by scaling amount of cores in the VM using QMP and then by updating the cgroup +func (r *VMReconciler) handleCPUScaling(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) { + + log := log.FromContext(ctx) + useCpuSysfsStateScaling := false + if vm.Spec.CpuScalingMode != nil && *vm.Spec.CpuScalingMode == vmv1.CpuScalingModeCpuSysfsState { + useCpuSysfsStateScaling = true + } + + var scaled bool + var err error + if !useCpuSysfsStateScaling { + scaled, err = r.handleQMPBasedCPUScaling(ctx, vm, vmRunner) + } else { + scaled, err = r.delegateScalingToNeonvmDaemon(ctx, vm, vmRunner) + } + + if err != nil { + log.Error(err, "Failed to scale CPU", "VirtualMachine", vm.Name, "use_cpu_sysfs_state", useCpuSysfsStateScaling) + return false, err + } + + return scaled, nil +} + +// handleQMPBasedCPUScaling handles CPU scaling using QMP, extracted as is from doReconcile +func (r *VMReconciler) handleQMPBasedCPUScaling(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) { + log := log.FromContext(ctx) + specCPU := vm.Spec.Guest.CPUs.Use + cgroupUsage, err := getRunnerCgroup(ctx, vm) + if err != nil { + log.Error(err, "Failed to get CPU details from runner", "VirtualMachine", vm.Name) + return false, err + } + var hotPlugCPUScaled bool + var pluggedCPU uint32 + cpuSlotsPlugged, _, err := QmpGetCpus(QmpAddr(vm)) + if err != nil { + log.Error(err, "Failed to get CPU details from VirtualMachine", "VirtualMachine", vm.Name) + return false, err + } + pluggedCPU = uint32(len(cpuSlotsPlugged)) + + log.Info("Using QMP CPU control") + if specCPU.RoundedUp() > pluggedCPU { + // going to plug one CPU + log.Info("Plug one more CPU into VM") + if err := QmpPlugCpu(QmpAddr(vm)); err != nil { + return false, err + } + r.Recorder.Event(vm, "Normal", "ScaleUp", + fmt.Sprintf("One more CPU was plugged into VM %s", + vm.Name)) + } else if specCPU.RoundedUp() < pluggedCPU { + // going to unplug one CPU + log.Info("Unplug one CPU from VM") + if err := QmpUnplugCpu(QmpAddr(vm)); err != nil { + return false, err + } + r.Recorder.Event(vm, "Normal", "ScaleDown", + fmt.Sprintf("One CPU was unplugged from VM %s", + vm.Name)) + return false, nil + } else if specCPU != cgroupUsage.VCPUs { + _, err := r.handleCgroupCPUUpdate(ctx, vm, cgroupUsage) + if err != nil { + log.Error(err, "Failed to update cgroup CPU", "VirtualMachine", vm.Name) + return false, err + } + } else { + log.Info("No need to plug or unplug CPU") + hotPlugCPUScaled = true + } + r.updateVMStatusCPU(ctx, vm, vmRunner, pluggedCPU, cgroupUsage) + return hotPlugCPUScaled, nil +} + +func (r *VMReconciler) delegateScalingToNeonvmDaemon(ctx context.Context, vm *vmv1.VirtualMachine, vmRunner *corev1.Pod) (bool, error) { + log := log.FromContext(ctx) + specCPU := vm.Spec.Guest.CPUs.Use + + cgroupUsage, err := getRunnerCgroup(ctx, vm) + if err != nil { + log.Error(err, "Failed to get CPU details from runner", "VirtualMachine", vm.Name) + return false, err + } + if specCPU != cgroupUsage.VCPUs { + return r.handleCgroupCPUUpdate(ctx, vm, cgroupUsage) + } + r.updateVMStatusCPU(ctx, vm, vmRunner, cgroupUsage.VCPUs.RoundedUp(), cgroupUsage) + return true, nil + +} + +func (r *VMReconciler) handleCgroupCPUUpdate(ctx context.Context, vm *vmv1.VirtualMachine, cgroupUsage *api.VCPUCgroup) (bool, error) { + specCPU := vm.Spec.Guest.CPUs.Use + if err := setRunnerCgroup(ctx, vm, specCPU); err != nil 
{ + return false, err + } + reason := "ScaleDown" + if specCPU > cgroupUsage.VCPUs { + reason = "ScaleUp" + } + r.Recorder.Event(vm, "Normal", reason, + fmt.Sprintf("Runner pod cgroups was updated on VM %s %s", + vm.Name, specCPU)) + return true, nil +} diff --git a/pkg/neonvm/controllers/vm_qmp_test.go b/pkg/neonvm/controllers/vm_qmp_test.go index d70bc5ea5..f69462f76 100644 --- a/pkg/neonvm/controllers/vm_qmp_test.go +++ b/pkg/neonvm/controllers/vm_qmp_test.go @@ -1,10 +1,8 @@ -package controllers_test +package controllers import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - - "github.com/neondatabase/autoscaling/pkg/neonvm/controllers" ) type qmpEvent struct { @@ -43,14 +41,14 @@ var _ = Describe("VM QMP interaction", func() { Context("QMP test", func() { It("should support basic QMP operations", func() { By("adding memslot") - qmp := newQMPMock() - defer qmp.done() - qmp.expect(` + qmpMock := newQMPMock() + defer qmpMock.done() + qmpMock.expect(` {"execute": "object-add", "arguments": {"id": "memslot1", "size": 100, "qom-type": "memory-backend-ram"}}`, `{}`) - err := controllers.QmpAddMemoryBackend(qmp, 1, 100) + err := QmpAddMemoryBackend(qmpMock, 1, 100) Expect(err).To(Not(HaveOccurred())) }) }) diff --git a/neonvm-daemon/pkg/cpuscaling/online_offline_cpu.go b/pkg/neonvm/cpuscaling/sysfsscaling.go similarity index 92% rename from neonvm-daemon/pkg/cpuscaling/online_offline_cpu.go rename to pkg/neonvm/cpuscaling/sysfsscaling.go index 3bb661798..b8da5992d 100644 --- a/neonvm-daemon/pkg/cpuscaling/online_offline_cpu.go +++ b/pkg/neonvm/cpuscaling/sysfsscaling.go @@ -11,10 +11,10 @@ import ( // CPU directory path const cpuPath = "/sys/devices/system/cpu/" -type CPUOnlineOffliner struct { +type CPUSysFsStateScaler struct { } -func (c *CPUOnlineOffliner) EnsureOnlineCPUs(X int) error { +func (c *CPUSysFsStateScaler) EnsureOnlineCPUs(X int) error { cpus, err := getAllCPUs() if err != nil { return err @@ -89,7 +89,7 @@ func (c *CPUOnlineOffliner) EnsureOnlineCPUs(X int) error { } // GetActiveCPUsCount() returns the count of online CPUs. -func (c *CPUOnlineOffliner) GetActiveCPUsCount() (uint32, error) { +func (c *CPUSysFsStateScaler) GetActiveCPUsCount() (uint32, error) { cpus, err := getAllCPUs() if err != nil { return 0, err @@ -110,7 +110,7 @@ func (c *CPUOnlineOffliner) GetActiveCPUsCount() (uint32, error) { } // GetTotalCPUsCount returns the total number of CPUs (online + offline). 
-func (c *CPUOnlineOffliner) GetTotalCPUsCount() (uint32, error) { +func (c *CPUSysFsStateScaler) GetTotalCPUsCount() (uint32, error) { cpus, err := getAllCPUs() if err != nil { return 0, err @@ -189,7 +189,7 @@ func setCPUOnline(cpu int, online bool) error { err := os.WriteFile(filepath.Join(cpuPath, fmt.Sprintf("cpu%d/online", cpu)), []byte(state), 0644) if err != nil { - return fmt.Errorf("failed to set CPU %d online status: %v", cpu, err) + return fmt.Errorf("failed to set CPU %d online status: %w", cpu, err) } return nil diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-assert.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-assert.yaml new file mode 100644 index 000000000..3a0b27560 --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-assert.yaml @@ -0,0 +1,17 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 250m + memorySize: 1Gi + memoryProvider: DIMMSlots diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-create-vm.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-create-vm.yaml new file mode 100644 index 000000000..b32f10d09 --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/00-create-vm.yaml @@ -0,0 +1,102 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- +apiVersion: v1 +kind: Service +metadata: + name: example +spec: + ports: + - name: postgres + port: 5432 + protocol: TCP + targetPort: postgres + type: NodePort + selector: + vm.neon.tech/name: example +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example + labels: + autoscaling.neon.tech/enabled: "true" + annotations: + autoscaling.neon.tech/bounds: '{ "min": { "cpu": "250m", "mem": "1Gi" }, "max": {"cpu": 3, "mem": "4Gi" } }' +spec: + cpuScalingMode: sysfsScaling + schedulerName: autoscale-scheduler + guest: + cpus: + min: 0.25 + max: 3.25 # set value greater than bounds so our tests check we don't exceed the bounds. 
+ use: 0.5 + memorySlotSize: 1Gi + memorySlots: + min: 1 + max: 5 + use: 1 + memoryProvider: DIMMSlots + rootDisk: + image: vm-postgres:15-bullseye + size: 8Gi + args: + - -c + - 'config_file=/etc/postgresql/postgresql.conf' + env: + # for testing only - allows login without password + - name: POSTGRES_HOST_AUTH_METHOD + value: trust + ports: + - name: postgres + port: 5432 + - name: host-metrics + port: 9100 + - name: monitor + port: 10301 + extraNetwork: + enable: true + disks: + - name: pgdata + mountPath: /var/lib/postgresql + emptyDisk: + size: 16Gi + - name: postgres-config + mountPath: /etc/postgresql + configMap: + name: example-config + items: + - key: postgresql.conf + path: postgresql.conf + - name: cache + mountPath: /neonvm/cache + tmpfs: + size: 1Gi + +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: example-config +data: + postgresql.conf: | + listen_addresses = '*' + shared_preload_libraries = 'pg_stat_statements' + + max_connections = 64 + shared_buffers = 256MB + effective_cache_size = 1536MB + maintenance_work_mem = 128MB + checkpoint_completion_target = 0.9 + wal_buffers = 16MB + default_statistics_target = 100 + random_page_cost = 1.1 + effective_io_concurrency = 200 + work_mem = 4MB + min_wal_size = 1GB + max_wal_size = 4GB + max_worker_processes = 4 + max_parallel_workers_per_gather = 2 + max_parallel_workers = 4 + max_parallel_maintenance_workers = 2 diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-assert.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-assert.yaml new file mode 100644 index 000000000..336cca86d --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-assert.yaml @@ -0,0 +1,23 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 90 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 3 + memorySize: 4Gi +--- +apiVersion: v1 +kind: Pod +metadata: + name: workload +status: + phase: Running diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-upscale.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-upscale.yaml new file mode 100644 index 000000000..d95636340 --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/01-upscale.yaml @@ -0,0 +1,49 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +unitTest: false +--- +apiVersion: v1 +kind: Pod +metadata: + name: workload +spec: + terminationGracePeriodSeconds: 1 + initContainers: + - name: wait-for-pg + image: postgres:15-bullseye + command: + - sh + - "-c" + - | + set -e + until pg_isready --username=postgres --dbname=postgres --host=example --port=5432; do + sleep 1 + done + containers: + - name: pgbench + image: postgres:15-bullseye + volumeMounts: + - name: my-volume + mountPath: /etc/misc + command: + - pgbench + args: + - postgres://postgres@example:5432/postgres + - --client=20 + - --progress=1 + - --progress-timestamp + - --time=600 + - --file=/etc/misc/query.sql + volumes: + - name: my-volume + configMap: + name: query + restartPolicy: Never +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: query +data: + query.sql: | + select length(factorial(length(factorial(1223)::text)/2)::text); diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-assert.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-assert.yaml new file mode 100644 index 000000000..8e97b6494 --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-assert.yaml @@ -0,0 +1,16 @@ +apiVersion:
kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: vm.neon.tech/v1 +kind: VirtualMachine +metadata: + name: example +status: + phase: Running + restartCount: 0 + conditions: + - type: Available + status: "True" + cpus: 250m + memorySize: 1Gi diff --git a/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-downscale.yaml b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-downscale.yaml new file mode 100644 index 000000000..160f31462 --- /dev/null +++ b/tests/e2e/autoscaling.cpu-sys-fs-state-scaling/02-downscale.yaml @@ -0,0 +1,7 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +delete: +- apiVersion: v1 + kind: Pod + name: workload +unitTest: false
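
For reference, opting a VM into the new flow only requires setting the field introduced by this patch on the VirtualMachine spec. A minimal sketch, mirroring the value used in the e2e test above (the exact accepted string for cpuScalingMode is defined by the CRD, so `sysfsScaling` here is an assumption taken from 00-create-vm.yaml):

apiVersion: vm.neon.tech/v1
kind: VirtualMachine
metadata:
  name: example
spec:
  cpuScalingMode: sysfsScaling   # assumed from the e2e test; omit to keep the QMP hotplug flow
  guest:
    cpus:
      min: 0.25
      use: 0.5
      max: 3

VMs that do not set cpuScalingMode keep the existing QMP-based CPU hotplug behaviour.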