neonvm: introduce CPU sysfs state scaling flow based on the vmSpec.cpuScalingMode (#1111)

Introduce a separate CPU scaling flow based on vmSpec.cpuScalingMode.

If vmSpec.cpuScalingMode is equal to `qmpScaling`, the scaling logic is
preserved as before:

- Scale the number of CPUs via QMP commands, if required.
- If cgroup scaling is required, call the vm-runner /cpu_change
  endpoint.

If vmSpec.cpuScalingMode is equal to `sysfsScaling`, all CPU scaling
requests go directly to the vm-runner /cpu_change endpoint, which in
that configuration forwards them to the neonvm-daemon to reconcile the
required number of online CPUs.

The value `cpuSysfsStateScaling` also modifies the QEMU and kernel
arguments so that all CPUs are plugged at boot but only the first one is
marked online.
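
Roughly, the two flows diverge as in the following sketch. `scaleViaQMP`
and `scaleViaRunnerEndpoint` are hypothetical helper names for this
example only; the real logic lives in the controller, runner, and daemon
changes below.

```go
// Sketch of the mode dispatch described above; not code from this commit.
package main

import (
	"fmt"

	vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
)

// Hypothetical stand-ins for the real QMP and /cpu_change calls.
func scaleViaQMP(target vmv1.MilliCPU) error            { return nil }
func scaleViaRunnerEndpoint(target vmv1.MilliCPU) error { return nil }

func scaleCPU(mode vmv1.CpuScalingMode, target vmv1.MilliCPU) error {
	switch mode {
	case vmv1.CpuScalingModeQMP:
		// Hot(un)plug vCPUs via QMP first, then adjust cgroups through
		// the vm-runner /cpu_change endpoint.
		if err := scaleViaQMP(target); err != nil {
			return err
		}
		return scaleViaRunnerEndpoint(target)
	case vmv1.CpuScalingModeSysfs:
		// All requests go to /cpu_change; the runner forwards them to
		// neonvm-daemon, which onlines the required CPUs via sysfs.
		return scaleViaRunnerEndpoint(target)
	default:
		return fmt.Errorf("unknown CPU scaling mode %q", mode)
	}
}
```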

---------

Signed-off-by: Misha Sakhnov <[email protected]>
Co-authored-by: Oleg Vasilev <[email protected]>
Co-authored-by: Em Sharnoff <[email protected]>
3 people authored Nov 27, 2024
1 parent cf514ed commit 0bc5fe1
Showing 25 changed files with 1,096 additions and 222 deletions.
3 changes: 3 additions & 0 deletions neonvm-controller/cmd/main.go
@@ -95,6 +95,7 @@ func main() {
var concurrencyLimit int
var skipUpdateValidationFor map[types.NamespacedName]struct{}
var disableRunnerCgroup bool
var defaultCpuScalingMode vmv1.CpuScalingMode
var qemuDiskCacheSettings string
var defaultMemoryProvider vmv1.MemoryProvider
var memhpAutoMovableRatio string
@@ -133,6 +134,7 @@
return nil
},
)
flag.Func("default-cpu-scaling-mode", "Set default cpu scaling mode to use for new VMs", defaultCpuScalingMode.FlagFunc)
flag.BoolVar(&disableRunnerCgroup, "disable-runner-cgroup", false, "Disable creation of a cgroup in neonvm-runner for fractional CPU limiting")
flag.StringVar(&qemuDiskCacheSettings, "qemu-disk-cache-settings", "cache=none", "Set neonvm-runner's QEMU disk cache settings")
flag.Func("default-memory-provider", "Set default memory provider to use for new VMs", defaultMemoryProvider.FlagFunc)
@@ -200,6 +202,7 @@ func main() {
FailurePendingPeriod: failurePendingPeriod,
FailingRefreshInterval: failingRefreshInterval,
AtMostOnePod: atMostOnePod,
DefaultCPUScalingMode: defaultCpuScalingMode,
}

vmReconciler := &controllers.VMReconciler{
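For context, `flag.Func` registers a callback that receives the raw flag
value, so `CpuScalingMode` only needs a `FlagFunc(string) error` method
that validates and stores it. A minimal sketch of that pattern, assuming
a string-based type and made-up value strings (the actual implementation
lives in the vmv1 API package):

```go
package main

import (
	"flag"
	"fmt"
)

type CpuScalingMode string

// The exact value strings are assumptions for this sketch.
const (
	CpuScalingModeQMP   CpuScalingMode = "qmpScaling"
	CpuScalingModeSysfs CpuScalingMode = "sysfsScaling"
)

// FlagFunc has the func(string) error shape that flag.Func expects.
func (m *CpuScalingMode) FlagFunc(value string) error {
	switch CpuScalingMode(value) {
	case CpuScalingModeQMP, CpuScalingModeSysfs:
		*m = CpuScalingMode(value)
		return nil
	default:
		return fmt.Errorf("unknown CPU scaling mode %q", value)
	}
}

func main() {
	var defaultCpuScalingMode CpuScalingMode
	flag.Func("default-cpu-scaling-mode",
		"Set default cpu scaling mode to use for new VMs",
		defaultCpuScalingMode.FlagFunc)
	flag.Parse()
	fmt.Println("default mode:", defaultCpuScalingMode)
}
```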
81 changes: 70 additions & 11 deletions neonvm-daemon/cmd/main.go
@@ -3,11 +3,17 @@ package main
import (
"flag"
"fmt"
"io"
"net/http"
"os"
"strconv"
"sync"
"time"

"go.uber.org/zap"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/neonvm/cpuscaling"
)

func main() {
@@ -26,24 +32,77 @@ func main() {
defer logger.Sync() //nolint:errcheck // what are we gonna do, log something about it?

logger.Info("Starting neonvm-daemon", zap.String("addr", *addr))
srv := cpuServer{
cpuOperationsMutex: &sync.Mutex{},
cpuScaler: cpuscaling.NewCPUScaler(),
logger: logger.Named("cpu-srv"),
}
srv.run(*addr)
}

type cpuServer struct {
// Protects CPU operations from concurrent access, so that multiple ReconcileOnlineCPU calls
// cannot run concurrently and the status response is always up to date.
cpuOperationsMutex *sync.Mutex
cpuScaler *cpuscaling.CPUScaler
logger *zap.Logger
}

func (s *cpuServer) handleGetCPUStatus(w http.ResponseWriter) {
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()

activeCPUs, err := s.cpuScaler.ActiveCPUsCount()
if err != nil {
s.logger.Error("could not get active CPUs count", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)

-srv := cpuServer{}
-srv.run(logger, *addr)
if _, err := w.Write([]byte(fmt.Sprintf("%d", activeCPUs*1000))); err != nil {
s.logger.Error("could not write response", zap.Error(err))
}
}

-type cpuServer struct{}
func (s *cpuServer) handleSetCPUStatus(w http.ResponseWriter, r *http.Request) {
s.cpuOperationsMutex.Lock()
defer s.cpuOperationsMutex.Unlock()

-func (s *cpuServer) run(logger *zap.Logger, addr string) {
-logger = logger.Named("cpu-srv")
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error("could not read request body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

updateInt, err := strconv.Atoi(string(body))
if err != nil {
s.logger.Error("could not unmarshal request body", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}

s.logger.Debug("Setting CPU status", zap.String("body", string(body)))
update := vmv1.MilliCPU(updateInt)
if err := s.cpuScaler.ReconcileOnlineCPU(int(update.RoundedUp())); err != nil {
s.logger.Error("could not ensure online CPUs", zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}

func (s *cpuServer) run(addr string) {
mux := http.NewServeMux()
mux.HandleFunc("/cpu", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
logger.Error("unimplemented!")
w.WriteHeader(http.StatusInternalServerError)
s.handleGetCPUStatus(w)
return
} else if r.Method == http.MethodPut {
logger.Error("unimplemented!")
w.WriteHeader(http.StatusInternalServerError)
s.handleSetCPUStatus(w, r)
return
} else {
// unknown method
w.WriteHeader(http.StatusNotFound)
@@ -61,7 +120,7 @@ func (s *cpuServer) run(logger *zap.Logger, addr string) {

err := server.ListenAndServe()
if err != nil {
-logger.Fatal("CPU server exited with error", zap.Error(err))
s.logger.Fatal("CPU server exited with error", zap.Error(err))
}
-logger.Info("CPU server exited without error")
s.logger.Info("CPU server exited without error")
}
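
The /cpu endpoint above speaks plain-text milli-CPU integers: PUT sends
the requested amount, which the daemon rounds up to whole CPUs before
reconciling, and GET reports the online CPU count multiplied by 1000. A
sketch of a client exchange (the guest address is an assumption; port
25183 matches setNeonvmDaemonCPU in neonvm-runner below):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	base := "http://guest-vm:25183" // assumed address for the example

	// Request 2.5 CPUs (2500 milli-CPU); the daemon rounds up to 3 CPUs.
	req, err := http.NewRequest(http.MethodPut, base+"/cpu", strings.NewReader("2500"))
	if err != nil {
		panic(err)
	}
	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close()
	}

	// Read back the online CPU count, reported in milli-CPU units.
	resp, err := http.Get(base + "/cpu")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // "3000" once 2500m has been rounded up to 3 CPUs
}
```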
161 changes: 91 additions & 70 deletions neonvm-runner/cmd/main.go
@@ -20,7 +20,6 @@ import (
"os/signal"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -612,10 +611,10 @@ type Config struct {
kernelPath string
appendKernelCmdline string
skipCgroupManagement bool
-enableDummyCPUServer bool
diskCacheSettings string
memoryProvider vmv1.MemoryProvider
autoMovableRatio string
cpuScalingMode vmv1.CpuScalingMode
}

func newConfig(logger *zap.Logger) *Config {
@@ -625,10 +624,10 @@ func newConfig(logger *zap.Logger) *Config {
kernelPath: defaultKernelPath,
appendKernelCmdline: "",
skipCgroupManagement: false,
-enableDummyCPUServer: false,
diskCacheSettings: "cache=none",
memoryProvider: "", // Require that this is explicitly set. We'll check later.
autoMovableRatio: "", // Require that this is explicitly set IFF memoryProvider is VirtioMem. We'll check later.
cpuScalingMode: "", // Require that this is explicitly set. We'll check later.
}
flag.StringVar(&cfg.vmSpecDump, "vmspec", cfg.vmSpecDump,
"Base64 encoded VirtualMachine json specification")
@@ -641,15 +640,12 @@
flag.BoolVar(&cfg.skipCgroupManagement, "skip-cgroup-management",
cfg.skipCgroupManagement,
"Don't try to manage CPU")
flag.BoolVar(&cfg.enableDummyCPUServer, "enable-dummy-cpu-server",
cfg.skipCgroupManagement,
"Use with -skip-cgroup-management. Provide a CPU server but don't actually do anything with it")
flag.StringVar(&cfg.diskCacheSettings, "qemu-disk-cache-settings",
cfg.diskCacheSettings, "Cache settings to add to -drive args for VM disks")
flag.Func("memory-provider", "Set provider for memory hotplug", cfg.memoryProvider.FlagFunc)
flag.StringVar(&cfg.autoMovableRatio, "memhp-auto-movable-ratio",
cfg.autoMovableRatio, "Set value of kernel's memory_hotplug.auto_movable_ratio [virtio-mem only]")

flag.Func("cpu-scaling-mode", "Set CPU scaling mode", cfg.cpuScalingMode.FlagFunc)
flag.Parse()

if cfg.memoryProvider == "" {
Expand All @@ -658,8 +654,8 @@ func newConfig(logger *zap.Logger) *Config {
if cfg.memoryProvider == vmv1.MemoryProviderVirtioMem && cfg.autoMovableRatio == "" {
logger.Fatal("missing required flag '-memhp-auto-movable-ratio'")
}
-if cfg.enableDummyCPUServer && !cfg.skipCgroupManagement {
-logger.Fatal("flag -enable-dummy-cpu-server requires -skip-cgroup-management")
if cfg.cpuScalingMode == "" {
logger.Fatal("missing required flag '-cpu-scaling-mode'")
}

return cfg
@@ -890,12 +886,32 @@ func buildQEMUCmd(
logger.Warn("not using KVM acceleration")
}
qemuCmd = append(qemuCmd, "-cpu", "max")
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
vmSpec.Guest.CPUs.Min.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
vmSpec.Guest.CPUs.Max.RoundedUp(),
))

// cpu scaling details
maxCPUs := vmSpec.Guest.CPUs.Max.RoundedUp()
minCPUs := vmSpec.Guest.CPUs.Min.RoundedUp()

switch cfg.cpuScalingMode {
case vmv1.CpuScalingModeSysfs:
// Boot with all CPUs plugged; they will be onlined on demand.
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
maxCPUs,
maxCPUs,
maxCPUs,
))
case vmv1.CpuScalingModeQMP:
// Boot with minCPUs hotplugged, but with slots reserved for maxCPUs.
qemuCmd = append(qemuCmd, "-smp", fmt.Sprintf(
"cpus=%d,maxcpus=%d,sockets=1,cores=%d,threads=1",
minCPUs,
maxCPUs,
maxCPUs,
))
default:
// we should never get here because we validate the flag in newConfig
panic(fmt.Errorf("unknown CPU scaling mode %s", cfg.cpuScalingMode))
}
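// For illustration (example values, not from this commit): with
// CPUs.Min=1 and CPUs.Max=4 the two modes produce
//   sysfsScaling: -smp cpus=4,maxcpus=4,sockets=1,cores=4,threads=1
//   qmpScaling:   -smp cpus=1,maxcpus=4,sockets=1,cores=4,threads=1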

// memory details
logger.Info(fmt.Sprintf("Using memory provider %s", cfg.memoryProvider))
@@ -983,6 +999,10 @@ func makeKernelCmdline(cfg *Config, vmSpec *vmv1.VirtualMachineSpec, vmStatus *v
if cfg.appendKernelCmdline != "" {
cmdlineParts = append(cmdlineParts, cfg.appendKernelCmdline)
}
if cfg.cpuScalingMode == vmv1.CpuScalingModeSysfs {
// Limit the number of online CPUs the kernel boots with. More CPUs will be onlined during upscaling.
cmdlineParts = append(cmdlineParts, fmt.Sprintf("maxcpus=%d", vmSpec.Guest.CPUs.Min.RoundedUp()))
}

return strings.Join(cmdlineParts, " ")
}
@@ -1030,37 +1050,34 @@ func runQEMU(

wg.Add(1)
go terminateQemuOnSigterm(ctx, logger, &wg)
-if !cfg.skipCgroupManagement || cfg.enableDummyCPUServer {
-var callbacks cpuServerCallbacks
-
-if cfg.enableDummyCPUServer {
-lastValue := &atomic.Uint32{}
-lastValue.Store(uint32(vmSpec.Guest.CPUs.Min))
-
-callbacks = cpuServerCallbacks{
-get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
-return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
-},
-set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
-lastValue.Store(uint32(cpu))
-return nil
-},
-}
-} else {
-// Standard implementation -- actually set the cgroup
-callbacks = cpuServerCallbacks{
-get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
-return getCgroupQuota(cgroupPath)
-},
-set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
-return setCgroupLimit(logger, cpu, cgroupPath)
-},
-}
-}
-
-wg.Add(1)
-go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg)
var callbacks cpuServerCallbacks
// lastValue stores the last fractional CPU request as-is: it cannot be
// reconstructed from the rounded-up number of online CPUs, and reporting a
// different value back would cause an infinite reconciliation loop.
// This will eventually be dropped in favor of real fractional CPU scaling based on cgroups.
lastValue := &atomic.Uint32{}
lastValue.Store(uint32(vmSpec.Guest.CPUs.Min))

callbacks = cpuServerCallbacks{
get: func(logger *zap.Logger) (*vmv1.MilliCPU, error) {
return lo.ToPtr(vmv1.MilliCPU(lastValue.Load())), nil
},
set: func(logger *zap.Logger, cpu vmv1.MilliCPU) error {
if cfg.cpuScalingMode == vmv1.CpuScalingModeSysfs {
err := setNeonvmDaemonCPU(cpu)
if err != nil {
logger.Error("setting CPU through NeonVM Daemon failed", zap.Any("cpu", cpu), zap.Error(err))
return err
}
}
lastValue.Store(uint32(cpu))
return nil
},
}

wg.Add(1)
go listenForCPUChanges(ctx, logger, vmSpec.RunnerPort, callbacks, &wg)
wg.Add(1)
go forwardLogs(ctx, logger, &wg)

@@ -1463,32 +1480,6 @@ func setCgroupLimit(logger *zap.Logger, r vmv1.MilliCPU, cgroupPath string) erro
return nil
}

-func getCgroupQuota(cgroupPath string) (*vmv1.MilliCPU, error) {
-isV2 := cgroups.Mode() == cgroups.Unified
-var path string
-if isV2 {
-path = filepath.Join(cgroupMountPoint, cgroupPath, "cpu.max")
-} else {
-path = filepath.Join(cgroupMountPoint, "cpu", cgroupPath, "cpu.cfs_quota_us")
-}
-data, err := os.ReadFile(path)
-if err != nil {
-return nil, err
-}
-
-arr := strings.Split(strings.Trim(string(data), "\n"), " ")
-if len(arr) == 0 {
-return nil, errors.New("unexpected cgroup data")
-}
-quota, err := strconv.ParseUint(arr[0], 10, 64)
-if err != nil {
-return nil, err
-}
-cpu := vmv1.MilliCPU(uint32(quota * 1000 / cgroupPeriod))
-cpu /= cpuLimitOvercommitFactor
-return &cpu, nil
-}

func terminateQemuOnSigterm(ctx context.Context, logger *zap.Logger, wg *sync.WaitGroup) {
logger = logger.Named("terminate-qemu-on-sigterm")

@@ -1812,3 +1803,33 @@ func overlayNetwork(iface string) (mac.MAC, error) {

return mac, nil
}

func setNeonvmDaemonCPU(cpu vmv1.MilliCPU) error {
_, vmIP, _, err := calcIPs(defaultNetworkCIDR)
if err != nil {
return fmt.Errorf("could not calculate VM IP address: %w", err)
}

ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
defer cancel()

url := fmt.Sprintf("http://%s:25183/cpu", vmIP)
body := bytes.NewReader([]byte(fmt.Sprintf("%d", uint32(cpu))))

req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return fmt.Errorf("could not build request: %w", err)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("could not send request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("neonvm-daemon responded with status %d", resp.StatusCode)
}

return nil
}
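
The cpuscaling package used by neonvm-daemon is not part of this
excerpt. As background, Linux exposes hotplug state per CPU under
/sys/devices/system/cpu/cpuN/online, so a reconcile step plausibly looks
like the following sketch (an assumption about the mechanism, not the
package's actual code):

```go
package main

import (
	"fmt"
	"os"
)

// reconcileOnlineCPU onlines CPUs below the target count and offlines the
// rest. Sketch only: the real logic lives in pkg/neonvm/cpuscaling.
func reconcileOnlineCPU(target int) error {
	// cpu0 is always online and usually has no "online" file.
	for i := 1; ; i++ {
		path := fmt.Sprintf("/sys/devices/system/cpu/cpu%d/online", i)
		if _, err := os.Stat(path); err != nil {
			break // no more hot-pluggable CPUs
		}
		state := "0"
		if i < target {
			state = "1" // cpu0 plus CPUs 1..target-1 stay online
		}
		if err := os.WriteFile(path, []byte(state), 0644); err != nil {
			return fmt.Errorf("could not set cpu%d state: %w", i, err)
		}
	}
	return nil
}
```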